Transformations
This section describes the various transformations supported by JFrog ML.
SQL
JFrog ML runs Spark SQL in the background, so make sure your queries comply with Spark SQL standards.
The following example creates a transformation using SQL:
import qwak.feature_store.feature_sets.read_policies as ReadPolicy
import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation


@batch.feature_set(
    name="user-transaction-aggregations",
    key="user_id",
    data_sources={"snowflake_datasource": ReadPolicy.TimeFrame(days=30)},
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT
            user_id,
            AVG(credit_amount) as avg_credit_amount,
            STD(credit_amount) as std_credit_amount,
            MAX(credit_amount) as max_credit_amount,
            MIN(date_created) as first_transaction,
            AVG(duration) as avg_loan_duration,
            AVG(job) as seniority_level
        FROM snowflake_datasource
        GROUP BY user_id""")
Creating Transformations
When creating transformations, keep the following guidelines in mind:
- Key inclusion: the resulting feature vector must include the feature set key used in the definition.
- Timestamp column requirement: for read policies such as NewOnly and FullRead, the timestamp column must be included in the returned feature vector (see the example below).
- Use the data source name as the table name in the FROM clause.
- Make sure the column names resulting from the SQL contain no special characters. The allowed characters are: a-z, A-Z, 0-9, _.
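For read policies that require the timestamp column, a minimal sketch might look as follows. It assumes a source named transactions_datasource with a date_created timestamp column, and the selected feature columns and the NewOnly read-policy construction are illustrative:

import qwak.feature_store.feature_sets.read_policies as ReadPolicy
import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation


@batch.feature_set(
    name="user-latest-transactions",
    key="user_id",
    data_sources={"transactions_datasource": ReadPolicy.NewOnly()},
    timestamp_column_name="date_created",
)
def user_latest_transactions():
    # Both the key (user_id) and the timestamp column (date_created)
    # are returned, as required for the NewOnly read policy.
    return SparkSqlTransformation(sql="""
        SELECT
            user_id,
            credit_amount as last_credit_amount,
            date_created
        FROM transactions_datasource""")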
Logging
We support the default Python logger, which you can import from the standard Python logging library.
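For example, a minimal sketch of using the standard logger inside a feature set definition, reusing the data source from the example above (the logger name and message are arbitrary):

import logging

import qwak.feature_store.feature_sets.read_policies as ReadPolicy
import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

logger = logging.getLogger(__name__)


@batch.feature_set(
    name="user-transaction-aggregations",
    key="user_id",
    data_sources={"snowflake_datasource": ReadPolicy.TimeFrame(days=30)},
)
def user_features():
    # Standard Python logging is available inside feature set code.
    logger.info("Building the user-transaction-aggregations transformation")
    return SparkSqlTransformation(
        sql="SELECT user_id, AVG(credit_amount) as avg_credit_amount "
            "FROM snowflake_datasource GROUP BY user_id"
    )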
PySpark
To use this feature, ensure that you have installed the qwak-sdk
with the feature-store extra.
pip install -U "qwak-sdk[feature-store]"
A PySpark transformation is defined by creating a UDF that is responsible for the transformation logic.
UDF definition:
- Arguments:
  - df_dict: Dict[str, spark.DataFrame] (mandatory) - a dictionary in the form of {'<batch_source_name>': df, ...}.
  - qwargs: Dict[str, Any] (optional) - if added, runtime parameters are injected via qwargs (e.g. qwak_ingestion_start_timestamp, qwak_ingestion_end_timestamp), as shown in the sketch below.
- Return value: spark.DataFrame
  The returned df (PySpark DataFrame) must contain a column representing the configured key. The df column names must not include whitespaces or special characters.
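For illustration, a minimal sketch of a UDF that reads the injected runtime parameters from qwargs. In practice this function would be defined inside the transform function, as described below; the source, column names, and the windowing logic are illustrative only:

from typing import Dict, Any

import pyspark.sql as spark
import pyspark.sql.functions as F


def windowed_stats(df_dict: Dict[str, spark.DataFrame], qwargs: Dict[str, Any]) -> spark.DataFrame:
    df = df_dict['snowflake_transactions_table']

    # Runtime parameters injected by JFrog ML, if available.
    start_ts = qwargs.get('qwak_ingestion_start_timestamp')
    end_ts = qwargs.get('qwak_ingestion_end_timestamp')

    if start_ts is not None and end_ts is not None:
        # Illustrative: restrict the aggregation to the current ingestion window.
        df = df.filter(F.col('date_created').between(start_ts, end_ts))

    return df.groupby('user').agg(F.sum('amount').alias('total_amount'))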
Python and Dependency Restrictions
To ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a feature set with a PySpark transformation. Additionally, ensure that the cloudpickle version is locked to 2.2.1.
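For instance, one way to keep that pin alongside the SDK installation (a sketch of a plain pip-based setup; adjust to your dependency manager):

pip install -U "qwak-sdk[feature-store]" "cloudpickle==2.2.1"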
from typing import Dict, Any

import pyspark.sql as spark
import pyspark.sql.functions as F

from qwak.feature_store.feature_sets import batch
from qwak.feature_store.feature_sets.read_policies import ReadPolicy
from qwak.feature_store.feature_sets.transformations import PySparkTransformation


@batch.feature_set(
    name="user-features",
    key="user",
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},
    timestamp_column_name="date_created"
)
@batch.scheduling(cron_expression="0 8 * * *")
def transform():
    def amount_stats(df_dict: Dict[str, spark.DataFrame], qwargs: Dict[str, Any]) -> spark.DataFrame:
        df = df_dict['snowflake_transactions_table']
        agg_df = df.groupby('user').agg(F.max('amount').alias("max_amount"))
        return agg_df

    return PySparkTransformation(function=amount_stats)
Function scope and dependencies
PySpark function scope and variables must be defined under the transform function, as shown in the code snippet above.
At runtime, only PySpark and the Python standard library are available.
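For example, a sketch of a constant and a helper function defined inside transform so that the UDF can use them at runtime. The add_flags helper, the is_large column, and the threshold are illustrative:

from typing import Dict, Any

import pyspark.sql as spark
import pyspark.sql.functions as F

from qwak.feature_store.feature_sets import batch
from qwak.feature_store.feature_sets.read_policies import ReadPolicy
from qwak.feature_store.feature_sets.transformations import PySparkTransformation


@batch.feature_set(
    name="user-features",
    key="user",
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},
    timestamp_column_name="date_created"
)
def transform():
    # Defined inside transform: captured together with the UDF.
    amount_column = 'amount'

    def add_flags(df: spark.DataFrame) -> spark.DataFrame:
        # Illustrative helper, also defined inside transform.
        return df.withColumn('is_large', F.col(amount_column) > 1000)

    def amount_stats(df_dict: Dict[str, spark.DataFrame], qwargs: Dict[str, Any]) -> spark.DataFrame:
        df = add_flags(df_dict['snowflake_transactions_table'])
        return df.groupby('user').agg(
            F.sum(F.col('is_large').cast('int')).alias('large_transaction_count')
        )

    return PySparkTransformation(function=amount_stats)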
Logging
We support the default Python logger, which you can import from the standard Python logging library.
Pandas On Spark
Pandas On Spark is a pandas implementation that runs on Spark. Please ensure your code complies with the Pandas On Spark library.
The User Defined Function (UDF) receives as input a dictionary in the form of {'<batch_source_name>': pyspark.pandas.DataFrame, ...}.
The returned pyspark.pandas.DataFrame (Pandas On Spark DataFrame) must contain a column representing the configured key and the timestamp column. The psdf must not include complex columns, such as a multi-index, and column names must not include whitespaces or special characters.
Make sure that column names returned from the UDF do not contain special characters. The allowed characters are: a-z, A-Z, 0-9, _.
Restrictions
Deployment - supported for Hybrid deployments ONLY.
Dependencies - to ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a Feature Set with a Pandas On Spark transformation.
from typing import Dict, Any

import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.read_policies import ReadPolicy
from qwak.feature_store.feature_sets.transformations import PandasOnSparkTransformation
from pyspark.pandas import DataFrame


@batch.feature_set(
    name="user-features",
    key="user",
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},
    timestamp_column_name="date_created"
)
@batch.scheduling(cron_expression="0 8 * * *")
def transform():
    def amount_stats(df_dict: Dict[str, DataFrame], qwargs: Dict[str, Any]) -> DataFrame:
        ps_df = df_dict['snowflake_transactions_table']
        agg_psdf = ps_df.groupby('user').agg({'amount': ['avg', 'sum']})
        # Flatten the aggregated multi-level columns and restore the key column,
        # since the returned DataFrame may not contain a multi-index.
        agg_psdf.columns = ['_'.join(column) for column in agg_psdf.columns.values]
        agg_psdf = agg_psdf.reset_index()
        return agg_psdf

    return PandasOnSparkTransformation(function=amount_stats)
Function scope and dependencies
Pandas On Spark function scope and variables must be defined under the transform function, as shown in the code snippet above.
Logging
We support the default Python logger, which you can import from the standard Python logging library.
Koalas (deprecated)
Koalas is a pandas implementation that runs on Spark. Ensure your code complies with the Databricks Koalas library.
The UDF receives as input a dictionary in the form of {'<batch_source_name>': kdf, ...}.
The returned kdf (Koalas DataFrame) must contain a column representing the configured key. The kdf must not include complex columns, such as a multi-index, and column names must not include whitespaces or special characters.
Make sure that column names returned from the UDF have no special characters. The allowed characters are: a-z, A-Z, 0-9, _.
Python and Dependency Restrictions
To ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a feature set with a Koalas transformation. Additionally, ensure that the cloudpickle version is locked to 2.2.1.
import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.read_policies import ReadPolicy
from qwak.feature_store.feature_sets.transformations import KoalasTransformation


@batch.feature_set(
    name="user-features",
    key="user",
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},
    timestamp_column_name="date_created"
)
@batch.scheduling(cron_expression="0 8 * * *")
def transform():
    def amount_stats(kdf_dict):
        kdf = kdf_dict['snowflake_transactions_table']
        agg_kdf = kdf.groupby('user').agg({'amount': ['avg', 'sum']})
        # Flatten the aggregated multi-level columns (using the aggregated
        # frame's columns, not the source kdf's) and restore the key column.
        agg_kdf.columns = ['_'.join(column) for column in agg_kdf.columns.values]
        agg_kdf = agg_kdf.reset_index()
        return agg_kdf

    return KoalasTransformation(function=amount_stats)
Function scope and dependencies
Koalas function scope and variables must be defined under the transform function, as shown in the code snippet above.
At runtime, only pandas, Koalas, and the Python standard library are available.
Logging
We support the default Python logger, which you can import from the standard Python logging library.
Supported Spark column types
IntegerType, LongType, StringType, DoubleType, DecimalType, FloatType, BooleanType, TimestampType, DateType, ArrayType.
An ArrayType column may include any of the above column types, except for another ArrayType column.
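For example, a minimal sketch of a PySpark UDF that returns an ArrayType column with a supported element type (StringType); the source and column names are illustrative:

from typing import Dict, Any

import pyspark.sql as spark
import pyspark.sql.functions as F


def transaction_categories(df_dict: Dict[str, spark.DataFrame], qwargs: Dict[str, Any]) -> spark.DataFrame:
    df = df_dict['snowflake_transactions_table']
    # collect_list produces an ArrayType(StringType) column, which is supported;
    # an array of arrays would not be.
    return df.groupby('user').agg(F.collect_list('category').alias('categories'))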
A batch cannot insert rows with a timestamp older than the latest timestamp already stored in the offline feature store: each batch must produce timestamps equal to or greater than those of the previous batch.