Aggregate Time Series over Time Windows using a UDF

This tutorial demonstrates how to aggregate numeric values over time windows using a user-defined function (UDF) in the Qwak Feature Store.

Before we start, we have to define the data source and the entity. In this tutorial, we will use a CsvSource with data stored in S3.

Let's create an empty Python file and copy the following code into it:

from qwak.feature_store.sources.data_sources import CsvSource
from qwak.feature_store.entities import Entity, ValueType

csv_source = CsvSource(
    name='test_time_series_data',
    description='Some time series data',
    date_created_column='created_date',
    path='s3://path'
)

entity = Entity(
    name='test_time_series_data_entity',
    keys=['id'],
    description='Time series data',
    value_type=ValueType.INTEGER
)
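The tutorial does not show the layout of the CSV file itself. The sketch below generates a small hypothetical file containing the columns the code relies on: id (the entity key) and created_date (the date column named in the CsvSource), plus a numeric value column for the UDF to aggregate. The value column name, the ISO date format and the sample rows are illustrative assumptions; a file like this would be uploaded to the S3 location given in path.

```python
import csv
import io
from datetime import date, timedelta


def make_sample_csv(num_days: int = 10) -> str:
    """Build a hypothetical CSV with one row per day.

    Columns: 'id' (entity key), 'created_date' (date column used by the
    CsvSource) and 'value' (an assumed numeric column to aggregate).
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["id", "created_date", "value"])
    start = date(2022, 4, 1)
    for i in range(num_days):
        # One row per day, with a simple increasing value
        writer.writerow([i, (start + timedelta(days=i)).isoformat(), i + 1])
    return buffer.getvalue()


sample = make_sample_csv()
print("\n".join(sample.splitlines()[:3]))
```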

After that, we define the custom function to generate the aggregation:

def calculate_sum(kdf_dict):
    from databricks.koalas.config import set_option, reset_option

    # We assign a column taken from one DataFrame (the raw data) to another one
    # (the sorted, aggregated result), which Koalas only allows with this option enabled
    set_option("compute.ops_on_diff_frames", True)
    try:
        kdf = kdf_dict['test_time_series_data']  # the DataFrame read from the data source
        dates = kdf['created_date']  # keep the original dates; they must not be summed
        # Rolling sum over 7 consecutive rows (7 days only if there is exactly one row per day)
        agg_kdf = kdf.sort_values(by=['created_date']).rolling(window=7).sum()
        agg_kdf['created_date'] = dates  # aligned by index, so each row gets its own date back
    finally:
        reset_option("compute.ops_on_diff_frames")  # restore the default setting

    # Rows whose window is still incomplete (the first 6) have NaN sums, so drop them
    agg_kdf = agg_kdf.dropna()
    # The entity needs unique ids; these rows have no business key, so reuse the index
    agg_kdf['id'] = agg_kdf.index
    return agg_kdf
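To make the window semantics concrete, here is a minimal pure-Python sketch of what the UDF computes: sort by date, take a rolling sum over 7 rows, drop the incomplete windows, and reuse the index as the id. It is an illustration only, assuming the rolling call uses its default min_periods (equal to the window size), and it works on a hypothetical list of rows with an assumed value column rather than on a Koalas DataFrame.

```python
from datetime import date, timedelta
from typing import Dict, List, Optional


def rolling_sum(values: List[float], window: int) -> List[Optional[float]]:
    """Row-based rolling sum, mirroring rolling(window=...).sum().

    A position gets None (the NaN equivalent) until `window` rows are
    available, matching the default min_periods == window behaviour.
    """
    result: List[Optional[float]] = []
    running = 0.0
    for i, value in enumerate(values):
        running += value
        if i >= window:
            running -= values[i - window]  # slide the window: drop the oldest row
        result.append(running if i >= window - 1 else None)
    return result


# Hypothetical input: one row per day with an increasing 'value' column.
rows: List[Dict] = [
    {"index": i, "created_date": date(2022, 4, 1) + timedelta(days=i), "value": float(i + 1)}
    for i in range(10)
]

# Same steps as calculate_sum: sort by date, roll over 7 rows, drop incomplete windows.
rows.sort(key=lambda r: r["created_date"])
sums = rolling_sum([r["value"] for r in rows], window=7)
aggregated = [
    # keep the original date and reuse the index label as the id
    {"id": r["index"], "created_date": r["created_date"], "value": s}
    for r, s in zip(rows, sums)
    if s is not None  # equivalent of dropna()
]

for r in aggregated:
    print(r["id"], r["created_date"], r["value"])
```

Because the window counts rows, a gap in the data (for example a missing day) makes a window span more than seven calendar days. A true seven-day window would require filling in the missing dates first or using a time-based window; pandas, for instance, accepts offset windows such as rolling('7D') on datetime data.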

Finally, we can configure the feature set:

from datetime import datetime

from qwak.feature_store.features.feature_sets import BatchFeatureSet, Metadata, Backfill
from qwak.feature_store.features.functions import UdfFunction
from qwak.feature_store.features.read_policies import ReadPolicy

batch_feature_set = BatchFeatureSet(
    name='test_time_series_data_with_aggregation',
    metadata=Metadata(
        display_name='Time series aggregation',
        description='Desc',
        owner='<owner email>'  # replace with the feature set owner's email address
    ),
    entity='test_time_series_data_entity',  # refers to the Entity defined earlier by its name
    data_sources={'test_time_series_data': ReadPolicy.FullRead},  # key is the CsvSource name
    backfill=Backfill(
        start_date=datetime(2022, 4, 1)
    ),
    scheduling_policy='*/10 * * * *',  # cron expression: every 10 minutes
    function=UdfFunction(calculate_sum)
)
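The scheduling_policy value '*/10 * * * *' has the shape of a standard five-field cron expression (minute, hour, day of month, month, day of week). Assuming Qwak interprets it with standard cron semantics, the feature set job runs every 10 minutes. The helper below is a hypothetical, deliberately minimal expander for a single cron field (it handles only '*', '*/n', 'a-b' and a single number, not lists or ranges with steps) that shows which minutes match.

```python
from typing import List


def expand_cron_field(field: str, low: int, high: int) -> List[int]:
    """Expand one cron field into the concrete values it matches.

    Simplified for illustration: supports '*', '*/n', 'a-b' and 'n' only.
    """
    if field == "*":
        return list(range(low, high + 1))
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(low, high + 1, step))
    if "-" in field:
        start, end = (int(part) for part in field.split("-"))
        return list(range(start, end + 1))
    return [int(field)]


schedule = "*/10 * * * *"
minute, hour, day_of_month, month, day_of_week = schedule.split()
print(expand_cron_field(minute, 0, 59))  # [0, 10, 20, 30, 40, 50]
print(len(expand_cron_field(hour, 0, 23)))  # 24
```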

After writing all of the feature definition code, we can register the new feature set by running the following command in the directory containing the Python file we have just created:

qwak features register -p .