Streaming Feature Sets

A Streaming Feature Set is identical to a Batch Feature Set in terms of its use (retrieving online/offline features), but instead of reading data from a Batch Source, it reads from an infinite Stream Source - For example, Apache Kafka.

The 2 basic building blocks that define a Streaming Feature Set are its Streaming Source (e.g., Kafka) and Transformation.

See Streaming Sources section for more details regarding the available Streaming Sources.

Transformations

Row-Level transformations that are applied to the data (in a streaming fashion) - these transformation produce the actual features.

Available Flavors:

SQL Transforms

Row-Level arbitrary SQL, with support for PySpark Pandas UDFs inside SQL Transforms, leveraging Vectorized computation using PyArrow.

from qwak.feature_store.features.streaming_feature_sets import *
from qwak.feature_store.features.transform import *

sample_fs = StreamingFeatureSetV1(
    name='sample_fs',
    metadata=Metadata(
        owner="foo", description="Streaming Feature Set", display_name="sample_fs"
    ),
    data_source='sample-source',
    entity='id',
    timestamp_col_name='timestamp',
    transform=SqlTransformation(
        sql="select timestamp, id, first_name, last_name from sample_source"
    ),
)

Or, when using a Pandas UDF:

from qwak.feature_store.features.transform import *
from qwak.feature_store.functions.qwak_pandas import *
from qwak.feature_store.functions.schema import *
from qwak.feature_store.features.streaming_feature_sets import *
import pandas as pd


@qwak_pandas_udf(output_schema=Schema([
    Column(type=Type.long)]))
def plus_one(column_a: pd.Series) -> pd.Series:
    return column_a + 1


@qwak_pandas_udf(output_schema=Schema([
    Column(type=Type.long)]))
def mul_by_two(column_a: pd.Series) -> pd.Series:
    return column_a * 2


sql_fs = StreamingFeatureSetV1(
    name='some_streaming_fs',
    description='some desc',
    data_source='ds1',
    entity='customer_id',
    timestamp_col_name='event_timestamp',
    transform=SqlTransformation(
        sql="select timestamp, mul_by_two(value) as col1, plus_one(value) as col2 from ds1",
        functions=[plus_one, mul_by_two]
    )
)

Full-DataFrame Pandas UDF Transforms

Just like the Pandas UDFs supported in SQL Transforms, but defined on the entire DataFrame (no need to write any SQL):

from qwak.feature_store.features.transform import *
from qwak.feature_store.functions.qwak_pandas import *
from qwak.feature_store.functions.schema import *
from qwak.feature_store.features.streaming_feature_sets import *
import pandas as pd

