Read Policies

What is a Read Policy?

A Read Policy is used to determine how feature set data is ingested. It's important to pay attention to the chosen Read Policy when defining a new data source.

This document describes the various read policies available in Qwak.

๐Ÿšง

Default Read Policy

Te default read policy is newOnly, if no read policy is set.

New Only

Each batch execution reads only the newly added records, between the last batch execution time to the current job execution time.

The batch execution refers to the timestamp in the timestamp_column_name column, which represents the configured Event time .

@batch.feature_set(
    name="aggregation-by-time-v1",
    entity="transcation",
    data_sources={"snowflake": ReadPolicy.NewOnly},
    timestamp_column_name="DATE_CREATED" # --> must be included in transformation output
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake_users_table
    """)

Full Read

Each batch reads all the records in the data source up until the job execution time.

Use Cases

  1. Performing a "snapshot" of data source. Each batch job will read all the data until the execution time
    @batch.feature_set(
        name="aggregation-by-time",
        entity="user_id",
        data_sources={"full_read_source": ReadPolicy.FullRead}
        timestamp_column_name="processing_time" # --> must be included in transformation output using qwak timestamp
    )
    
  2. Joining a dimension table of user information with another data source
    @batch.feature_set(
        name="aggregation-by-time",
        entity="transcation",
        data_sources={"snowflake": ReadPolicy.NewOnly,
    									"full_read_source": ReadPolicy.FullRead}
        timestamp_column_name="processing_time" # --> must be included in transformation output using qwak timestamp
    )
    

๐Ÿšง

Past Timestamps

Batch feature sets cannot insert rows with an older timestamp than the current oldest timestamp in the offline feature store.

Each batch must produce a timestamp equal to or larger than the last batch.

In order to use FullRead policy and handle the timestamp,
Qwak feature set transformations supports the following parameters:

  1. qwak_ingestion_start_timestamp
  2. qwak_ingestion_end_timestamp

These parameters may be used to define timestamp columns as such:

  1. Using Spark SQL transformation

    @batch.feature_set(
        name="aggregation-by-time",
        entity="user_id",
        data_sources={"full_read_source": ReadPolicy.FullRead}
        timestamp_column_name="processing_time"
    )
    def transform():
        return SparkSqlTransformation(sql="""
                SELECT user_id,
        				to_timestamp(${qwak_ingestion_start_timestamp}) as processing_time_start,
        				to_timestamp(${qwak_ingestion_end_timestamp}) as processing_time,
        		FROM full_read_source""")
    
    
  2. Using Koalas transformation

    @batch.feature_set(
        name="aggregation-by-time",
        entity="user_id",
        data_sources={"full_read_source": ReadPolicy.FullRead}
        timestamp_column_name="processing_time"
    )
    def transform():
        import databricks.koalas as ks
        from typing import Dict
    
        def koalas_udf(data_sources_dict: Dict[str, ks.DataFrame], qwargs) -> ks.DataFrame:
            job_end_time = qwargs['qwak_ingestion_end_timestamp']
            job_start_time = qwargs['qwak_ingestion_start_timestamp']
            kdf = data_sources_dict['user_id']
            kdf['processing_time'] = job_end_time
            return kdf
    
        return KoalasTransformation(function=koalas_udf)
    

Time Frame

Each batch reads records within a specified time frame, starting from the job execution time until a defined period in the past.

๐Ÿ“˜

Example

We want to track the total number of transactions a user made in the past 7 days.

We can use a TimeFrame read policy to show the aggregated data over sliding window.

This read policy allows us to read only the newly added records in the last 7 days, and transfer the updated information to the feature store.

@batch.feature_set(
    name='total-transactions-7d',
    entity="transaction",
    data_sources={"snowflake": ReadPolicy.TimeFrame(days=7)},
)
@batch.scheduling(cron_expression="@daily")
def transform():
    return SparkSqlTransformation(sql="""
        SELECT transaction,
        <SUM(Amount)> as transactions_total_amount_7d
        FROM snowflake
        GROUP BY transcation 
    """)

๐Ÿ“˜

Read Policy Entities

Entities that belonged to a previous window but are not present at the current current window, will have null feature values.

Using multiple Read policies

Qwak feature sets supports fetching data from multiple sources, with different read policy for each.