Streaming Data Sources
Kafka Source V1:
The Kafka streaming data source currently supports two types of message deserializers:
* Generic Deserializer: deserializes AVRO/JSON messages based on a given schema
* Custom Deserializer: deserializes messages based on a user-defined function
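from qwak.feature_store.sources.data_sources import *
# `deserializer` is a GenericDeserializer or CustomDeserializer instance (see below)
kafka_source = KafkaSourceV1(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)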
the acceptable parameters for KafkaSourceV1 are:
Parameter | Type | Description | Default value |
---|---|---|---|
bootstrap_servers | str | Comma-separated sequence of host:port entries | Mandatory |
deserialization | Deserializer | Deserializer to use | Mandatory |
secret_configs | Dict[str, str] | Configurations injected into the reader, where each value is resolved from the Qwak Secret of that name; that is, k:v is resolved to k:get_secret(v). Before using this for credentials, see the other mechanisms (SASL/SCRAM) that are supported out of the box | {} |
passthrough_configs | Dict[str, str] | Configurations injected into the reader as-is, without resolving them from Qwak Secrets. DO NOT PLACE CREDENTIALS HERE! | {} |
subscribe | str | Comma-separated list of one or more topics | No default; exactly one of {subscribe, assign, subscribe_pattern} must be set |
assign | str | JSON string in which each key is a topic name and each value is an array of partition numbers to subscribe to | No default; exactly one of {subscribe, assign, subscribe_pattern} must be set |
subscribe_pattern | str | Java regex matching the topics to read from | No default; exactly one of {subscribe, assign, subscribe_pattern} must be set |
description | str | Description of the source | Empty string |
name | str | Name of the source. This is the view name under which the data source appears in the transformation definition (see below) | Mandatory |
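The table implies three rules:

* Each secret_configs entry k:v becomes k:get_secret(v).
* passthrough_configs entries are used unchanged.
* Exactly one of subscribe, assign and subscribe_pattern may be set.

The following minimal sketch restates these rules in plain Python. It also shows one way to build the assign JSON string. Everything in it is hypothetical and for illustration only: get_secret is a stand-in for the Qwak Secret lookup, and build_reader_options, topic_selection and assign_json are not part of the Qwak SDK. The config keys used in the example are made up.

```python
import json
from typing import Dict, List, Optional


def get_secret(name: str) -> str:
    """Hypothetical stand-in for a Qwak Secret lookup (illustration only)."""
    secrets = {"sasl-password": "s3cr3t"}
    return secrets[name]


def build_reader_options(
    secret_configs: Dict[str, str],
    passthrough_configs: Dict[str, str],
) -> Dict[str, str]:
    """Merge both dicts: passthrough values are used as-is, secret values are resolved."""
    options = dict(passthrough_configs)
    # k:v becomes k:get_secret(v); in this sketch a resolved secret
    # overrides a passthrough entry with the same key
    options.update({k: get_secret(v) for k, v in secret_configs.items()})
    return options


def topic_selection(
    subscribe: Optional[str] = None,
    assign: Optional[str] = None,
    subscribe_pattern: Optional[str] = None,
) -> str:
    """Return the name of the single topic-selection parameter that is set."""
    given = {
        "subscribe": subscribe,
        "assign": assign,
        "subscribe_pattern": subscribe_pattern,
    }
    chosen = [name for name, value in given.items() if value]
    if len(chosen) != 1:
        raise ValueError(
            "exactly one of subscribe, assign, subscribe_pattern must be set, got: %s"
            % chosen
        )
    return chosen[0]


def assign_json(partitions: Dict[str, List[int]]) -> str:
    """Build an assign string: topic name -> array of partition numbers."""
    return json.dumps({topic: sorted(parts) for topic, parts in partitions.items()})
```

For example, assign_json({"sample_topic": [1, 0]}) returns the string {"sample_topic": [0, 1]}. The table does not say which value wins when the same key appears in both config dicts, so the override order chosen here is an assumption.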
Generic Deserializer
- Supports AVRO and JSON schemas
- Assumes the message data is stored under the value field
- Compatible data types follow the Spark data types
Example schema (a Spark struct type in JSON format):
{
  "type": "struct",
  "fields": [
    {
      "metadata": {},
      "name": "timestamp",
      "nullable": true,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "key",
      "nullable": true,
      "type": "integer"
    },
    {
      "metadata": {},
      "name": "value",
      "nullable": true,
      "type": "integer"
    }
  ]
}
deserializer = GenericDeserializer(
    message_format=MessageFormatType.JSON,
    schema=json_schema,  # the schema above, passed as a JSON string
)
kafka_source = KafkaSourceV1(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)
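The generic deserializer receives its schema as a string. One way to produce that string is sketched below using only the standard library:

1. Write the example schema as a Python dict.
2. Serialize it with json.dumps, which also converts Python's True into JSON's true.

The variable names are illustrative.

```python
import json

# The example schema, written as a Python dict (Python True -> JSON true)
schema_dict = {
    "type": "struct",
    "fields": [
        {"metadata": {}, "name": "timestamp", "nullable": True, "type": "string"},
        {"metadata": {}, "name": "key", "nullable": True, "type": "integer"},
        {"metadata": {}, "name": "value", "nullable": True, "type": "integer"},
    ],
}

# Serialize to the JSON string passed as the schema argument
json_schema = json.dumps(schema_dict)
print(json_schema)
```

If PySpark is available, calling StructType.json() on an equivalent StructType produces a string in the same struct/fields format.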
Custom Deserializer
A custom deserializer specifies how messages are deserialized. In the example below, the messages are in JSON format, contain three fields (timestamp, full_name and address), and are stored in the value field.
Any Python function that accepts a PySpark DataFrame and returns a DataFrame can be used as a deserializer, subject to the following conditions:
- Only row-level transformations are allowed.
- The function must not return a result that is detached from the original DataFrame (e.g., do not use the RDD, do not create a new DataFrame, etc.), as this breaks the streaming graph.
- The schema of the input DataFrame is always the same, regardless of any other Kafka configuration; see the table below.
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType, TimestampType
from qwak.feature_store.sources.data_sources import *

def deser_function(df):
    # Schema of the JSON payload stored in the Kafka value column
    schema = StructType(
        [
            StructField("timestamp", TimestampType()),
            StructField("full_name", StringType()),
            StructField("address", StringType()),
        ]
    )
    # Cast the binary value column to a string, parse it as JSON,
    # and flatten the resulting struct into top-level columns
    deserialized = df.select(
        from_json(col("value").cast(StringType()), schema).alias("data")
    ).select(col("data.*"))
    return deserialized

deserializer = CustomDeserializer(f=deser_function)
kafka_source = KafkaSourceV1(name="sample_source",
                             description="Sample Source",
                             bootstrap_servers="broker.foo.bar.com:9094, broker2.foo.bar.com:9094",
                             subscribe="sample_topic",
                             deserialization=deserializer)
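It can help to see what deser_function does to a single record, which is also why it counts as a row-level transformation. The following is a plain-Python analogue, not Spark code. It decodes each binary value as UTF-8 text, parses the JSON, and keeps only the three example fields. Missing or null fields become None. The names FIELDS, deser_record and deser_rows are hypothetical. Using datetime.fromisoformat for the timestamp is an assumption about the timestamp format; Spark's own timestamp parsing differs in detail.

```python
import json
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

# The three example fields and a plain-Python parser for each
FIELDS: Dict[str, Callable[[str], Any]] = {
    "timestamp": datetime.fromisoformat,
    "full_name": str,
    "address": str,
}


def deser_record(value: Optional[bytes]) -> Dict[str, Any]:
    """Turn one binary Kafka value into the three example fields."""
    if value is None:
        return {name: None for name in FIELDS}
    # Analogue of cast(StringType()) followed by from_json
    data = json.loads(value.decode("utf-8"))
    out: Dict[str, Any] = {}
    for name, parse in FIELDS.items():
        raw = data.get(name)
        # Fields absent from the message (or null) become None
        out[name] = parse(raw) if raw is not None else None
    return out


def deser_rows(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Row-level: each output row depends only on its own input row
    return [deser_record(row["value"]) for row in rows]
```

For example, a row whose value is b'{"timestamp": "2023-01-01T12:00:00", "full_name": "Jane Doe", "address": "1 Main St"}' yields a dict with full_name "Jane Doe" and timestamp datetime(2023, 1, 1, 12, 0). Extra JSON fields are ignored, because only the schema's fields are projected.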
Built-in columns when accessing a Kafka topic:
Column Name | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | timestamp |
timestampType | int |
headers (optional) | array |
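When unit-testing deserialization logic outside Spark, it can help to model one input row with the columns listed in the table above. The following sketch does this with a dataclass. The names KafkaRow, TIMESTAMP_TYPES and timestamp_type_name are hypothetical. The (key, value) pair shape used for headers is also an assumption; the table only says the column is an optional array.

The integer-to-name mapping for timestampType follows Kafka's record timestamp types:

* -1 is NoTimestampType.
* 0 is CreateTime.
* 1 is LogAppendTime.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple

# Kafka record timestamp types, keyed by the integer in timestampType
TIMESTAMP_TYPES = {-1: "NoTimestampType", 0: "CreateTime", 1: "LogAppendTime"}


@dataclass
class KafkaRow:
    """Plain-Python mirror of the built-in columns of a Kafka input row."""

    key: Optional[bytes]
    value: Optional[bytes]
    topic: str
    partition: int
    offset: int
    timestamp: datetime
    timestampType: int
    # Optional column; each header is modeled here as a (key, value) pair
    headers: Optional[List[Tuple[str, Optional[bytes]]]] = None

    def timestamp_type_name(self) -> str:
        return TIMESTAMP_TYPES.get(self.timestampType, "unknown")
```

For example, KafkaRow(key=None, value=b"{}", topic="sample_topic", partition=0, offset=42, timestamp=datetime(2023, 1, 1), timestampType=0).timestamp_type_name() returns "CreateTime".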