Streaming Data Sources

Kafka Source

from qwak.feature_store.data_sources import KafkaSource

kafka_source = KafkaSource(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)

The acceptable parameters for KafkaSource are:

| Parameter | Type | Description | Default value |
| --- | --- | --- | --- |
| bootstrap_servers | str | Comma-separated sequence of host:port entries | This parameter is mandatory |
| deserialization | Deserializer | Deserializer to use | This parameter is mandatory |
| secret_configs | Dict[str, str] | Configurations injected into the reader, where each value is resolved from a JFrog ML Secret of the same name; that is, k:v is resolved to k:get_secret(v). Before using this for credentials, see the other mechanisms (SASL/SCRAM) that are supported out of the box | {} |
| passthrough_configs | Dict[str, str] | Configurations injected into the reader without resolving to a JFrog ML Secret. DO NOT PLACE CREDENTIALS HERE! | {} |
| subscribe | str | Comma-separated list of one or more topics | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
| assign | str | JSON string where each key is a topic name and the value is an array of partition numbers to subscribe to | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
| subscribe_pattern | str | Java regex that matches the topics to read from | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
| description | str | Description of the source | Empty string |
| name | str | Name of the source. This is the View Name with which this data source will appear in the Transformation definition (see below) | This parameter is mandatory |
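
For illustration, a minimal sketch of a source that subscribes by topic pattern and injects secret-backed and passthrough configuration. The Kafka property keys, the secret name my_jaas_secret, and whether your cluster needs them are assumptions for the example, not part of the API contract:

from qwak.feature_store.data_sources import KafkaSource

# Illustrative only: subscribe_pattern is used instead of subscribe, and the
# config keys / secret name below are hypothetical placeholders.
kafka_source = KafkaSource(
    name="sample_pattern_source",
    description="Sample source subscribing by topic pattern",
    bootstrap_servers="broker.foo.bar.com:9094",
    subscribe_pattern="sample_topic_.*",   # exactly one of subscribe / assign / subscribe_pattern
    deserialization=deserializer,          # defined in the Deserialization section below
    secret_configs={"kafka.sasl.jaas.config": "my_jaas_secret"},  # value resolved from a JFrog ML Secret of that name
    passthrough_configs={"kafka.security.protocol": "SASL_SSL"},  # injected as-is; never place credentials here
)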

Deserialization

The Kafka streaming data source currently supports two types of message deserializers:

Generic Deserializer

📘

Generic Deserializer

  • Supports AVRO and JSON formats.
  • Assumes the message data is stored under the value field.
  • Compatible data types are in accordance with Spark data types.

Using JSON

When using JSON, the schema is given in the Spark-proprietary JSON schema definition (NOT JSON Schema):

{
  "type": "struct",
  "fields": [
    {
      "metadata": {},
      "name": "timestamp",
      "nullable": true,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "key",
      "nullable": true,
      "type": "integer"
    },
    {
      "metadata": {},
      "name": "value",
      "nullable": true,
      "type": "integer"
    }
  ]
}

from qwak.feature_store.data_sources import KafkaSource, MessageFormat, GenericDeserializer

deserializer = GenericDeserializer(
    message_format=MessageFormat.JSON, schema=<spark_json_schema_definition>
)

kafka_source = KafkaSource(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)

🚧

Behavior for Invalid Messages

When using GenericDeserializer for JSON, a message can be invalid when:

  • The value of the message in Kafka is not a UTF-8 string.
  • The value of the message in Kafka is a UTF-8 string but is not valid JSON.
  • The value of the message in Kafka is valid JSON but does not follow the schema in one or more of its fields.

When a message is invalid, it may cause the entire record, or only specific fields of the record, to have null values. Invalid messages always create a record in the Data Source and fail silently.
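
For example, given the schema above, the first hypothetical payload below is valid, while the second does not follow the schema (key is not an integer) and would be read with null values as described above:

{"timestamp": "2021-01-01T00:00:00Z", "key": 1, "value": 42}
{"timestamp": "2021-01-01T00:00:00Z", "key": "not-a-number", "value": 42}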

Using Avro

When using Avro, the schema is a standard Avro schema.
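
As an illustration, a minimal Avro schema covering the same three fields as the JSON example above could look like the following (the record name SampleMessage is a hypothetical placeholder):

{
  "type": "record",
  "name": "SampleMessage",
  "fields": [
    {"name": "timestamp", "type": "string"},
    {"name": "key", "type": "int"},
    {"name": "value", "type": "int"}
  ]
}

The schema string is then passed to the GenericDeserializer: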

from qwak.feature_store.data_sources import KafkaSource, MessageFormat, GenericDeserializer

deserializer = GenericDeserializer(
    message_format=MessageFormat.AVRO, schema=<avro_schema>
)

kafka_source = KafkaSource(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)

🚧

Behavior for Invalid Messages

When working with Avro, it is important to validate the input to your topic properly. When using GenericDeserializer for Avro, invalid Avro messages can cause unexpected results. Unlike the behavior for JSON, some failures are not silent and may cause the entire Feature Set ingestion to fail. Even when a failure is silent, it may produce bad data.

Custom Deserializer

📘

Custom Deserializer

Specifies how messages should be deserialized. In our case, the messages are in JSON format, contain three fields (timestamp, full_name, and address), and are stored in the value field.
A deserializer can be any arbitrary Python function that accepts a PySpark DataFrame and returns a DataFrame, under several conditions:

  1. Row-level transformations only.
  2. Must not return an output that is detached from the original DataFrame (e.g., do not use the rdd and do not create a new DataFrame); doing so will break the streaming graph.
  3. The schema of the input DataFrame is always the same, regardless of any other Kafka configuration; see the table below.

from qwak.feature_store.data_sources import KafkaSource, CustomDeserializer
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType


def deser_function(df):
    # Expected payload schema: the JSON messages contain timestamp, full_name and address
    schema = StructType(
        [
            StructField("timestamp", TimestampType()),
            StructField("full_name", StringType()),
            StructField("address", StringType()),
        ]
    )
    # The raw Kafka value column is binary: cast it to a string, parse the JSON,
    # then flatten the parsed struct into top-level columns
    deserialized = df.select(
        from_json(col("value").cast(StringType()), schema).alias("data")
    ).select(col("data.*"))

    return deserialized


deserializer = CustomDeserializer(f=deser_function)


kafka_source = KafkaSource(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)

Builtin columns when accessing a Kafka topic:

| Column Name | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |
| headers (optional) | array |
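
As an illustration, a custom deserializer is free to keep any of these built-in columns alongside the parsed payload. The sketch below (hypothetical payload fields full_name and address, function name deser_with_metadata) stays row-level and operates on the original DataFrame, as required above:

from qwak.feature_store.data_sources import CustomDeserializer
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType


def deser_with_metadata(df):
    # Hypothetical payload schema; the built-in column names below come from the table above
    payload_schema = StructType(
        [
            StructField("full_name", StringType()),
            StructField("address", StringType()),
        ]
    )
    return df.select(
        col("key").cast(StringType()).alias("message_key"),  # built-in binary key, cast for readability
        col("topic"),                                         # topic the record was read from
        col("partition"),
        col("offset"),
        col("timestamp"),                                     # Kafka record timestamp
        from_json(col("value").cast(StringType()), payload_schema).alias("data"),
    ).select("message_key", "topic", "partition", "offset", "timestamp", "data.*")


deserializer = CustomDeserializer(f=deser_with_metadata)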