Batch Feature Set

Batch feature set overview

Batch Feature Sets produce ML features by reading data from batch sources (e.g., Snowflake, BigQuery, S3) and applying a user-provided transformation to it.

The result is then stored in Qwak's Feature Storage layer, where it is made available for both online inference and for generating training sets.

Some common use cases for batch feature sets:

  1. Compute transaction aggregates from a Snowflake transactions table (e.g., user expenditure over the last week).
  2. Ingest user properties from a BigQuery users table.


Creating a batch feature set

To create a batch feature set, we need to define a feature transformation function,
and use the @batch.feature_set decorator with the following parameters:

  • name - The feature set name. If no name is defined, the decorated function name is used.
    This field may contain only alphanumeric and hyphen characters, up to 40 characters long.
  • entity - The name of an existing entity.
  • data_sources - The data sources to read from, passed either as a list of data source names or as a dictionary mapping each data source name to a read policy.

Ingestion jobs are triggered every 4 hours unless otherwise explicitly defined.

An example batch feature set

from qwak.feature_store.v1 import batch, SparkSqlTransformation

@batch.feature_set(
    name="user-features",
    entity="user_id",
    data_sources=["snowflake_users_table"],
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake_users_table""")

This example:

  • Creates a job that runs every 4 hours.
  • Fetches data from the snowflake_users_table source.
  • Creates a transformed feature vector with the fields: user_id, registration_country, registration_device, and date_created.
  • Ingests the feature vector into the Qwak Feature Store.
Batch jobs run periodically and ingest data from the data source into the feature store.

Adding metadata

The optional @batch.metadata decorator defines the following feature set metadata:

  • owner - User name of the feature set owner
  • description - A description of what this feature set does
  • display_name - An alternative feature set name for UI display

from qwak.feature_store.v1 import batch, SparkSqlTransformation

@batch.feature_set(
    name="user-features",
    entity="user_id",
    data_sources=["snowflake_users_table"],
)
@batch.metadata(
    owner="John Doe",
    display_name="User Aggregation Data",
    description="User origin country and devices"
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake_users_table""")

Configuring data sources

Choose Batch Data Sources to be used for the feature set data ingestion.

Each data source name should be mapped to its desired Read Policy.

The default read policy is New Only, which reads only the new records relative to the last ingestion job.

from qwak.feature_store.features import ReadPolicy
from qwak.feature_store.v1 import batch, SparkSqlTransformation

@batch.feature_set(
    name="user-features",
    entity="user",
    data_sources={
        "snowflake_users_table": ReadPolicy.NewOnly
    }
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake_users_table
    """)

Explicit timestamp definition

A timestamp column name represents the timestamp at which the feature vector event occurred.

@batch.feature_set(
    name="user-features"
    entity="user", 
    data_sources=["snowflake_users_table"],
    timestamp_column_name="update_timestamp"
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user,
               registration_country,
               registration_device,
               date_created 
        FROM snowflake_users_table
    """)

🚧

When using multiple data sources, you must explicitly define the timestamp column.

Implicit timestamp definition

  1. Setting the read policy to Time Frame adds an additional timestamp column named QWAK_WINDOW_END_TS, which is used as the feature set timestamp column (see the sketch below).
  2. When a single data source is set, the date_created_column defined in the data source is used.
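
As a minimal sketch of option 1 (reusing the snowflake_users_table source from the examples above), a Time Frame read policy lets you omit timestamp_column_name, since the QWAK_WINDOW_END_TS column it adds is used implicitly:

from qwak.feature_store.features import ReadPolicy
from qwak.feature_store.v1 import batch, SparkSqlTransformation

# No timestamp_column_name is defined: with the Time Frame read policy,
# the added QWAK_WINDOW_END_TS column serves as the feature set timestamp column.
@batch.feature_set(
    name="user-features",
    entity="user_id",
    data_sources={"snowflake_users_table": ReadPolicy.TimeFrame(days=7)},
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country
        FROM snowflake_users_table""")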

Controlling the scheduling policy

Batch features are materialized as scheduled ETLs. Once a feature is deployed, new feature values will be calculated at a cadence defined by the scheduling policy. The scheduling policy follows the crontab format.

πŸ“˜

Default scheduling policy

The default scheduling is every 4 hours if no policy is set.

@batch.feature_set(
    name="user-features"
    entity="user", 
    data_sources=["snowflake_users_table"],
    timestamp_column_name="date_created" 
)
@batch.scheduling(cron_expression="0 8 * * *")
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake_users_table
    """)

The definition above means that the feature ingestion job will be triggered daily at 8:00 AM.

πŸ’‘

Creating manually triggered feature sets

Passing None to the scheduling decorator deploys a feature set that can only be triggered manually.

🚧

Passing None disables automatic feature ingestion!

@batch.feature_set(
    name="user-features"
    entity="user", 
    data_sources=["snowflake_users_table"],
    timestamp_column_name="date_created" 
)
# Note: This feature set is manually triggered only
# Passing None to a feature set disables automatic ingestion
@batch.scheduling(cron_expression=None)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake_users_table
    """)

Backfilling feature values

If configured, the backfill policy will determine how and from which date and time to populate the new feature set with historical feature values.

import datetime

@batch.feature_set(
    name="user-features",
    entity="user", 
    data_sources=["snowflake_users_table"],
    timestamp_column_name="date_created" 
)
@batch.backfill(start_date=datetime.datetime(year=2022, month=12, day=30))
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake_users_table
    """)

Using SQL transformations

The following is an example of creating a transformation using SQL.

πŸ“˜

Qwak runs Spark SQL in the background. Please comply with Spark SQL standards.

from qwak.feature_store.features import ReadPolicy
from qwak.feature_store.v1 import batch, SparkSqlTransformation

@batch.feature_set(
    name="user-transaction-aggregations",
    entity="user_id",
    data_sources={"snowflake_datasource": ReadPolicy.TimeFrame(days=30)},
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               AVG(credit_amount) as avg_credit_amount,
               STD(credit_amount) as std_credit_amount,
               MAX(credit_amount) as max_credit_amount,
               MIN(date_created) as first_transaction,
               AVG(duration) as avg_loan_duration,
               AVG(job) as seniority_level
        FROM snowflake_datasource
        GROUP BY user_id""")

🚧

Creating transformations

  • The returned feature vector must include the entity name
  • For NewOnly and FullRead read policies, the timestamp column must be included in the returned feature vector.
  • Use the data source as the table name in the FROM clause.
  • Make sure the column names resulting from the SQL have no special characters. The allowed characters are: a-z, A-Z, 0-9, _.

Using Koalas transformations

Koalas is a pandas implementation built on Spark. Ensure your code complies with the Databricks Koalas library.

The UDF receives as input a dictionary in the form of {'<batch_source_name>': kdf, ...}.

The returned kdf (Koalas DataFrame) must contain a column representing the configured entity. The kdf must not include complex columns, such as a multi-index, and column names must not include whitespace or special characters.

Make sure the column names returned from the UDF have no special characters.

The allowed characters are: a-z, A-Z, 0-9, _.

❗️

Use Python 3.8 when registering a feature set with a Koalas transformation.

from qwak.feature_store.features import ReadPolicy
# KoalasTransformation is assumed here to be exported from qwak.feature_store.v1,
# alongside batch and SparkSqlTransformation
from qwak.feature_store.v1 import batch, KoalasTransformation

@batch.feature_set(
    name="user-features",
    entity="user",
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},
    timestamp_column_name="date_created"
)
@batch.scheduling(cron_expression="0 8 * * *")
def transform():

    def amount_stats(kdf_dict):
        # The input dictionary maps each data source name to its Koalas DataFrame
        kdf = kdf_dict['snowflake_transactions_table']
        agg_kdf = kdf.groupby('user').agg({'amount': ['avg', 'sum']})
        # Flatten the multi-index columns produced by the aggregation
        agg_kdf.columns = ['_'.join(column) for column in agg_kdf.columns.values]
        return agg_kdf

    return KoalasTransformation(function=amount_stats)

❗️

Function scope and dependencies

Koalas function scope and variables must be defined under the transform function, as shown in the code snippet above.
At runtime, only pandas, Koalas, and the Python standard library are available.

πŸ“˜

Logging

We support the default Python logger, which you can import from the standard Python logging library.
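
For example, a minimal sketch of logging from within a transformation function, using the standard Python logger (the feature set definition and log message below are illustrative):

import logging

from qwak.feature_store.v1 import batch, SparkSqlTransformation

logger = logging.getLogger(__name__)

@batch.feature_set(
    name="user-features",
    entity="user_id",
    data_sources=["snowflake_users_table"],
)
def user_features():
    # Emitted through the standard Python logging library
    logger.info("Building the user-features SQL transformation")
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country
        FROM snowflake_users_table""")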

Supported Spark column types

IntegerType, LongType, StringType, DoubleType, DecimalType, FloatType, BooleanType, TimestampType, DateType, ArrayType.

ArrayType column may include any of the above column types, except for another ArrayType column.
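
As an illustrative sketch (the feature set name and selected columns are hypothetical), an ArrayType column can be produced in a SQL transformation with Spark's collect_list aggregate:

from qwak.feature_store.v1 import batch, SparkSqlTransformation

@batch.feature_set(
    name="user-devices",
    entity="user_id",
    data_sources=["snowflake_users_table"],
)
def user_devices():
    # collect_list yields an ArrayType column holding the aggregated device values
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               MAX(date_created) as date_created,
               collect_list(registration_device) as devices
        FROM snowflake_users_table
        GROUP BY user_id""")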

❗️

Batch ingestion cannot insert rows with a timestamp older than the current oldest timestamp in the offline feature store.

Each batch must produce timestamps equal to or later than those of the last batch.


Defining the execution resources

Qwak uses a cluster of resources, often referred to as the "cluster template", to provide the batch execution job with its required resources, such as CPU, memory, and temporary storage. These are used to perform the user-defined transformation and to ingest the features into both stores.

Cluster template sizes start from MEDIUM, which is the default if none is specified.

from qwak.feature_store.features.execution_spec import ClusterTemplate
from qwak.feature_store.v1 import batch, SparkSqlTransformation

@batch.feature_set(
    name="user-features",
    entity="user_id",
    data_sources=["snowflake_users_table"])
@batch.execution_specification(cluster_template=ClusterTemplate.LARGE)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake_users_table""")

The cluster template to QPU mapping is as follows:

Cluster     QPU Per Hour
MEDIUM      14
LARGE       28
X-LARGE     52
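
As an illustration, assuming QPU consumption scales linearly with job runtime, a 30-minute ingestion job on a LARGE cluster would consume roughly 28 × 0.5 = 14 QPU.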

Getting feature samples

To make sure the generated feature set matches your expectations, even before the feature set is registered in Qwak, use the get_sample functionality.

This function computes sample feature vectors by applying the defined transformation to sample data from your defined data source, and returns a pandas DataFrame.

@batch.feature_set(
    name="user-transaction-aggregations",
    entity="user_id",
    data_sources={"snowflake_datasource": ReadPolicy.NewOnly()},
    timestamp_column_name="DATE_CREATED"
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               date_created,
               AVG(credit_amount) as avg_credit_amount,
               MAX(credit_amount) as max_credit_amount,
               MIN(date_created) as first_transaction
        FROM snowflake_datasource
        GROUP BY user_id, date_created""")

user_features_df = user_features.get_sample(number_of_rows=10)

See an example of the resulting user_features_df DataFrame:

+----+--------------------------------------+----------------+---------------------+---------------------+---------------------+
|    | user_id                              |   date_created |   avg_credit_amount |   max_credit_amount |   first_transaction |
|----+--------------------------------------+----------------+---------------------+---------------------+---------------------|
|  0 | baf1aed9-b16a-46f1-803b-e2b08c8b47de |  1609459200000 |                1169 |                1169 |       1609459200000 |
|  1 | 1b044db3-3bd1-4b71-a4e9-336210d6503f |  1609459200000 |                2096 |                2096 |       1609459200000 |
|  2 | ac8ec869-1a05-4df9-9805-7866ca42b31c |  1609459200000 |                7882 |                7882 |       1609459200000 |
|  3 | aa974eeb-ed0e-450b-90d0-4fe4592081c1 |  1609459200000 |                4870 |                4870 |       1609459200000 |
|  4 | 7b3d019c-82a7-42d9-beb8-2c57a246ff16 |  1609459200000 |                9055 |                9055 |       1609459200000 |
|  5 | 6bc1fd70-897e-49f4-ae25-960d490cb74e |  1609459200000 |                2835 |                2835 |       1609459200000 |
|  6 | 193158eb-5552-4ce5-92a4-2a966895bec5 |  1609459200000 |                6948 |                6948 |       1609459200000 |
|  7 | 759b5b46-dbe9-40ef-a315-107ddddc64b5 |  1609459200000 |                3059 |                3059 |       1609459200000 |
|  8 | e703c351-41a8-43ea-9615-8605da7ee718 |  1609459200000 |                5234 |                5234 |       1609459200000 |
|  9 | 66025b0f-6a7f-4f86-9666-6622be82d870 |  1609459200000 |                1295 |                1295 |       1609459200000 |
+----+--------------------------------------+----------------+---------------------+---------------------+---------------------+

Registering a feature set

The ETL pipeline will begin after registering a batch feature set.

There are two options for registering new features:

  1. Pointing directly to a Python file or directory containing the feature set definitions, for example:
qwak features register -p ./user_project/features/feature_set.py
  2. Letting the Qwak CLI recursively search through all .py files in the current directory and all directories below, looking for feature set definitions:
qwak features register

πŸ“˜

Function naming conventions

The same feature set transformation function name cannot be defined more than once per .py file.

βœ… Recursively looking for python files in input dir (0:00:00.61)
βœ… Finding Entities to register (0:00:00.01)
πŸ‘€ Found 3 Entities
----------------------------------------
βœ… Finding Data Sources to register (0:00:00.01)
πŸ‘€ Found 2 Data Sources
----------------------------------------
βœ… Finding Feature Sets to register (0:00:00.00)
πŸ‘€ Found 9 Feature Set(s)

The pipeline execution status will be visible in the UI under the list of registered feature sets.

To view the status in the UI, go to Feature Store -> Feature Sets -> Batch Feature Set Name.

Updating a feature set

Feature set configuration may be updated, except for the following limitations:

🚧

Recreating feature sets

Changing any of the following parameters requires deleting and recreating the feature set:

  • Backfill Start Date
  • Read Policy

Deleting a feature set

Feature sets may be deleted using a Qwak CLI command.

To delete a feature set, simply refer to it by its name.
There's no need to be located in a specific folder in the project structure.

For example, the examples above created a feature set named user-features. To delete this feature set via the Qwak CLI, run the following command:

qwak features delete user-features

Manually executing an ingestion job

To execute an ingestion job immediately, the following options are supported:

  • Python SDK - using the QwakClient:
from qwak import QwakClient

client = QwakClient()
client.execute_batch_feature_set("user-features")
  • CLI - using the following command:
qwak features run user-features
  • Qwak web UI - via the 'Jobs' tab in the app, click on 'Execute Job'.

Pausing/ resuming batch ingestion jobs

When pausing a Batch Feature Set, future ingestion jobs will not be scheduled (running jobs are not affected), yet you can still trigger ingestion jobs manually.
Upon resuming a batch feature set, it is re-scheduled, and ingestion jobs continue ingesting data from where they last left off.

πŸ“˜

When resuming a feature set, the ingestion jobs will continue as scheduled, meaning feature set jobs will start "catching up" on jobs that were skipped while the feature set was paused.

For example, if an hourly feature set was paused for 3 days, then after resuming it, those hourly jobs will be executed immediately one after another until the data is fully caught up.

To pause a Batch Feature Set, use the Qwak CLI:

qwak features pause user-features

Accordingly, to resume:

qwak features resume user-features