@qwak_pandas_udf(output_schema=Schema([
    Column(name='rate_mul', type=Type.long),
    Column(name='timestamp', type=Type.timestamp),
]))
def func(df: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame({'rate_mul': df['value'] * 1000, 'timestamp': df['timestamp']})
    return df

udf_fs = StreamingFeatureSetV1(
    name='some_streaming_fs',
    description='some desc',
    data_source='ds1',
    entity='customer_id',
    timestamp_col_name='event_timestamp',
    transform=UdfTransformation(
        function=func
    )
)

Resource Configuration

Streaming Feature Sets support per-featueset resource definition and even different resource definitions for the online/offline.
A user can choose a Cluster Template or a manual resource configuration - these can be mixes for the offline/online.
For example:

from qwak.feature_store.features.streaming_feature_sets import *

execution_spec = StreamingExecutionSpec(online_cluster_template=ClusterTemplate.LARGE, offline_cluster_template=ClusterTemplate.XLARGE)

sample_fs = StreamingFeatureSetV1(
    name='sample_fs',
    metadata=Metadata(
        owner="foo", description="Streaming Feature Set", display_name="sample_fs"
    ),
    data_source='sample_source',
    entity='id',
    timestamp_col_name='timestamp',
    transform=SqlTransformation(
        sql="select timestamp, id, first_name, last_name from sample_source"
    ),
    execution_spec=execution_spec,
)

StreamingFeatureSetV1 Parameter Reference

nametypedescriptiondefault value
namestrname of the feature setthis parameter is mandatory
metadataMetadataMetadata object for the feature setthis parameter is mandatory
entitystrname of the entity of the featureset - this entity must pre-exist in Qwak.
All columns defined in this entity must be present in the output of the Transformation.
this parameter is mandatory
transformTransformationRow-Level transformationthis parameter is mandatory
timestamp_col_namestrname of the timestamp column used to order all events.
this column must belong to the output of the transformation and must be per-entity strictly monotonically increasing - meaning there can not be 2 rows with the same entity and the same timestamp. if such a situation occurs, there is no guarantee regarding which of the 2 rows will be selected.
this parameter is mandatory
execution_specStreamingExecutionSpecResource definition for the online/offline StreamingExecutionSpec(online_cluster_template=ClusterTemplate.MEDIUM, offline_cluster_template=ClusterTemplate.MEDIUM)
data_sourceDataSourceInput DataSourceThis parameter is mandatory
online_trigger_intervalint5
offline_scheduling_policystr (cron format)"/30 * * *"

Event-time Aggregations

In addition to row-level operations, event-time aggregations are also supported, with EXACTLY ONCE semantics.
Internally, we employ our highly-optimized proprietary implementation, partially based on open source Apache Spark ™️.

This implementation is designed to optimize for data freshness, low serving latency and high throughput, while handling multiple long and short, overlapping time windows and out-of-order data (late arrivals) without the intense resource consumption that is often incurred in these cases.

Enabling Aggregations is done by adding them on top of the row-level transform, for example:

sql = "SELECT timestamp, user_id, transaction_amount, is_remote, offset, topic, partition FROM transaction_stream"
transform = SqlTransformation(sql)\
    .aggregate(QwakAggregation.avg("transaction_amount"))\
    .aggregate(QwakAggregation.boolean_or("is_remote"))\
    .aggregate(QwakAggregation.sum("transaction_amount"))\
    .by_windows("1 minute", "1 hour", "3 hour", "1 day", "7 day")

In the above example, we declare that we are interested in the average transaction amount, sum of transaction amounts and whether there was only remote transaction, all computed on 5 time windows, from 1 minute to 7 days.

Let's break this example into pieces:

  1. row-level transform: the regular row-level transform defined for row-level streaming - this can either be a SQL transform (with or without pandas UDFs) or a full-dataframe pandas udf. all aggregations are defined on columns that belong to the output of this transform. all row-level modifications prior to aggregation (string manipulation, currency conversion, boolean conditions etc.)

atm, we also need to select 3 kafka metadata columns - these are internally used by Qwak to guarantee EXACTLY ONCE.

  1. Declarative aggregates: we add each aggregation in a chaining fashion, in the above example we had avg, sum, and boolean_or.
  2. time windows: define the time windows on which we aggregate - in that case we had 3 aggregates and 5 time windows - meaning the resulting Featureset will have 15 features.

We currently support the following aggregates:

  1. SUM - a sum of column, for example, QwakAggregation.sum("transaction_amount")
  2. COUNT - count (not distinct), a column is specified for API uniformity. for example, QwakAggregation.count("transaction_amount")
  3. AVERAGE - mean value, for example QwakAggregation.avg("transaction_amount")
  4. MIN - minimum value, for example QwakAggregation.min("transaction_amount")
  5. MAX - maximum value, for example QwakAggregation.max("transaction_amount")
  6. BOOLEAN OR - boolean or, defined over a boolean column, for example QwakAggregation.boolean_or("is_remote")
  7. BOOLEAN AND - boolean and, defined over a boolean column, for example QwakAggregation.boolean_and("is_remote")
  8. Sample Variance - QwakAggregation.sample_variance("transaction_amount")
  9. Sample STDEV - QwakAggregation.sample_stdev("transaction_amount")
  10. Population Variance - QwakAggregation.population_variance("transaction_amount")
  11. Population STDEV - QwakAggregation.population_stdev("transaction_amount")
  12. Last N - QwakAggreation.last_n("transaction_amount", n=10) - collects the last n values of a column within the window, using the timestamp column as the ordering
  13. Last Distinct N QwakAggregation.last_distinct_n("transaction_amount", n=10) - collects the last distinct n values of a column within the window, using the timestamp column as the ordering
  14. Percentile - QwakAggregation.percentile("transaction_amount", p=45) - calculates the p-percentile (for a value of p between 1 and 99, inclusive) of a column within the window. for example, to calculate a median set p=50

In addition, it's also possible to add an Alias - a prefix for the result feature name.
by default, an aggregate results in a feature named <aggregate_name>_<column_name>_<window_size>, for each window defined.
In some cases, it may be desired to a have different prefix rather than <aggregate_name>_<column_name>- in these cases, we simply specify an alias:

transform = SqlTransformation(sql)\
    .aggregate(QwakAggregation.avg("transaction_amount"))\
    .aggregate(QwakAggregation.boolean_or("is_remote").alias("had_remote_transaction"))\
    .by_windows("1 minute, 1 hour")

in the above sample, we've aliased the boolean_or aggregate, so it's now called had_remote_transactions_<window>, where <window> is the time window.
the example below will result in 4 features: avg_transaction_amount_1m, avg_transaction_amount_1h, had_remote_transactions_1m, had_remote_transactions_1h

Event-time Aggregations Backfill

For streaming aggregation featuresets,
adding backfill spec will populate historical features values from batch Datasources before deploying the actual streaming featureset

The StreamingBackfill parameters are:

  • start_datetime: Datetime to start fetching values from
  • end_datetime: Datetime to end fetching values from

🚧

end_datetime must be divisible by slice size

  • transform: An SQL transformation that select the relevant features from the batch sources,
    and it's output schema must include the raw streaming source schema
  • data_source_specs: List of existing batch data source names to fetch from
  • execution_spec: [optional] resource template for backfill step
streaming_backfill=StreamingBackfill(
        start_datetime=datetime(2020, 1, 1),
        end_datetime=datetime(2022, 9, 1),
        transform=SqlTransformation(
            "SELECT timestamp, id as key, amount as value FROM sample-batch-source"
        ),
        data_source_specs=["sample-batch-source"],
        execution_spec=ClusterTemplate.SMALL,
    ),

In the example above we are extracting from sample-batch-source (which is an existing batch datasource), the id and amount features and
rename them to match the desired feature name of the raw stream source column names
The data will be between 1/1/2020 and 1/9/2022

If it is needed to specify specific datetime filter for each batch source (i.e selecting from different sub start and end time for each source), we need to pass BackfillBatchDataSourceSpec:

📘

Filtering per data source is optional,
keep in mind that the general start and end time filter set for the backfill will be lower and upper limits for any sub specific backfill source filter

data_source_specs=[BackfillBatchDataSourceSpec(data_source_name="sample-batch-source",
                                               start_datetime=datetime(2021, 1, 1),
                                               end_datetime=datetime(2021, 3, 1))],