Batch Data Sources

Batch data sources let you configure connections to data-at-rest sources.

To define a batch data source, create a configuration object that connects to the raw data source.

Supported batch data sources:

  • Snowflake
  • BigQuery
  • S3 - Parquet, CSV
  • MySQL
  • Redshift
  • Postgres
  • MongoDB
  • Vertica

Batch data sources share three common parameters:

  • name: A unique identifier of the data source, used to address it from a feature set object. May contain only letters, numbers, and underscores.
  • description: A general description of the data source.
  • date_created_column: Used to filter the data by the batch's start time/end time. This column must exist in the source and hold the timestamp of each record.

🚧

date_created_column

The values in the date_created_column must be guaranteed to be monotonically increasing.
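
This guarantee can be sanity-checked on a sample of your data before registering the source. A minimal stand-alone sketch (not part of the Qwak SDK):

```python
from datetime import datetime, timedelta

def is_monotonically_increasing(values):
    """True if every value is >= the one before it."""
    return all(a <= b for a, b in zip(values, values[1:]))

# Hypothetical sample of date_created values pulled from the source.
sample = [datetime(2023, 1, 1) + timedelta(hours=i) for i in range(5)]
assert is_monotonically_increasing(sample)
```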

Registering a new data source

When registering a batch data source, the Qwak System will try to validate it, meaning it will try to fetch a sample to verify that the system can query the data source.

Additionally, batch data sources support the following validation function:

def get_sample(self, number_of_rows: int = 10) -> DataFrame:

Usage example:

from qwak.feature_store.sources.data_sources import ParquetSource 

parquet_source = ParquetSource(
    name='parquet_source',
    description='a parquet source description',
    date_created_column='date_created',
    path="s3://bucket-name/data.parquet"
)

pandas_df = parquet_source.get_sample()

When invoking this function, the Qwak System validates the data source before returning a Pandas DataFrame. If an error occurs while fetching the sample, the system indicates at which stage it failed. For example, it can fail:

  • When connecting to the specified bucket.
  • If the date_created_column is not of the right type or doesn't exist.

Snowflake

To create a Snowflake connection, make sure you have the following before creating the connector:

  1. A Snowflake user (read-only access is sufficient)
  2. Connectivity between the Qwak environment and the Snowflake host

There are two distinct ways to use the Snowflake connector:

  1. Providing a table.
from qwak.feature_store.sources.data_sources import SnowflakeSource

snowflake_source = SnowflakeSource(
    name='snowflake_source',
    description='a snowflake source description',
    date_created_column='insert_date_column',
    host='<SnowflakeAddress/DNS:port>',
    username_secret_name='qwak_secret_snowflake_user', # use secret service
    password_secret_name='qwak_secret_snowflake_password', # use secret service
    database='db_name',
    schema='schema_name',
    warehouse='data_warehouse_name',
    table='snowflake_table'
)
  2. Providing a query.
from qwak.feature_store.sources.data_sources import SnowflakeSource

snowflake_source = SnowflakeSource(
    name='snowflake_source',
    description='a snowflake source description',
    date_created_column='insert_date_column',
    host='<SnowflakeAddress/DNS:port>',
    username_secret_name='qwak_secret_snowflake_user', # use secret service
    password_secret_name='qwak_secret_snowflake_password', # use secret service
    database='db_name',
    schema='schema_name',
    warehouse='data_warehouse_name',
    query='select feature1, feature2 from snowflake_table'
)

BigQuery

To access a BigQuery source, download the credentials.json file from GCP to your local file system.

Permissions

The following permissions must be granted to the credentials provided in the credentials.json file.

bigquery.tables.create
bigquery.tables.getData  
bigquery.tables.get
bigquery.readsessions.*  
bigquery.jobs.create

Uploading Credentials

Once you've downloaded credentials.json, encode it with base64 and set it as a Qwak secret using the Qwak Secret Service.

import json
import base64
from qwak.secret_service.client import SecretServiceGrpcClient

with open('/path/of/credentials/credentials.json', 'r') as f:
    creds = json.load(f)

creds64 = base64.b64encode(json.dumps(creds).encode('utf-8')).decode('utf-8')

secrets_service = SecretServiceGrpcClient()
secrets_service.set_secret(name='qwak_secret_big_query_creds', value=creds64)
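
Since base64 encoding is lossless, you can verify locally that the secret will decode back to the original JSON before uploading it. A stand-alone sketch using a dummy dict in place of real GCP credentials:

```python
import json
import base64

# Dummy stand-in for the real credentials.json contents.
creds = {"type": "service_account", "project_id": "project_id"}

creds64 = base64.b64encode(json.dumps(creds).encode('utf-8')).decode('utf-8')
decoded = json.loads(base64.b64decode(creds64).decode('utf-8'))
assert decoded == creds  # the round trip is lossless
```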

Connecting to BigQuery

There are two distinct ways to use the BigQuery connector:

  1. Providing a dataset and table.
from qwak.feature_store.sources.data_sources import BigquerySource

some_bigquery_source = BigquerySource(
    name='big_query_source',
    description='a bigquery source description',
    date_created_column='date_created',
    credentials_secret_name='qwak_secret_big_query_creds',
    dataset='dataset_name',
    table='table_name',
    project='project_id',
    materialization_project='materialization_project_name',
    parent_project='parent_project',
    views_enabled=False
)
  2. Providing sql.
from qwak.feature_store.sources.data_sources import BigquerySource

big_query_source = BigquerySource(
    name='big_query',
    description='a big query source description',
    date_created_column='date_created',
    credentials_secret_name='bigquerycred',
    project='project_id',
    sql="""SELECT l.id as id, 
          SUM(l.feature1) as feature1, 
          SUM(r.feature2) as feature2,
          MAX(l.date_created) as date_created
          FROM `project_id.dataset.left` AS l
          JOIN `project_id.dataset.right` as r
          ON r.id = l.id 
          GROUP BY id""",
    parent_project='',
    views_enabled=False
)

MongoDB

from qwak.feature_store.sources.data_sources import MongoSource 

mongo_source = MongoSource(
    name='mongo_source',
    description='a mongo source description',
    date_created_column='insert_date_column',
    hosts='<MongoAddress/DNS:Port>',
    username_secret_name='qwak_secret_mongodb_user', #uses the Qwak Secret Service
    password_secret_name='qwak_secret_mongodb_pass', #uses the Qwak Secret Service
    database='db_name',
    collection='collection_name',
    connection_params='authSource=admin'
)

S3 - Filesystem Configuration

AWS S3 file system data sources support explicit credentials for a custom bucket (by default, the Qwak bucket is used).
To access data in a different S3 bucket, use this optional configuration.
After creating the relevant secrets with the Qwak CLI, you can use:

from qwak.feature_store.sources.data_sources import ParquetSource, AwsS3FileSystemConfiguration

parquet_source = ParquetSource(
    name='my_source',
    description='some s3 data source',
    date_created_column='DATE_CREATED',
  	"""
    A path can be:
    	1. Specific parquet file
      2. A directory containing parquet files
      3. A directory with subdirectories containing parquet files
      	(in which case, the path should be 's3://mybucket/dir*')
    """
    path='s3://mybucket/dir',
    filesystem_configuration=AwsS3FileSystemConfiguration(
        access_key_secret_name='mybucket_access_key',
        secret_key_secret_name='mybucket_secret_key',
        bucket='mybucket'
    )
)

Redshift

To connect to a Redshift source, you need to grant access using either an AWS access key & secret key or an IAM role.

from qwak.feature_store.sources.data_sources import RedshiftSource

redshift_source = RedshiftSource(
    name='my_source',
    date_created_column='DATE_CREATED',
    description='Some Redshift Source',
    url="company-redshift-cluster.xyz.us-east-1.redshift.amazonaws.com:5439/DBName",
    db_table='my_table',
    iam_role_arn='arn:aws:iam::123456789:role/assumed_role_redshift',
    db_user='dbuser_name',
)

S3 - Reading a CSV File

Reading a CSV file from S3 works like reading a Parquet file: either specify the AWS access keys as environment variables or access a public object.

from qwak.feature_store.sources.data_sources import CsvSource

csv_source = CsvSource(
    name='csv_source',
    description='a csv source description',
    date_created_column='date_created',
    path="s3://bucket-name/data.csv",
    quote_character='"',
    escape_character='"'
)
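
The quote_character and escape_character parameters determine how delimiters and quotes embedded in field values are read. To illustrate the default `"` behavior, here is a stand-alone sketch with Python's built-in csv module (not the Qwak reader itself):

```python
import csv
import io

# One row whose second field contains both a comma and an embedded quote,
# escaped by doubling it inside the quoted field.
raw = '1,"O""Brien, Conan",2020-01-01\n'
row = next(csv.reader(io.StringIO(raw), quotechar='"'))
# The quoted field survives as a single value: ['1', 'O"Brien, Conan', '2020-01-01']
```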

MySQL

from qwak.feature_store.sources.data_sources import MysqlSource

mysql_source = MysqlSource(
    name='mysql_source',
    description='a mysql source description',
    date_created_column='date_created',
    username_secret_name='qwak_secret_mysql_user', # uses the Qwak Secret Service
    password_secret_name='qwak_secret_mysql_pass', # uses the Qwak Secret Service
    url='<MysqlAddress/DNS:Port>',
    db_table='db.table_name',  # e.g. database1.table1
    # query='base query when fetching data from mysql'  # provide either db_table or query, not both
)

Postgres

from qwak.feature_store.sources.data_sources import PostgresqlSource

postgres_source = PostgresqlSource(
    name='postgresql_source',
    description='a postgres source description',
    date_created_column='date_created',
    username_secret_name='qwak_secret_postgres_user', # uses the Qwak Secret Service
    password_secret_name='qwak_secret_postgres_pass', # uses the Qwak Secret Service
    url='<PostgresqlAddress/DNS:Port/DBName>',
    db_table='schema.table_name',  # default schema: public
    # query='base query when fetching data from postgres'  # provide either db_table or query, not both
)

Vertica

from qwak.feature_store.sources.data_sources import VerticaSource

vertica_source = VerticaSource(
    name='vertica_source',
    description='a vertica source description',
    date_created_column='date_created',
    username_secret_name='qwak_secret_vertica_user', # uses the Qwak Secret Service
    password_secret_name='qwak_secret_vertica_pass', # uses the Qwak Secret Service
    host='<VerticaHost>',  # without a :port suffix
    port=5444,
    database='MyVerticaDatabase',
    schema='MyVerticaSchema',  # e.g. public
    table='table_name'
)

AWS Athena

The Athena source connects Qwak to Amazon Athena, allowing users to query and ingest data seamlessly.

from qwak.feature_store.sources.data_sources import AthenaSource
from qwak.feature_store.sources.source_authentication import AwsAssumeRoleAuthentication
from qwak.feature_store.sources.time_partition_columns import DatePartitionColumns

athena_source = AthenaSource(
    name='my_athena_source',
    description='my Athena source description',
    date_created_column='date_created',
    aws_region='us-east-1',
    s3_output_location='s3://some-athena-queries-bucket/',
    workgroup='some-workgroup',
    query='SELECT * FROM "db"."table"',
    aws_authentication=AwsAssumeRoleAuthentication(role_arn='some_role_arn'),
    time_partition_columns=DatePartitionColumns(date_column_name='date_pt', date_format='%Y%m%d'),
)

📘

The IAM Role configuration

Configure the following IAM policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "LakeFormationDataAccess",
            "Effect": "Allow",
            "Action": [
                "lakeformation:GetDataAccess"
            ],
            "Resource": "*"
        },
        {
            "Sid": "QueryResultsBucketAccess",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:PutObject",
                "s3:PutBucketPublicAccessBlock"
            ],
            "Resource": [
                "arn:aws:s3:::<s3_output_location>"
            ]
        },
        {
            "Sid": "AllowAthenaCreateStatement",
            "Effect": "Allow",
            "Action": "athena:CreatePreparedStatement",
            "Resource": "*"
        }
    ]
}

Add the following trust policy to the Qwak IAM Role

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<ACCOUNT-ID>:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "ArnLike": {
                    "aws:PrincipalArn": "arn:aws:iam::<ACCOUNT-ID>:role/qwak-eks-base*"
                }
            }
        }
    ]
}

🚧

Defining date partition columns is optional

time_partition_columns: TimePartitionColumns

  • Description: Defines the date partition columns correlated with date_created_column.
  • Optional: Yes (highly recommended)
  • Options:
    • DatePartitionColumns
      • Fields:
        • date_column_name: str: Mandatory
        • date_format: str: Mandatory
      • Example:
        from qwak.feature_store.sources.time_partition_columns import DatePartitionColumns
        time_partition_columns = DatePartitionColumns(date_column_name='date_pt', date_format='%Y%m%d')
        
    • TimeFragmentedPartitionColumns
      • Fields:
        • year_partition_column: YearFragmentColumn: Mandatory
        • month_partition_column: MonthFragmentColumn: Optional (Must be set if day_partition_column is set)
        • day_partition_column: DayFragmentColumn: Optional
      • Examples:
        • For year=2022/month=01/day=05:
          from qwak.feature_store.sources.time_partition_columns import (
              ColumnRepresentation,
              TimeFragmentedPartitionColumns,
              YearFragmentColumn,
              MonthFragmentColumn,
              DayFragmentColumn,
          )
          time_partition_columns = TimeFragmentedPartitionColumns(
              YearFragmentColumn("year", ColumnRepresentation.NumericColumnRepresentation),
              MonthFragmentColumn("month", ColumnRepresentation.NumericColumnRepresentation),
              DayFragmentColumn("day", ColumnRepresentation.NumericColumnRepresentation),
          )
          
        • For year=2022/month=January/day=5:
          from qwak.feature_store.sources.time_partition_columns import (
              ColumnRepresentation,
              DayFragmentColumn,
              MonthFragmentColumn,
              TimeFragmentedPartitionColumns,
              YearFragmentColumn,
          )
          time_partition_columns = TimeFragmentedPartitionColumns(
              YearFragmentColumn("year", ColumnRepresentation.NumericColumnRepresentation),
              MonthFragmentColumn("month", ColumnRepresentation.TextualColumnRepresentation),
              DayFragmentColumn("day", ColumnRepresentation.NumericColumnRepresentation),
          )
          

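To see how a date_format maps a record's timestamp onto a partition value, here is a stand-alone sketch with Python's datetime (using the '%Y%m%d' format from the DatePartitionColumns example above):

```python
from datetime import datetime

record_time = datetime(2022, 1, 5, 13, 30)
partition_value = record_time.strftime('%Y%m%d')
assert partition_value == '20220105'  # the date partition this record falls into
```
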
The default timestamp format for date_created_column is yyyy-MM-dd'T'HH:mm:ss, optionally followed by fractional seconds and a timezone offset ([.SSS][XXX]). For example: 2020-01-01T00:00:00.
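
In Python's strptime notation, the default format corresponds to '%Y-%m-%dT%H:%M:%S', with '%f' covering the optional fractional seconds. A stand-alone sketch:

```python
from datetime import datetime

# Base form: yyyy-MM-dd'T'HH:mm:ss
ts = datetime.strptime('2020-01-01T00:00:00', '%Y-%m-%dT%H:%M:%S')

# With optional milliseconds (.SSS); %f accepts 1-6 fractional digits
ts_ms = datetime.strptime('2020-01-01T00:00:00.123', '%Y-%m-%dT%H:%M:%S.%f')
```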