Streaming Data Sources

Kafka Source V1

The Kafka streaming data source currently supports two types of message deserializers:

Generic Deserializer

Deserializes AVRO/JSON messages based on a given schema

Custom Deserializer

Deserializes messages based on a user-defined function

kafka_source = KafkaSourceV1(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)

The acceptable parameters for KafkaSourceV1 are:

| parameter | type | description | default value |
| --- | --- | --- | --- |
| bootstrap_servers | str | Comma-separated sequence of host:port entries | This parameter is mandatory |
| deserialization | Deserializer | Deserializer to use | This parameter is mandatory |
| secret_configs | Dict[str, str] | Configurations that will be injected to the reader, where each value is resolved from a JFrog ML Secret with the same name, i.e., k:v will be resolved to k:get_secret(v). Before using this for credentials, see the other mechanisms (SASL/SCRAM) that are supported out of the box | {} |
| passthrough_configs | Dict[str, str] | Configurations that will be injected to the reader without resolving to a JFrog ML Secret. DO NOT PLACE CREDENTIALS HERE! | {} |
| subscribe | str | Comma-separated list of one or more topics | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
| assign | str | JSON string, where each key is a topic name and the value is an array of partition numbers to subscribe to | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
| subscribe_pattern | str | Java regex that matches the topics to read from | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
| description | str | Description of the source | Empty string |
| name | str | Name of the source. This is the View Name with which this data source will appear in the Transformation definition (see below) | This parameter is mandatory |
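As a sketch of the alternatives to subscribe, the following source reads specific partitions via assign and passes a non-sensitive reader option through passthrough_configs. The topic name, partition numbers, and configuration key used here are illustrative assumptions, not documented values; only the parameters themselves come from the table above.

from qwak.feature_store.sources.data_sources import *

kafka_source = KafkaSourceV1(
    name="partitioned_source",
    description="Reads two specific partitions of a topic",
    bootstrap_servers="broker.foo.bar.com:9094,broker2.foo.bar.com:9094",
    # `assign` takes a JSON string mapping topic names to partition arrays
    assign='{"sample_topic": [0, 1]}',
    # Passed to the reader as-is; never put credentials here.
    # The exact keys accepted depend on the underlying reader; this one is an assumption.
    passthrough_configs={"kafka.session.timeout.ms": "30000"},
    # a Generic or Custom deserializer, as defined in the sections below
    deserialization=deserializer,
)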

Generic Deserializer

📘

Generic Deserializer

  • Supports AVRO and JSON schemas
  • Assumes the message data is stored under the value field
  • Compatible data types follow Spark data types

For example, the following JSON schema describes a payload with timestamp, key, and value fields:
{
  "type": "struct",
  "fields": [
    {
      "metadata": {},
      "name": "timestamp",
      "nullable": True,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "key",
      "nullable": True,
      "type": "integer"
    },
    {
      "metadata": {},
      "name": "value",
      "nullable": True,
      "type": "integer"
    }
  ]
}
deserializer = GenericDeserializer(
    message_format=MessageFormatType.JSON, schema=<json_schema_as_string>
)

kafka_source = KafkaSourceV1(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)
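As a minimal sketch of how the schema above can be supplied as a string, the snippet below serializes it with json.dumps and passes it to the generic deserializer. It assumes GenericDeserializer and MessageFormatType are available from the same module as the other source classes used in this page; the variable names are illustrative.

import json

from qwak.feature_store.sources.data_sources import *

# Spark-style struct schema for the message payload stored in the "value" field
json_schema = json.dumps({
    "type": "struct",
    "fields": [
        {"metadata": {}, "name": "timestamp", "nullable": True, "type": "string"},
        {"metadata": {}, "name": "key", "nullable": True, "type": "integer"},
        {"metadata": {}, "name": "value", "nullable": True, "type": "integer"},
    ],
})

deserializer = GenericDeserializer(
    message_format=MessageFormatType.JSON, schema=json_schema
)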

Custom Deserializer

📘

Custom Deserializer

Specifies how messages should be deserialized. In this case, the messages are in JSON format, contain three fields (timestamp, full_name and address), and are stored in the value field.
When specifying a deserializer, any arbitrary Python function that accepts a PySpark DataFrame and returns a DataFrame can be used, under several conditions:

  1. Row-level transformations only
  2. Must not return a DataFrame that is detached from the original DataFrame (e.g., do not use the underlying RDD and do not create a new DataFrame), as this will break the streaming graph.
  3. The schema of the input DataFrame will always be the same, regardless of any other Kafka configuration; see the table below.
from pyspark.sql.functions import *
from qwak.feature_store.sources.data_sources import *
from pyspark.sql.types import *


def deser_function(df):
    # Schema of the JSON payload stored in the message's `value` field
    schema = StructType(
        [
            StructField("timestamp", TimestampType()),
            StructField("full_name", StringType()),
            StructField("address", StringType()),
        ]
    )
    # Cast the binary `value` column to a string, parse it as JSON,
    # and flatten the parsed struct into top-level columns
    deserialized = df.select(
        from_json(col("value").cast(StringType()), schema).alias("data")
    ).select(col("data.*"))

    return deserialized


deserializer = CustomDeserializer(f=deser_function)


kafka_source = KafkaSourceV1(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)

Built-in columns when accessing a Kafka topic:

| Column Name | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |
| headers (optional) | array |
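Since a custom deserializer only applies row-level projections to the input DataFrame, it can also keep some of these built-in columns alongside the parsed payload. The following is an illustrative sketch under the conditions listed above; the function and column aliases are assumptions for the example, not part of the documented API.

from pyspark.sql.functions import *
from pyspark.sql.types import *
from qwak.feature_store.sources.data_sources import *


def deser_with_metadata(df):
    # Parse the JSON payload and keep Kafka metadata columns that may be useful
    # downstream (topic, partition, offset, timestamp). Row-level only, so the
    # streaming graph is preserved.
    schema = StructType(
        [
            StructField("timestamp", TimestampType()),
            StructField("full_name", StringType()),
            StructField("address", StringType()),
        ]
    )
    return df.select(
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp").alias("kafka_timestamp"),
        from_json(col("value").cast(StringType()), schema).alias("data"),
    ).select("topic", "partition", "offset", "kafka_timestamp", "data.*")


deserializer = CustomDeserializer(f=deser_with_metadata)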