Streaming Data Sources

Kafka Source V1

The Kafka streaming data source currently supports two types of message deserializers:

Generic Deserializer

Deserializes AVRO/JSON messages based on a given schema

Custom Deserializer

Deserializes messages based on a user-defined function

kafka_source = KafkaSourceV1(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)

The acceptable parameters for KafkaSourceV1 are:

| parameter | type | description | default value |
| --- | --- | --- | --- |
| bootstrap_servers | str | Comma-separated sequence of host:port entries | This parameter is mandatory |
| deserialization | Deserializer | Deserializer to use | This parameter is mandatory |
| secret_configs | Dict[str, str] | Configurations that will be injected into the reader, where each value is resolved from a JFrog ML Secret with the same name, i.e., k:v is resolved to k:get_secret(v). Before using this for credentials, see the other mechanisms (SASL/SCRAM) that are supported out-of-the-box. | {} |
| passthrough_configs | Dict[str, str] | Configurations that will be injected into the reader without resolving them against JFrog ML Secrets. DO NOT PLACE CREDENTIALS HERE! | {} |
| subscribe | str | Comma-separated list of one or more topics | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
| assign | str | JSON string where each key is a topic name and the value is an array of partition numbers to subscribe to | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
| subscribe_pattern | str | Java regex that matches the topics to read from | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
| description | str | Description of the source | Empty string |
| name | str | Name of the source. This is the View Name with which this data source will appear in the Transformation definition (see below). | This parameter is mandatory |
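
As an illustrative sketch (the broker address, topic, partition numbers, configuration keys and secret name below are placeholders, not values from a real setup), a source that uses assign instead of subscribe and injects reader configurations through secret_configs and passthrough_configs might look like this:

kafka_source = KafkaSourceV1(
    name="assigned_source",
    description="Source with explicit partition assignment",
    bootstrap_servers="broker.foo.bar.com:9094",
    # exactly one of subscribe / assign / subscribe_pattern may be set
    assign='{"sample_topic": [0, 1, 2]}',
    deserialization=deserializer,
    # the value "my_kafka_secret" is resolved via get_secret("my_kafka_secret")
    secret_configs={"some.reader.option": "my_kafka_secret"},
    # injected as-is, without secret resolution -- never place credentials here
    passthrough_configs={"some.other.option": "some_value"},
)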

Generic Deserializer

  • Supports AVRO and JSON schemas
  • Assumes the message data is stored under the value field
  • Compatible data types are in accordance with Spark data types
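
For example, a Spark-style schema (passed as a JSON string) for messages whose payload contains timestamp, key and value fields: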
{
  "type": "struct",
  "fields": [
    {
      "metadata": {},
      "name": "timestamp",
      "nullable": True,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "key",
      "nullable": True,
      "type": "integer"
    },
    {
      "metadata": {},
      "name": "value",
      "nullable": True,
      "type": "integer"
    }
  ]
}
deserializer = GenericDeserializer(
    message_format=MessageFormatType.JSON, schema=<json_schema_as_string>
)

kafka_source = KafkaSourceV1(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)
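
For AVRO messages the construction is analogous, with an AVRO record schema string in place of the Spark JSON schema. The sketch below is illustrative only and assumes the message-format enum exposes an AVRO member (written here as MessageFormatType.AVRO):

# Illustrative sketch -- assumes MessageFormatType.AVRO exists in the SDK
avro_schema = """
{
  "type": "record",
  "name": "SampleRecord",
  "fields": [
    {"name": "timestamp", "type": "string"},
    {"name": "key", "type": "int"},
    {"name": "value", "type": "int"}
  ]
}
"""

deserializer = GenericDeserializer(
    message_format=MessageFormatType.AVRO, schema=avro_schema
)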

Custom Deserializer

Specifies how messages should be deserialized. In this example the messages are in JSON format, contain three fields (timestamp, full_name and address), and are stored in the value field.
A deserializer can be any arbitrary Python function that accepts a PySpark DataFrame and returns a DataFrame, subject to several conditions:

  1. Row-level transformations only.
  2. Must not return an output that is detached from the original DataFrame (e.g., do not use the underlying RDD and do not create a new DataFrame), as this would break the streaming graph.
  3. The schema of the input DataFrame is always the same, regardless of any other Kafka configuration; see the table of builtin columns below.
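A deserializer that satisfies these conditions, parsing the JSON payload held in the value column into the three fields described above, is shown here: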
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType, TimestampType
from qwak.feature_store.sources.data_sources import *


def deser_function(df):
    # Schema of the JSON payload carried in the Kafka `value` column
    schema = StructType(
        [
            StructField("timestamp", TimestampType()),
            StructField("full_name", StringType()),
            StructField("address", StringType()),
        ]
    )
    # Row-level transformation: cast the binary `value` column to a string,
    # parse it as JSON and flatten the resulting struct into top-level columns
    deserialized = df.select(
        from_json(col("value").cast(StringType()), schema).alias("data")
    ).select(col("data.*"))

    return deserialized


deserializer = CustomDeserializer(f=deser_function)


kafka_source = KafkaSourceV1(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)

Builtin Columns when accessing a Kafka Topic:

| Column Name | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |
| headers (optional) | array |
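
Since the DataFrame handed to a custom deserializer always exposes these builtin columns, a deserializer may also reference them directly. Below is a minimal sketch (assuming the same JSON payload as in the custom deserializer example above) that keeps the Kafka-provided timestamp alongside the parsed payload fields:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType


def deser_with_kafka_timestamp(df):
    # Payload fields, excluding the timestamp carried by Kafka itself
    payload_schema = StructType(
        [
            StructField("full_name", StringType()),
            StructField("address", StringType()),
        ]
    )
    # Keep the builtin `timestamp` column (aliased) next to the parsed payload
    return df.select(
        col("timestamp").alias("kafka_timestamp"),
        from_json(col("value").cast(StringType()), payload_schema).alias("data"),
    ).select("kafka_timestamp", "data.*")


deserializer = CustomDeserializer(f=deser_with_kafka_timestamp)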