Streaming Ingestion Example with Kafka 2/Kafka 3
The following example is based on Kafka 3 runtime arguments and may differ for Kafka 2.
Start Kafka, Zookeeper, and Pinot
Since you already have Kafka, Pinot, and Zookeeper installed, ensure they are running.
It is recommended to create new topics for testing.
Before running any Pinot commands, set Java 11 on the CLI and export the other required configurations.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.25.0.9-2.el8.x86_64
export PATH=$JAVA_HOME/bin:$PATH
export JAVA_OPTS="-Xms1G -Xmx2G"
export LOG_ROOT=/var/log/pinot
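As an optional sanity check, confirm that the exported Java version is picked up before continuing:
java -version
The output should report a Java 11 runtime (for example, openjdk version "11.0.25").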
Likewise, create different tables and schemas for each test.
Navigate to Kafka 2 Home Directory
- Navigate to the Kafka 2 home directory using the following command.
cd /usr/odp/3.2.3.3-3/kafka
If your cluster is Kerberized, complete the following steps; otherwise, skip them and proceed to the next section.
cat conf/kafka-env.sh
- Copy the KAFKA_OPTS export shown below from kafka-env.sh and run it.
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/conf/kafka_jaas.conf"
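For reference, the kafka_jaas.conf referenced above typically contains a KafkaClient section similar to the sketch below; the keytab path, principal, and realm shown here are placeholders rather than values from this guide, and must match your own Kerberos setup.
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/kafka.service.keytab"
  principal="kafka/{hostname}@EXAMPLE.COM";
};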

In the commands below, replace the {hostname}or{IP} placeholder and the topic name with values from your environment.
Create Kafka Topic
- Create a Kafka topic named events for ingestion.
For Kafka 3, use port 6669 instead of 6667.
bin/kafka-topics.sh --create --topic events --bootstrap-server {hostname}or{IP}:6667 --partitions 1 --replication-factor 1

- Verify that the topic was created.
bin/kafka-topics.sh --list --bootstrap-server {hostname}or{IP}:6667
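Optionally, describe the topic to confirm that its partition count and replication factor match what you created:
bin/kafka-topics.sh --describe --topic events --bootstrap-server {hostname}or{IP}:6667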

Generate Sample Data
- Generate sample data using the Python script below.
Create a script (datagen.py) to generate JSON records.
import datetime
import json
import random
import uuid

# Emit one JSON record per line until the script is interrupted with Ctrl+C.
while True:
    ts = int(datetime.datetime.now().timestamp() * 1000)  # event time in epoch milliseconds
    record_id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    # flush so each record reaches the pipe (and Kafka) immediately
    print(json.dumps({"ts": ts, "uuid": record_id, "count": count}), flush=True)
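Before wiring the script to Kafka, you can preview a few generated records locally; this is only a sanity check and not part of the ingestion flow (Python may report a harmless BrokenPipeError when head closes the pipe):
python datagen.py | head -5
Each line should be a self-contained JSON object with ts, uuid, and count fields.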
- Run the script and pipe its output to Kafka.
Before running the data generation command below, create the following file.
[root@pinotmpackrheltest kafka]# cat client-sasl.properties
security.protocol=SASL_PLAINTEXT
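Depending on how SASL is configured on your brokers, the client properties file may need additional entries. The expanded version below is an illustrative sketch for a Kerberized cluster; the extra properties are assumptions, not part of this guide's tested configuration.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka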

python datagen.py | bin/kafka-console-producer.sh --topic events --bootstrap-server {hostname}or{IP}:6667 --producer.config client-sasl.properties
This command generates data continuously until you interrupt it with Ctrl+C.


Verify Kafka Messages
Consume and verify the messages.
bin/kafka-console-consumer.sh --topic events --bootstrap-server {hostname}or{IP}:6667 --consumer.config client-sasl.properties --from-beginning
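Each consumed message should be a JSON record in the shape produced by datagen.py, for example (the values below are illustrative):
{"ts": 1718000000000, "uuid": "3f2b8c1e-9a47-4c2d-8a1b-0c5d6e7f8a9b", "count": 42}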

Define Pinot Schema
Create a file /tmp/pinot/schema-stream.json.
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    { "name": "uuid", "dataType": "STRING" }
  ],
  "metricFieldSpecs": [
    { "name": "count", "dataType": "INT" }
  ],
  "dateTimeFieldSpecs": [{
    "name": "ts",
    "dataType": "TIMESTAMP",
    "format": "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}
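If you prefer to register the schema separately rather than letting AddTable upload it in a later step, the Pinot controller also accepts schema uploads over REST; this is optional and assumes the controller is reachable on port 9000:
curl -F schemaName=@/tmp/pinot/schema-stream.json http://{hostname}or{IP}:9000/schemas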
Define Pinot Table Configuration
Create a file /tmp/pinot/table-config-stream.json.
Update the following in table-config-stream.json:
- Table and schema names
- stream.kafka.broker.list, based on your broker list
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "events",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "{hostname}or{IP}:6667",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
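Since a single missing quote or comma in either JSON file will cause the next step to fail, it can help to validate both files first; this only checks JSON syntax, not Pinot semantics:
python -m json.tool /tmp/pinot/schema-stream.json
python -m json.tool /tmp/pinot/table-config-stream.json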

Create Pinot Schema and Table
Run the following command:
bin/pinot-admin.sh AddTable -schemaFile /tmp/pinot/schema-stream.json -tableConfigFile /tmp/pinot/table-config-stream.json -controllerHost {hostname}or{IP} -controllerPort 9000 -exec
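To confirm that the schema and table were registered, you can query the controller's REST API; the endpoints below are standard controller endpoints on port 9000:
curl http://{hostname}or{IP}:9000/schemas
curl http://{hostname}or{IP}:9000/tables
Both responses should include events.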
Query Ingested Data
Open the Pinot UI at http://localhost:9000/#/query (replace localhost with your controller host if needed) and run:
SELECT * FROM events LIMIT 10;
This should return the events that were ingested into Kafka and processed by Pinot.
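If you want to verify the same result from the command line instead of the UI, Pinot brokers expose an SQL endpoint; the broker host and port 8099 below are assumptions and may differ in your deployment:
curl -H "Content-Type: application/json" -X POST -d '{"sql":"SELECT * FROM events LIMIT 10"}' http://{hostname}or{IP}:8099/query/sql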
