Read Policies
What is a Read Policy?
A read policy determines which records a batch feature set reads from each of its data sources on every run. Pay close attention to the chosen read policy when defining a new data source.
This document describes the various read policies available in Qwak.
Default Read Policy
If no read policy is set, the default read policy is NewOnly.
New Only
Each batch execution reads only the records added between the previous batch execution time and the current job execution time.
A record's time is taken from the column named by timestamp_column_name, which represents the configured event time.
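```python
# Assumed import paths (recent qwak-sdk releases); they may differ in the SDK version this page targets
from qwak.feature_store.feature_sets import batch
from qwak.feature_store.feature_sets.read_policies import ReadPolicy
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation


@batch.feature_set(
    name="aggregation-by-time-v1",
    entity="user_id",
    data_sources={"snowflake": ReadPolicy.NewOnly},
    timestamp_column_name="date_created",  # --> must be included in transformation output
)
def user_features():
    # Each run reads only the rows added to the "snowflake" data source since the previous run
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake
    """)
```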
Full Read
Each batch reads all the records in the data source up until the job execution time.
Use Cases
- Taking a "snapshot" of a data source: each batch job reads all the data up to the execution time.
```python
@batch.feature_set(
    name="aggregation-by-time",
    entity="user_id",
    data_sources={"full_read_source": ReadPolicy.FullRead},
    timestamp_column_name="processing_time",  # --> must be included in transformation output using qwak timestamp
)
def transform():
    ...  # transformation omitted
```
- Joining a dimension table of user information with another data source
```python
@batch.feature_set(
    name="aggregation-by-time",
    entity="transaction",
    data_sources={
        "snowflake": ReadPolicy.NewOnly,          # transactions: new rows only
        "full_read_source": ReadPolicy.FullRead,  # user dimension table: full snapshot
    },
    timestamp_column_name="processing_time",  # --> must be included in transformation output using qwak timestamp
)
def transform():
    ...  # transformation omitted
```
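The join use case can be illustrated with plain-Python lists standing in for the two data sources. This is a hypothetical sketch, not Qwak code: the user dimension table is read with FullRead semantics (every row up to now) and the transactions with NewOnly semantics, again assuming a half-open (last_run, now] window. The column names updated_at and country are made up for the example.

```python
from datetime import datetime
from typing import Any, Dict, List

Row = Dict[str, Any]


def full_read(rows: List[Row], ts_key: str, now: datetime) -> List[Row]:
    """FullRead-style read: every row with event time up to `now`."""
    return [r for r in rows if r[ts_key] <= now]


def new_only(rows: List[Row], ts_key: str, last_run: datetime, now: datetime) -> List[Row]:
    """NewOnly-style read: rows with event time in (last_run, now]."""
    return [r for r in rows if last_run < r[ts_key] <= now]


# Dimension table: user attributes that rarely change.
users = [
    {"user_id": 1, "country": "US", "updated_at": datetime(2023, 1, 1)},
    {"user_id": 2, "country": "DE", "updated_at": datetime(2023, 6, 1)},
]
# Fact table: transactions that keep arriving.
transactions = [
    {"transaction": "t1", "user_id": 1, "date_created": datetime(2024, 1, 2)},
    {"transaction": "t2", "user_id": 2, "date_created": datetime(2024, 1, 2)},
]
last_run, now = datetime(2024, 1, 1), datetime(2024, 1, 3)

user_rows = full_read(users, "updated_at", now)                   # both users
new_txns = new_only(transactions, "date_created", last_run, now)  # t1 and t2

# Enrich each new transaction with the user's country from the snapshot.
by_user = {u["user_id"]: u for u in user_rows}
joined = [{**t, "country": by_user[t["user_id"]]["country"]} for t in new_txns]

# Reading the users with NewOnly instead would leave nothing to join against,
# because neither user row changed since the previous run.
users_if_new_only = new_only(users, "updated_at", last_run, now)
```

This is why the dimension side of such a join typically needs FullRead: its rows are old but still relevant to every new transaction.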
Past Timestamps
Batch feature sets cannot insert rows with a timestamp older than the most recent timestamp already in the offline feature store.
Each batch must produce timestamps equal to or later than those of the previous batch.
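The rule above can be modeled as a small guard in plain Python. This is a toy sketch, not Qwak's implementation; the class name StoreTimestampGuard is hypothetical, and the sketch assumes the check compares the oldest timestamp in an incoming batch with the newest timestamp already stored, allowing equal values.

```python
from datetime import datetime
from typing import List, Optional


class StoreTimestampGuard:
    """Toy offline store that rejects batches containing timestamps older than what it already holds."""

    def __init__(self) -> None:
        self.latest: Optional[datetime] = None  # newest timestamp ingested so far

    def ingest(self, batch_timestamps: List[datetime]) -> None:
        if not batch_timestamps:
            return
        oldest_in_batch = min(batch_timestamps)
        if self.latest is not None and oldest_in_batch < self.latest:
            raise ValueError(
                f"batch contains {oldest_in_batch}, older than stored {self.latest}"
            )
        # Every timestamp in the batch is >= self.latest, so the batch maximum is the new latest.
        self.latest = max(batch_timestamps)


guard = StoreTimestampGuard()
guard.ingest([datetime(2024, 1, 1), datetime(2024, 1, 2)])
guard.ingest([datetime(2024, 1, 2), datetime(2024, 1, 3)])  # equal timestamps are allowed
```

Re-ingesting a FullRead snapshot with its original event times would trip this guard on the second run, because old rows come back with old timestamps.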
To use the FullRead policy while meeting this requirement, Qwak feature set transformations support the following parameters:
- qwak_ingestion_start_timestamp
- qwak_ingestion_end_timestamp
These parameters can be used to define timestamp columns, for example:
- Using a Spark SQL transformation
```python
@batch.feature_set(
    name="aggregation-by-time",
    entity="user_id",
    data_sources={"full_read_source": ReadPolicy.FullRead},
    timestamp_column_name="processing_time",
)
def transform():
    # The ${...} parameters are filled in per job, so every row in a run
    # gets that run's ingestion end time as its processing_time
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               to_timestamp(${qwak_ingestion_start_timestamp}) as processing_time_start,
               to_timestamp(${qwak_ingestion_end_timestamp}) as processing_time
        FROM full_read_source
    """)
```
- Using a Koalas transformation
```python
@batch.feature_set(
    name="aggregation-by-time",
    entity="user_id",
    data_sources={"full_read_source": ReadPolicy.FullRead},
    timestamp_column_name="processing_time",
)
def transform():
    import databricks.koalas as ks
    from typing import Dict

    def koalas_udf(data_sources_dict: Dict[str, ks.DataFrame], qwargs) -> ks.DataFrame:
        # Boundaries of the current ingestion job
        job_end_time = qwargs['qwak_ingestion_end_timestamp']
        job_start_time = qwargs['qwak_ingestion_start_timestamp']  # available if a start column is needed
        # Data sources are keyed by the names used in data_sources above
        kdf = data_sources_dict['full_read_source']
        kdf['processing_time'] = job_end_time
        return kdf

    return KoalasTransformation(function=koalas_udf)
```
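The effect of the Spark SQL and Koalas examples above can be sketched in plain Python: every row of a FullRead snapshot is stamped with the job's ingestion end time, so successive snapshots only move forward in time even though they contain the same source rows. The stamp_snapshot helper is hypothetical and only mirrors the processing_time = qwak_ingestion_end_timestamp assignment; it is not part of the Qwak SDK.

```python
from datetime import datetime
from typing import Any, Dict, List

Row = Dict[str, Any]


def stamp_snapshot(rows: List[Row], ingestion_end: datetime) -> List[Row]:
    """Return copies of `rows` with processing_time set to the job's ingestion end time."""
    return [{**row, "processing_time": ingestion_end} for row in rows]


# The same full-read source is snapshotted on two consecutive daily runs.
source = [
    {"user_id": 1, "created": datetime(2023, 5, 1)},
    {"user_id": 2, "created": datetime(2023, 9, 1)},
]
run_1 = stamp_snapshot(source, ingestion_end=datetime(2024, 1, 2))
run_2 = stamp_snapshot(source, ingestion_end=datetime(2024, 1, 3))

# The original "created" values repeat across runs; processing_time does not.
print(min(r["processing_time"] for r in run_2) >= max(r["processing_time"] for r in run_1))  # True
```

Using the end timestamp rather than the record's own event time is what lets a full snapshot satisfy the "equal to or later than the previous batch" rule.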
Time Frame
Each batch reads the records within a fixed time frame that ends at the job execution time and extends a defined period into the past.
Example
We want to track the total amount of the transactions each user made in the past 7 days.
We can use a TimeFrame read policy to aggregate the data over a sliding window. On each run, this read policy reads only the records from the last 7 days and writes the updated aggregates to the feature store.
```python
@batch.feature_set(
    name='total-transactions-7d',
    entity="user_id",
    data_sources={"snowflake": ReadPolicy.TimeFrame(days=7)},
)
@batch.scheduling(cron_expression="@daily")
def transform():
    # Each daily run sees only the last 7 days of rows from "snowflake"
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               SUM(amount) as transactions_total_amount_7d
        FROM snowflake
        GROUP BY user_id
    """)
```
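The sliding-window behavior can be sketched in plain Python by recomputing per-user totals over the last 7 days at two different run times. This is an illustration, not Qwak's implementation; time_frame_totals is a hypothetical helper, and the window is assumed to be (now - 7 days, now].

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

Txn = Tuple[str, float, datetime]  # (user_id, amount, date_created)


def time_frame_totals(txns: List[Txn], now: datetime, days: int = 7) -> Dict[str, float]:
    """Sum transaction amounts per user over the window (now - days, now]."""
    start = now - timedelta(days=days)
    totals: Dict[str, float] = defaultdict(float)
    for user_id, amount, created in txns:
        if start < created <= now:
            totals[user_id] += amount
    return dict(totals)


txns = [
    ("u1", 10.0, datetime(2024, 1, 1)),
    ("u1", 5.0, datetime(2024, 1, 6)),
    ("u2", 7.0, datetime(2024, 1, 7)),
]
day_7 = time_frame_totals(txns, now=datetime(2024, 1, 7))  # {'u1': 15.0, 'u2': 7.0}
day_9 = time_frame_totals(txns, now=datetime(2024, 1, 9))  # {'u1': 5.0, 'u2': 7.0}
```

By the January 9 run, u1's January 1 transaction has slid out of the window, so u1's total drops even though no new data arrived.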
Read Policy Entities
Entities that appeared in a previous window but are not present in the current window will have null feature values.
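A minimal sketch of this behavior in plain Python, with None standing in for a null feature value. The function name features_for_window is hypothetical, and this only models the outcome described above, not how Qwak stores nulls.

```python
from typing import Dict, Iterable, Optional


def features_for_window(
    seen_entities: Iterable[str], current_window: Dict[str, float]
) -> Dict[str, Optional[float]]:
    """Entities seen in earlier windows but absent from the current one map to None."""
    all_entities = set(seen_entities) | set(current_window)
    return {entity: current_window.get(entity) for entity in sorted(all_entities)}


previous_window = {"u1": 15.0, "u2": 7.0}
current_window = {"u2": 7.0}  # u1 had no transactions in the current window
features = features_for_window(previous_window, current_window)
print(features)  # {'u1': None, 'u2': 7.0}
```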
Using Multiple Read Policies
Qwak feature sets support reading from multiple data sources, each with its own read policy.
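Conceptually, each data source is filtered by its own policy before the transformation runs. The plain-Python sketch below models the three policies on this page as filter functions and applies a different one per source; the function names are hypothetical stand-ins, not Qwak's ReadPolicy objects, and the half-open window boundaries are an assumption.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, List

# A policy decides whether a row with event time `ts` is read in a run at `now`,
# given the time of the previous run `last_run`.
Policy = Callable[[datetime, datetime, datetime], bool]


def new_only(ts: datetime, last_run: datetime, now: datetime) -> bool:
    return last_run < ts <= now


def full_read(ts: datetime, last_run: datetime, now: datetime) -> bool:
    return ts <= now


def time_frame(days: int) -> Policy:
    def policy(ts: datetime, last_run: datetime, now: datetime) -> bool:
        return now - timedelta(days=days) < ts <= now
    return policy


def read_sources(
    sources: Dict[str, List[datetime]],
    policies: Dict[str, Policy],
    last_run: datetime,
    now: datetime,
) -> Dict[str, List[datetime]]:
    """Apply each source's own policy to that source's rows."""
    return {
        name: [ts for ts in rows if policies[name](ts, last_run, now)]
        for name, rows in sources.items()
    }


sources = {
    "snowflake": [datetime(2024, 1, 1), datetime(2024, 1, 3)],
    "full_read_source": [datetime(2023, 6, 1), datetime(2024, 1, 3)],
}
policies = {"snowflake": new_only, "full_read_source": full_read}
rows_read = read_sources(sources, policies, last_run=datetime(2024, 1, 2), now=datetime(2024, 1, 3))
```

Here the "snowflake" source contributes only its January 3 row, while "full_read_source" contributes both rows, including the one from 2023.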