Streaming Data Sources
Kafka Source
from qwak.feature_store.data_sources import KafkaSource

# `deserializer` is defined in the Deserialization section below
kafka_source = KafkaSource(name="sample_source",
                           description="Sample Source",
                           bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                           subscribe="sample_topic",
                           deserialization=deserializer)
The accepted parameters for KafkaSource are:
parameter | type | description | default value |
---|---|---|---|
bootstrap_servers | str | comma-separated sequence of host:port entries | This parameter is mandatory |
deserialization | Deserializer | Deserializer to use | This parameter is mandatory |
secret_configs | Dict[str, str] | Configurations that will be injected into the reader, where each value is resolved from a JFrog ML Secret with the same name; that is, k:v is resolved to k:get_secret(v). Before using this for credentials, see the other mechanisms (SASL/SCRAM) that are supported out of the box (see also the sketch below this table) | {} |
passthrough_configs | Dict[str, str] | Configurations that will be injected into the reader without resolving to a JFrog ML Secret. DO NOT PLACE CREDENTIALS HERE! | {} |
subscribe | str | Comma-separated list of one or more topics | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
assign | str | JSON string where each key is a topic name and the value is an array of partition numbers to subscribe to | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
subscribe_pattern | str | Java regex that matches the topics to read from | No default value; exactly one of {subscribe, assign, subscribe_pattern} must be set |
description | str | Description of the source | empty string |
name | str | Name of the source. This is the View Name with which this data source will appear in the Transformation definition (see below) | This parameter is mandatory |
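The sketch below illustrates how assign, secret_configs and passthrough_configs can be combined. It is a minimal example, not a prescribed configuration: the secret name my-jaas-secret and the specific kafka.* option keys are assumptions, and your cluster may require different options.

from qwak.feature_store.data_sources import KafkaSource

# Sketch only: "my-jaas-secret" is a hypothetical JFrog ML Secret holding a JAAS config
# string, and the kafka.* keys are standard Spark/Kafka reader options that may differ
# for your cluster. `deserializer` is defined as in the Deserialization section below.
kafka_source = KafkaSource(name="secured_source",
                           description="Reads explicit partitions using SASL/SCRAM credentials",
                           bootstrap_servers="broker.foo.bar.com:9094",
                           # exactly one of subscribe / assign / subscribe_pattern may be set;
                           # assign maps each topic to an explicit list of partition numbers
                           assign='{"sample_topic": [0, 1, 2]}',
                           # the value is resolved to get_secret("my-jaas-secret") before use
                           secret_configs={"kafka.sasl.jaas.config": "my-jaas-secret"},
                           # non-secret reader configuration - never place credentials here
                           passthrough_configs={"kafka.security.protocol": "SASL_SSL",
                                                "kafka.sasl.mechanism": "SCRAM-SHA-512"},
                           deserialization=deserializer)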
Deserialization
The Kafka streaming data source currently supports two types of message deserializers:

Generic Deserializer

- Supports AVRO and JSON formats.
- Assumes the message data is stored under the value field.
- Compatible data types are in accordance with Spark data types.
Using JSON
When using JSON, the schema is given in the Spark-proprietary JSON schema definition (NOT JSON Schema):
{
    "type": "struct",
    "fields": [
        {
            "metadata": {},
            "name": "timestamp",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "key",
            "nullable": true,
            "type": "integer"
        },
        {
            "metadata": {},
            "name": "value",
            "nullable": true,
            "type": "integer"
        }
    ]
}
from qwak.feature_store.data_sources import KafkaSource, MessageFormat, GenericDeserializer

deserializer = GenericDeserializer(
    message_format=MessageFormat.JSON, schema=<spark_json_schema_definition>
)

kafka_source = KafkaSource(name="sample_source",
                           description="Sample Source",
                           bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                           subscribe="sample_topic",
                           deserialization=deserializer)
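If the schema already exists as a PySpark StructType, one way to obtain the Spark-proprietary JSON definition is to serialize it with StructType.json(). This is a sketch under that assumption, not a required step:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Sketch: build the same schema as the JSON example above in PySpark ...
spark_schema = StructType(
    [
        StructField("timestamp", StringType(), nullable=True),
        StructField("key", IntegerType(), nullable=True),
        StructField("value", IntegerType(), nullable=True),
    ]
)

# ... and serialize it to the Spark-proprietary JSON schema definition string,
# which can then be passed as the `schema` argument of GenericDeserializer
spark_json_schema_definition = spark_schema.json()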
Behavior for Invalid Messages
When using GenericDeserializer for JSON, a message can be invalid when:

- The value of the message in Kafka is not a UTF8 string.
- The value of the message in Kafka is a UTF8 string but is not a valid JSON string.
- The value of the message in Kafka is a valid JSON string but does not follow the schema in one or more of its fields.
When a message is invalid, it might cause the entire record to have null values, or only specific fields of the record to have a null value. Invalid messages will always create a record in the Data Source and will fail silently.
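For illustration, the hypothetical raw message values below correspond to the three cases above, checked against the example schema. Whether the whole record or only individual fields end up null depends on which rule is violated; this is a sketch, not exhaustive:

# Hypothetical Kafka message values (sketch only), against the schema shown above:
b"\xc3\x28"                       # 1. not a valid UTF8 string - the whole record may be null
b"definitely not json"            # 2. UTF8, but not valid JSON - the whole record may be null
b'{"timestamp": "2023-01-01T00:00:00Z", "key": "oops", "value": 7}'
                                  # 3. valid JSON, but "key" violates the integer type - that field may be null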
Using Avro
When using Avro, the schema is a standard Avro schema:
from qwak.feature_store.data_sources import KafkaSource, MessageFormat, GenericDeserializer

deserializer = GenericDeserializer(
    message_format=MessageFormat.AVRO, schema=<avro_schema>
)

kafka_source = KafkaSource(name="sample_source",
                           description="Sample Source",
                           bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                           subscribe="sample_topic",
                           deserialization=deserializer)
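As a point of reference, a minimal Avro schema describing the same three fields as the JSON example above could look like the sketch below. The record name and exact field types are assumptions, and the schema is shown as a Python string on the assumption that it is passed as a string:

# Sketch: a hypothetical Avro schema matching the earlier timestamp/key/value example.
# Pass this string as the `schema` argument of
# GenericDeserializer(message_format=MessageFormat.AVRO, schema=avro_schema)
avro_schema = """
{
  "type": "record",
  "name": "SampleMessage",
  "fields": [
    {"name": "timestamp", "type": "string"},
    {"name": "key", "type": "int"},
    {"name": "value", "type": "int"}
  ]
}
"""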
Behavior for Invalid Messages
When working with Avro, it is important to validate the input to your topic properly. When using GenericDeserializer for Avro, invalid Avro messages can cause unexpected results. Unlike the behavior for JSON, some failures are not silent and may cause the entire Feature Set ingestion to fail. Even when a failure is silent, it may result in bad data.
Custom Deserializer

Specifies how messages should be deserialized. In our case, the messages were in JSON format, contained three fields (timestamp, full_name and address), and were stored in the value field.

When specifying a deserializer, any arbitrary Python function that accepts a PySpark DataFrame and returns a DataFrame can be specified, under several conditions:

- Row-level transformations only.
- It must not return a DataFrame that is detached from the original one (e.g., do not use the RDD, do not create a new DataFrame, etc.), as this would break the streaming graph.
- The schema of the input DataFrame will always be the same, regardless of any other Kafka configuration; see the table below.
from qwak.feature_store.data_sources import KafkaSource, CustomDeserializer
from pyspark.sql.functions import *
from pyspark.sql.types import *


def deser_function(df):
    # schema of the JSON payload stored in the Kafka `value` column
    schema = StructType(
        [
            StructField("timestamp", TimestampType()),
            StructField("full_name", StringType()),
            StructField("address", StringType()),
        ]
    )

    # parse the raw bytes as JSON and expand the struct into top-level columns
    deserialized = df.select(
        from_json(col("value").cast(StringType()), schema).alias("data")
    ).select(col("data.*"))

    return deserialized
deserializer = CustomDeserializer(f=deser_function)
kafka_source = KafkaSource(name="sample_source",
                           description="Sample Source",
                           bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                           subscribe="sample_topic",
                           deserialization=deserializer)
Built-in Columns when accessing a Kafka Topic:
Column Name | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | timestamp |
timestampType | int |
headers (optional) | array |
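Putting the two together, the sketch below shows a custom deserializer that keeps a few of these built-in columns (topic, partition, offset) next to the parsed payload. The payload field names are the hypothetical ones from the example above, and only row-level projections on the original DataFrame are used:

from qwak.feature_store.data_sources import KafkaSource, CustomDeserializer
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType


def deser_with_metadata(df):
    # schema of the JSON payload stored in the Kafka `value` column (hypothetical fields)
    payload_schema = StructType(
        [
            StructField("timestamp", TimestampType()),
            StructField("full_name", StringType()),
            StructField("address", StringType()),
        ]
    )

    # keep Kafka metadata columns alongside the parsed payload fields;
    # everything stays attached to the original DataFrame as row-level transformations
    return df.select(
        col("topic"),
        col("partition"),
        col("offset"),
        from_json(col("value").cast(StringType()), payload_schema).alias("data"),
    ).select("topic", "partition", "offset", "data.*")


deserializer = CustomDeserializer(f=deser_with_metadata)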