The following example is based on Kafka 3 runtime arguments and may differ for Kafka 2.
Start Kafka, Zookeeper, and Pinot
Since you already have Kafka, Pinot, and Zookeeper installed, ensure they are running.
It is recommended to create a new topic for each service when testing.
Before running any Pinot commands, set Java 11 on the CLI and export the other required configurations.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.25.0.9-2.el8.x86_64
export PATH=$JAVA_HOME/bin:$PATH
export JAVA_OPTS="-Xms1G -Xmx2G"
export LOG_ROOT=/var/log/pinot
Create different tables and schemas.
Navigate to Kafka 2 Home Directory
- Navigate to the Kafka 2 home directory using the following command.
cd /usr/odp/3.2.3.3-3/kafka
If your cluster is Kerberized, complete the following steps. Otherwise, skip them and proceed to the next step.
cat conf/kafka-env.sh
- Copy the following command from kafka-env.sh and run it.
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/conf/kafka_jaas.conf"
It is recommended to update the hostname and topic name.
Create Kafka Topic
- Create a Kafka topic named events for ingestion.
For Kafka 3, use port 6669.
bin/kafka-topics.sh --create --topic events --bootstrap-server {hostname}or{IP}:6667 --partitions 1 --replication-factor 1
- Verify that the topic was created.
bin/kafka-topics.sh --list --bootstrap-server {hostname}or{IP}:6667
Generate Sample Data
- Generate sample data using the Python script below.
Create a script (datagen.py) to generate JSON records.
import datetime
import uuid
import random
import json

while True:
    ts = int(datetime.datetime.now().timestamp() * 1000)
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(json.dumps({"ts": ts, "uuid": id, "count": count}))
- Run the script and pipe its output to Kafka.
Before running the data generation command below, create the following file.
[root@pinotmpackrheltest kafka]# cat client-sasl.properties
security.protocol=SASL_PLAINTEXT
python datagen.py | bin/kafka-console-producer.sh --topic events --bootstrap-server {hostname}or{IP}:6667 --producer.config client-sasl.properties
This command generates data continuously until you interrupt it with Ctrl+C.
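For a short test run, an unbounded stream may be more than you need. The following is a minimal sketch of a bounded variant of datagen.py that emits the same three fields. The names make_record and generate, the default limit of 100 records, and the 10 ms pause are illustrative assumptions and are not part of the original script.

```python
import datetime
import json
import random
import time
import uuid


def make_record():
    """Build one record with the same fields that datagen.py emits."""
    return {
        "ts": int(datetime.datetime.now().timestamp() * 1000),  # epoch millis
        "uuid": str(uuid.uuid4()),
        "count": random.randint(0, 1000),
    }


def generate(limit=100, pause_seconds=0.01):
    """Yield `limit` JSON lines, pausing briefly after each record."""
    for _ in range(limit):
        yield json.dumps(make_record())
        time.sleep(pause_seconds)


if __name__ == "__main__":
    # flush=True so each record reaches the pipe immediately.
    for line in generate():
        print(line, flush=True)
```

Saved under a hypothetical name such as datagen_bounded.py, the script can be piped to kafka-console-producer.sh in the same way as datagen.py, and it exits on its own after the last record.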


Verify Kafka Messages
Consume and verify the messages.
bin/kafka-console-consumer.sh --topic events --bootstrap-server {hostname}or{IP}:6667 --consumer.config client-sasl.properties --from-beginning
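Checking the consumer output by eye works for a few messages. For larger samples, a small script can confirm that each message has the fields Pinot will expect. This is a minimal sketch: the names EXPECTED_FIELDS, check_record, and check_lines are hypothetical, and the expected types are inferred from datagen.py rather than taken from any Kafka or Pinot API.

```python
import json

# Field names and Python types inferred from the datagen.py output.
EXPECTED_FIELDS = {"ts": int, "uuid": str, "count": int}


def check_record(line):
    """Return a list of problems found in one consumed JSON line."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(record, dict):
        return ["not a JSON object"]
    problems = []
    for name, expected_type in EXPECTED_FIELDS.items():
        if name not in record:
            problems.append(f"missing field: {name}")
        elif not isinstance(record[name], expected_type):
            problems.append(f"wrong type for {name}")
    return problems


def check_lines(lines):
    """Return (line_number, problem) pairs for every problem in `lines`."""
    return [
        (number, problem)
        for number, line in enumerate(lines, start=1)
        for problem in check_record(line)
    ]
```

To use it, save the consumer output to a file and pass the open file object to check_lines. An empty result means every line parsed and carried all three fields.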
Define Pinot Schema
Create a file /tmp/pinot/schema-stream.json.
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
Define Pinot Table Configuration
Create a file /tmp/pinot/table-config-stream.json.
Update the following in table-config-stream.json:
- Table and schema names
- stream.kafka.broker.list, based on your broker list
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "events",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "{hostname}or{IP}:6667",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
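A hand-edited configuration file can easily end up with a missing quote or an unreplaced placeholder, so it may help to check the file before submitting it. The sketch below parses the JSON and looks for the stream settings used in this guide. The name check_table_config and the REQUIRED_STREAM_KEYS list are assumptions drawn from the configuration shown here. They are not Pinot's own validation rules.

```python
import json

# Stream settings this guide relies on (an assumption, not an official list).
REQUIRED_STREAM_KEYS = [
    "streamType",
    "stream.kafka.topic.name",
    "stream.kafka.broker.list",
    "stream.kafka.decoder.class.name",
    "stream.kafka.consumer.factory.class.name",
]


def check_table_config(text):
    """Parse table-config text and return a list of problems (empty if none)."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON at line {exc.lineno}, column {exc.colno}: {exc.msg}"]
    problems = []
    if config.get("tableType") != "REALTIME":
        problems.append("tableType is not REALTIME")
    stream = config.get("tableIndexConfig", {}).get("streamConfigs", {})
    for key in REQUIRED_STREAM_KEYS:
        if not stream.get(key):
            problems.append(f"missing stream config: {key}")
    # Catch a broker list that still holds the guide's placeholder text.
    if "{hostname}" in stream.get("stream.kafka.broker.list", ""):
        problems.append("broker list still contains the {hostname} placeholder")
    return problems
```

For example, read /tmp/pinot/table-config-stream.json into a string and pass it to check_table_config. An empty list means the file parsed and the checked keys are present.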
Create Pinot Schema and Table
Run the following command:
bin/pinot-admin.sh AddTable -schemaFile /tmp/pinot/schema-stream.json -tableConfigFile /tmp/pinot/table-config-stream.json -controllerHost {hostname}or{IP} -controllerPort 9000 -exec
Query Ingested Data
Open Pinot UI at http://localhost:9000/#/query and run:
SELECT * FROM events LIMIT 10;
This should return the events that were ingested into Kafka and processed by Pinot.
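The same query can also be issued from a script instead of the UI. The sketch below posts the SQL to a Pinot broker and turns the result table into dictionaries. It assumes the broker's /query/sql endpoint is reachable at a URL such as http://localhost:8099 (a common default, not confirmed by this guide) and that no authentication is required, which may not hold on a Kerberized cluster. The function names are hypothetical.

```python
import json
import urllib.request


def build_query_request(broker_url, sql):
    """Build a POST request for the broker's /query/sql endpoint."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        broker_url.rstrip("/") + "/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def rows_as_dicts(response):
    """Convert a parsed query response into a list of column-to-value dicts."""
    table = response.get("resultTable") or {}
    columns = table.get("dataSchema", {}).get("columnNames", [])
    return [dict(zip(columns, row)) for row in table.get("rows", [])]


def run_query(broker_url, sql, timeout=10):
    """Send the query and return the result rows as dictionaries."""
    request = build_query_request(broker_url, sql)
    with urllib.request.urlopen(request, timeout=timeout) as reply:
        return rows_as_dicts(json.load(reply))
```

Calling run_query with the assumed broker URL and the SELECT statement above should return up to ten dictionaries keyed by ts, uuid, and count once ingestion is working.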
