Aggregate Time Series over Time Windows using a UDF
This tutorial demonstrates how to aggregate numeric values over time windows using a user-defined function (UDF) in the Qwak Feature Store.
Before we start, we need to define the data source and the entity. In this tutorial, we use a CsvSource that reads data stored in S3.
Let's create an empty Python file and copy the following code:
```python
from qwak.feature_store.sources.data_sources import CsvSource
from qwak.feature_store.entities import Entity, ValueType

csv_source = CsvSource(
    name='test_time_series_data',
    description='Some time series data',
    date_created_column='created_date',  # column holding each record's timestamp
    path='s3://path'  # replace with the S3 location of your CSV file
)

entity = Entity(
    name='test_time_series_data_entity',
    keys=['id'],
    description='Time series data',
    value_type=ValueType.INTEGER
)
```
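The CSV source expects a file with an `id` column (the entity key) and a `created_date` column (the timestamp named in `date_created_column`). The snippet below parses a hypothetical sample with pandas to make that layout concrete. The `value` column, the rows, and the `load_sample` helper are invented for illustration and are not part of the Qwak API.

```python
import io

import pandas as pd

# Hypothetical sample of the CSV file behind csv_source; the `value` column
# and all rows are invented for illustration only.
SAMPLE_CSV = """id,created_date,value
1,2022-04-01,3.5
2,2022-04-02,1.0
3,2022-04-03,2.5
"""


def load_sample() -> pd.DataFrame:
    # Parse created_date as a timestamp, mirroring date_created_column='created_date'
    return pd.read_csv(io.StringIO(SAMPLE_CSV), parse_dates=["created_date"])


if __name__ == "__main__":
    print(load_sample().dtypes)
```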
Next, we define the user-defined function that computes the aggregation: a rolling sum over the last seven records, ordered by creation date.
```python
def calculate_sum(kdf_dict):
    # We assign a column taken from a different DataFrame (the unsorted source)
    # to the aggregated frame, so Koalas must allow operations on different frames
    from databricks.koalas.config import set_option, reset_option

    set_option("compute.ops_on_diff_frames", True)
    kdf = kdf_dict['test_time_series_data']  # the data source, keyed by its name
    dates = kdf['created_date']  # keep the original dates instead of summing them
    # rolling(window=7) counts rows, not days: each sum covers 7 consecutive records
    agg_kdf = kdf.sort_values(by=['created_date']).rolling(window=7).sum()
    agg_kdf['created_date'] = dates
    reset_option("compute.ops_on_diff_frames")  # no longer needed
    agg_kdf = agg_kdf.dropna()  # drop the leading rows that lack a full 7-row window
    # The entity needs unique ids; these rows have no business key, so reuse the index
    agg_kdf['id'] = agg_kdf.index
    return agg_kdf
```
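Koalas mirrors the pandas API, so you can try the logic of `calculate_sum` locally on a small pandas DataFrame before registering anything. The sketch below is a hypothetical plain-pandas re-implementation. It assumes a numeric `value` column (not named in the original) and sums only that column rather than every column.

It also contrasts the row-count window used by the UDF (`rolling(window=7)`) with a time-based window (`rolling('7D')` on the dates). The time-based window is what a 7-day aggregation means when records can be missing. Both helper names are illustrative, not part of any SDK.

```python
import pandas as pd


def rolling_sum_sketch(df: pd.DataFrame, window: int = 7) -> pd.DataFrame:
    """Hypothetical plain-pandas sketch of calculate_sum's logic.

    Sorts by created_date, sums `value` over the last `window` rows, keeps the
    original dates, drops incomplete windows, and reuses the index as the id.
    """
    ordered = df.sort_values(by=["created_date"])
    out = pd.DataFrame({
        # Row-count window: each sum covers `window` consecutive records
        "value": ordered["value"].rolling(window=window).sum(),
        # Keep the original timestamps instead of summing them
        "created_date": ordered["created_date"],
    })
    out = out.dropna()  # the first window - 1 rows have no complete window
    return out.assign(id=out.index)  # index as a surrogate key, as in the UDF


def time_window_sum_sketch(df: pd.DataFrame, period: str = "7D") -> pd.Series:
    """Variant with a time-based window: sums the values whose created_date
    falls within the trailing `period` (e.g. seven calendar days)."""
    ordered = df.sort_values(by=["created_date"])
    return ordered.set_index("created_date")["value"].rolling(period).sum()


if __name__ == "__main__":
    daily = pd.DataFrame({
        "id": range(10),
        "created_date": pd.date_range("2022-04-01", periods=10, freq="D"),
        "value": [1.0] * 10,
    })
    gappy = daily.drop(index=3)  # one missing day (2022-04-04)
    print(rolling_sum_sketch(gappy))
    print(time_window_sum_sketch(gappy))
```

With one record per day, the two windows agree once seven days have accumulated. If a day is missing, the row-count window silently spans eight calendar days, while the `7D` window sums only the six records that fall inside it.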
Finally, we configure the batch feature set that runs the UDF on a schedule:
```python
from datetime import datetime

from qwak.feature_store.features.feature_sets import BatchFeatureSet, Metadata, Backfill
from qwak.feature_store.features.functions import UdfFunction
from qwak.feature_store.features.read_policies import ReadPolicy

batch_feature_set = BatchFeatureSet(
    name='test_time_series_data_with_aggregation',
    metadata=Metadata(
        display_name='Time series aggregation',
        description='Desc',
        owner='[email protected]'
    ),
    entity='test_time_series_data_entity',  # the entity defined earlier, referenced by name
    # The key must match the name used to look up the source in calculate_sum
    data_sources={'test_time_series_data': ReadPolicy.FullRead},
    backfill=Backfill(
        start_date=datetime(2022, 4, 1)
    ),
    scheduling_policy='*/10 * * * *',  # cron syntax: every 10 minutes
    function=UdfFunction(calculate_sum)
)
```
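The value `'*/10 * * * *'` has the shape of a standard five-field cron expression (minute, hour, day of month, month, day of week). In that syntax, `*/10` in the minute field matches minutes 0, 10, 20, 30, 40 and 50 of every hour. Assuming `scheduling_policy` follows that convention, the feature set would be recomputed every ten minutes. The hypothetical helpers below only illustrate that arithmetic; they are not part of the Qwak SDK.

```python
from datetime import datetime, timedelta


def matching_minutes(step: int) -> list[int]:
    """Minutes of the hour matched by the cron minute field '*/step'."""
    return [m for m in range(60) if m % step == 0]


def next_run(now: datetime, step: int = 10) -> datetime:
    """Next fire time strictly after `now` for the schedule '*/step * * * *'.

    Only valid when step divides 60, so that matched minutes stay evenly
    spaced across the hour boundary.
    """
    if 60 % step != 0:
        raise ValueError("this sketch only handles steps that divide 60")
    base = now.replace(second=0, microsecond=0)  # cron works at minute granularity
    return base + timedelta(minutes=step - base.minute % step)


if __name__ == "__main__":
    print(matching_minutes(10))  # [0, 10, 20, 30, 40, 50]
    print(next_run(datetime(2022, 4, 1, 12, 7, 30)))  # 2022-04-01 12:10:00
```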
After writing all of the feature definition code, register the new feature set by running `qwak features register -p .` in the directory containing the Python file we have just created.