Kafka Connect

Set Up Kafka Connect from Ambari

When installing Kafka, ensure that you select the appropriate nodes for the Kafka broker and Kafka Connect services.

Kafka Connect Setup with Kerberos, SSL, and Ranger

Since Kafka is already configured with SSL, Kerberos, and Ranger, the underlying topics retain these settings during Kafka Connect operations. Add the following configurations to the Advanced kafka-connect-distributed section under the Kafka service CONFIGS.

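The exact values are cluster-specific; a minimal sketch of the security-related worker properties (hostnames, paths, and passwords below are placeholders) looks like this:

  bootstrap.servers=<broker-host>:9093
  security.protocol=SASL_SSL
  sasl.mechanism=GSSAPI
  sasl.kerberos.service.name=kafka
  ssl.truststore.location=/etc/security/serverKeys/truststore.jks
  ssl.truststore.password=<truststore-password>

  # The producer and consumer embedded in the Connect worker need the same security settings
  producer.security.protocol=SASL_SSL
  producer.sasl.mechanism=GSSAPI
  producer.sasl.kerberos.service.name=kafka
  producer.ssl.truststore.location=/etc/security/serverKeys/truststore.jks
  producer.ssl.truststore.password=<truststore-password>
  consumer.security.protocol=SASL_SSL
  consumer.sasl.mechanism=GSSAPI
  consumer.sasl.kerberos.service.name=kafka
  consumer.ssl.truststore.location=/etc/security/serverKeys/truststore.jks
  consumer.ssl.truststore.password=<truststore-password>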

After configuring the above settings, restart the Kafka service (including the broker and Connect components). Once the restart completes, Kafka Connect operates in Kerberos and SSL mode.

Kafka Connector Use Cases

Acceldata ODP Kafka Connect supports the following connectors, which are included in this release:

  1. Pubsub Connector
  2. MongoDB Connector
  3. JDBC Connector
  4. S3 Connector
  5. Amazon Kinesis Firehose Connector

The following table describes the supported connector type and library versions:

Connector Name    Supported Type    Library Version
Pubsub            Source & Sink     1.2.0
MongoDB           Source & Sink     1.6.1
JDBC              Source & Sink     6.1.2
S3                Sink Only         2.6.0
Kinesis           Sink Only         0.0.8

Pubsub Connector

Note: The following steps provide a detailed procedure for setting up the Pubsub Sink Connector for Kafka Connect.

  1. Configure the connector:

Prepare your configuration JSON for the Pubsub Sink Connector. Here's a basic example of what your JSON might look like:

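A minimal sketch, assuming the Google Cloud Pub/Sub sink connector class bundled with the 1.2.0 library; the connector name, topic names, project ID, and credentials path are placeholders:

  {
    "name": "connect-kafka-2-pubsub-demo",
    "config": {
      "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
      "tasks.max": "1",
      "topics": "pubsub-demo-topic",
      "cps.project": "<gcp-project-id>",
      "cps.topic": "<pubsub-topic-name>",
      "gcp.credentials.file.path": "/path/to/service-account.json",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }
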
  2. Save the Configuration:

Save the above JSON configuration to a file, for instance connect-kafka-2-pubsub-demo.json. Before deploying the connector, review the configuration details in the file to ensure they are correct and complete. You can display the contents of the file with cat connect-kafka-2-pubsub-demo.json.

  3. Deploy the Connector:

Use the following CURL command to submit your connector configuration to the Kafka Connect service:

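A sketch assuming the Connect REST interface on its default port 8083 (use https and the appropriate certificate options if the REST endpoint is SSL-enabled):

  curl -s -X POST -H "Content-Type: application/json" \
    --data @connect-kafka-2-pubsub-demo.json \
    http://<connect-host>:8083/connectors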

Upon successful submission of the connector configuration, the sample response for the above POST call is as follows:

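The body echoes the submitted configuration; for the sketch above it would look similar to:

  {
    "name": "connect-kafka-2-pubsub-demo",
    "config": {
      "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
      "tasks.max": "1",
      "topics": "pubsub-demo-topic",
      "cps.project": "<gcp-project-id>",
      "cps.topic": "<pubsub-topic-name>",
      "name": "connect-kafka-2-pubsub-demo"
    },
    "tasks": [],
    "type": "sink"
  }
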
  4. Verify Connector Deployment:

After submitting the configuration, check the status of the connector to ensure it is running:

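For example, using the illustrative connector name from the sketch above:

  curl -s http://<connect-host>:8083/connectors/connect-kafka-2-pubsub-demo/status

Both the connector and its task should report a RUNNING state.
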
  5. Manage the Pubsub Topic:

If not already created, set up your Google Cloud Pubsub topic using the Google Cloud CLI:

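For example (the topic and project names are placeholders):

  gcloud pubsub topics create <pubsub-topic-name> --project=<gcp-project-id>
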
  6. Create a Pubsub Subscription:

Create a subscription to the topic for message consumption:

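For example:

  gcloud pubsub subscriptions create <subscription-name> \
    --topic=<pubsub-topic-name> --project=<gcp-project-id>
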
  7. List Available Connectors:

To see a list of all active connectors:

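For example:

  curl -s http://<connect-host>:8083/connectors
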
  8. Fetch Connector Configuration:

To retrieve the specific configuration of a connector:

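For example:

  curl -s http://<connect-host>:8083/connectors/connect-kafka-2-pubsub-demo/config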

If you encounter any errors in the status response, address these issues before proceeding. Check the connector status and configuration for any discrepancies or required adjustments.

Sample Test Procedure for Kafka-Pubsub Connector

Prerequisites: Ensure that the cluster is Kerberos and SSL enabled.

  1. Configure Kafka Producer

Before starting the Kafka producer, it's necessary to prepare the SSL configuration. Here is an example of the required producer configuration file:

File: client-ssl.properties

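A minimal sketch for a Kerberos- and SSL-enabled cluster (the truststore path and password are placeholders):

  security.protocol=SASL_SSL
  sasl.mechanism=GSSAPI
  sasl.kerberos.service.name=kafka
  ssl.truststore.location=/etc/security/serverKeys/truststore.jks
  ssl.truststore.password=<truststore-password>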

Display the file contents with cat client-ssl.properties.

  2. Start the Kafka Console Producer:

Use the following command to launch the Kafka Console Producer, which sends messages to the specified topic:

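A sketch assuming the ODP Kafka client scripts under /usr/odp/current/kafka-broker/bin and the illustrative topic from the connector sketch above; obtain a Kerberos ticket (kinit) first. The --bootstrap-server option requires Kafka 2.5 or later; older builds use --broker-list instead.

  /usr/odp/current/kafka-broker/bin/kafka-console-producer.sh \
    --bootstrap-server <broker-host>:9093 \
    --topic pubsub-demo-topic \
    --producer.config /path/to/client-ssl.properties
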
  3. Send Data:

Enter messages directly into the producer console to send data.

  4. Start the Kafka Console Consumer:

To consume the messages sent by the producer, use the Kafka Console Consumer:

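For example:

  /usr/odp/current/kafka-broker/bin/kafka-console-consumer.sh \
    --bootstrap-server <broker-host>:9093 \
    --topic pubsub-demo-topic \
    --from-beginning \
    --consumer.config /path/to/client-ssl.properties
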
  5. Verify Output:

The output in the consumer should match the messages sent by the producer.

  6. Check Pubsub Subscription:

To pull messages from the associated Pubsub subscription, execute:

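For example:

  gcloud pubsub subscriptions pull <subscription-name> \
    --project=<gcp-project-id> --auto-ack --limit=10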

The output lists the pulled messages along with their message IDs; the message data should match the records produced to Kafka.

You can also confirm message delivery from the Pub/Sub section of the Google Cloud console UI.

The Pubsub connector also supports functioning as a source connector.

MongoDB Connector

Setting Up the MongoDB Source Connector for Kafka Connect.

Prerequisites: Ensure that MongoDB is configured on a 3-node cluster in distributed mode.

  1. Configure MongoDB connector

Prepare your JSON configuration for the MongoDB Source Connector. This configuration defines how the connector reads data from MongoDB and publishes it to Kafka. Here's an example of what your JSON configuration might look like:

File: mongodb-2-kafka-01.json

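A minimal sketch, assuming the MongoDB Kafka connector class from the 1.6.1 library; the connection URI, database, collection, and topic prefix are placeholders (the source connector publishes to a topic named <topic.prefix>.<database>.<collection>):

  {
    "name": "mongodb-2-kafka-01",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "tasks.max": "1",
      "connection.uri": "mongodb://<user>:<password>@<mongo-host-1>:27017,<mongo-host-2>:27017,<mongo-host-3>:27017/?replicaSet=<replica-set>",
      "database": "testdb",
      "collection": "orders",
      "topic.prefix": "mongo",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }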

View the file content with cat mongodb-2-kafka-01.json.

  2. Deploy the MongoDB connector

Submit the above configuration to Kafka Connect using the following CURL command:

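For example:

  curl -s -X POST -H "Content-Type: application/json" \
    --data @mongodb-2-kafka-01.json \
    http://<connect-host>:8083/connectors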

A successful POST returns the connector name, the submitted configuration (with the name added), an initially empty task list, and the connector type.

  3. Verify Connector Deployment

After submitting the configuration, check the status and details of the connector to ensure it is correctly configured and running:

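For example, using the illustrative connector name from the sketch above:

  curl -s http://<connect-host>:8083/connectors/mongodb-2-kafka-01/status
  curl -s http://<connect-host>:8083/connectors/mongodb-2-kafka-01/config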

In the response, both the connector and its task should report a RUNNING state.

  4. Interact with MongoDB

Log into MongoDB and insert data into the configured collection:

Logging into MongoDB:

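A sketch using the legacy mongo shell (host, credentials, and authentication database are placeholders):

  mongo --host <mongo-host> --port 27017 -u <user> -p --authenticationDatabase admin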

Insert Data into MongoDB:

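An illustrative insert into the database and collection assumed in the connector sketch above:

  use testdb
  db.orders.insertOne({ "order_id": 1, "product": "notebook", "quantity": 2 })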

MongoDB acknowledges the insert and returns the generated ObjectId of the new document.

  5. Consume Data from Kafka

Use the Kafka Console Consumer to retrieve the records sent from MongoDB to Kafka:

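With the sketch configuration above, the source connector publishes to the topic mongo.testdb.orders:

  /usr/odp/current/kafka-broker/bin/kafka-console-consumer.sh \
    --bootstrap-server <broker-host>:9093 \
    --topic mongo.testdb.orders \
    --from-beginning \
    --consumer.config /path/to/client-ssl.properties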

Expected Output: The consumer will display JSON-formatted records that reflect the inserted MongoDB data.


The MongoDB connector also supports functioning as a sink connector, allowing Kafka data to be written back into MongoDB.

JDBC Connector

Setting Up the JDBC Sink Connector for Kafka Connect

Prerequisite: Before starting with the JDBC Connector, ensure that the appropriate JDBC driver (e.g., <MYSQL/POSTGRES/ORACLE>-connector-java.jar) is copied into the kafka/libs directory.

  1. Configure the JDBC Sink Connector

Prepare your JSON configuration for the JDBC Sink Connector. This configuration will specify how data is published from Kafka to your database. Here's an example of what your JSON configuration might look like for MySQL:

File: jdbc-kafka-2-mysql-sink-test1801.json

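A minimal sketch for MySQL, assuming the Confluent JDBC sink connector class; the topic orders2 matches the table used in the ingestion test below, and the connection details are placeholders:

  {
    "name": "jdbc-kafka-2-mysql-sink-test1801",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "orders2",
      "connection.url": "jdbc:mysql://<mysql-host>:3306/<database>",
      "connection.user": "<db-user>",
      "connection.password": "<db-password>",
      "insert.mode": "insert",
      "auto.create": "true",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true"
    }
  }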

View the file content using cat jdbc-kafka-2-mysql-sink-test1801.json.

  2. Deploy the JDBC Sink Connector

Submit the above configuration to Kafka Connect using the following CURL command:

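For example:

  curl -s -X POST -H "Content-Type: application/json" \
    --data @jdbc-kafka-2-mysql-sink-test1801.json \
    http://<connect-host>:8083/connectors
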
  3. Verify Connector Deployment

Check the status of the connector with a GET request to the /connectors/<connector-name>/status endpoint, as shown in the Pubsub example. The connector and its task should report a RUNNING state.

  4. Data Ingestion from Kafka to MySQL

Ensure the MySQL table orders2 is created and ready to receive data:

Verify table existence:

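For example (with auto.create enabled, the connector creates the table on the first record, so it may appear only after data has been ingested):

  USE <database>;
  SHOW TABLES LIKE 'orders2';
  DESCRIBE orders2;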

Start the Kafka Producer for the orders2 topic:

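For example:

  /usr/odp/current/kafka-broker/bin/kafka-console-producer.sh \
    --bootstrap-server <broker-host>:9093 \
    --topic orders2 \
    --producer.config /path/to/client-ssl.properties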

Ingest structured data:

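Because the sink is configured with value.converter.schemas.enable=true, each record must carry an inline schema and payload and be entered as a single line; an illustrative record:

  {"schema":{"type":"struct","name":"orders2","optional":false,"fields":[{"field":"id","type":"int32","optional":false},{"field":"product","type":"string","optional":true},{"field":"quantity","type":"int32","optional":true}]},"payload":{"id":1,"product":"notebook","quantity":2}}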

Verify data within MySQL:

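For example:

  SELECT * FROM orders2;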

Insert a few more records of the same shape from the Kafka producer console.

Verify the data within MySQL again by re-running the SELECT query.

Expected result in MySQL: the orders2 table contains one row for each record produced to the topic.

The JDBC connector also supports functioning as a source connector, which can be utilized to bring data from databases into Kafka.

S3 Connector

Setting up the S3 Sink Connector for Kafka Connect.

The following provides clear steps for setting up and testing the S3 Sink Connector, ensuring users can successfully integrate their Kafka streams into Amazon S3 for data storage and processing tasks.

  1. Configure the S3 Sink Connector

Prepare your JSON configuration for the S3 Sink Connector. This configuration will specify how data is published from Kafka to your S3 bucket. Here's an example of what your JSON configuration might look like:

File: s3-sink-1902-03.json

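A minimal sketch, assuming the Confluent S3 sink connector class with byte-array format and gzip compression; the bucket, region, and topic names are placeholders, and AWS credentials are resolved from the worker's default credential chain:

  {
    "name": "s3-sink-1902-03",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "1",
      "topics": "s3-demo-topic",
      "s3.bucket.name": "<s3-bucket-name>",
      "s3.region": "<aws-region>",
      "s3.compression.type": "gzip",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
      "flush.size": "3",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
    }
  }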

View the file content using cat s3-sink-1902-03.json.

  2. Deploy the S3 Sink Connector

Submit the above configuration to Kafka Connect using the following CURL command:

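For example:

  curl -s -X POST -H "Content-Type: application/json" \
    --data @s3-sink-1902-03.json \
    http://<connect-host>:8083/connectors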

A successful POST returns the connector name, the submitted configuration, an initially empty task list, and the connector type.

  3. Verify Connector Deployment

Check the status of the connector with a GET request to the /connectors/<connector-name>/status endpoint, as shown in the Pubsub example. The connector and its task should report a RUNNING state.

  4. Publish Data to the S3 Bucket

Start the Kafka producer to send data to the topic configured for the S3 connector:

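For example, using the illustrative topic from the sketch above:

  /usr/odp/current/kafka-broker/bin/kafka-console-producer.sh \
    --bootstrap-server <broker-host>:9093 \
    --topic s3-demo-topic \
    --producer.config /path/to/client-ssl.properties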

Enter a few sample records into the producer console.

  5. Verify Data Presence in S3 Bucket

Check the S3 bucket to confirm that the files have been successfully uploaded:

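For example (the S3 sink typically writes objects under a topics/<topic-name>/ prefix):

  aws s3 ls s3://<s3-bucket-name>/topics/s3-demo-topic/ --recursive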

You should see one or more compressed (.gz) object files under the topic's directory in the bucket, one per flush of records.

Since the data is compressed (gzip) and converted to byte arrays, it cannot be directly read from the S3 files.

The S3 connector does not support functioning as a source connector.

It works only with the JSON and String converters; Avro is not supported.

Amazon Kinesis Firehose Connector

Setting up the Amazon Kinesis Firehose Sink Connector for Kafka Connect.

The following provides the required steps for setting up and testing the Amazon Kinesis Firehose Sink Connector, ensuring users can successfully integrate their Kafka streams into Amazon S3 for data storage and processing tasks via Kinesis Firehose.

  1. Configure the Kinesis Firehose Sink Connector

Prepare your JSON configuration for the Kinesis Firehose Sink Connector. This configuration will specify how data is published from Kafka to Amazon Kinesis Firehose, which then stores the data in Amazon S3. Here's an example of what your JSON configuration might look like:

File: kafka-2-kinesis-sink-test-2901-02.json

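A minimal sketch, assuming the awslabs kinesis-kafka-connector Firehose sink class bundled with the 0.0.8 library; the topic, region, and delivery stream names are placeholders, and the property names may differ slightly in your build:

  {
    "name": "kafka-2-kinesis-sink-test-2901-02",
    "config": {
      "connector.class": "com.amazon.kinesis.kafka.FirehoseSinkConnector",
      "tasks.max": "1",
      "topics": "kinesis-demo-topic",
      "region": "<aws-region>",
      "deliveryStream": "<firehose-delivery-stream>",
      "batch": "true",
      "batchSize": "500",
      "batchSizeInBytes": "3670016"
    }
  }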

View the file content using cat kafka-2-kinesis-sink-test-2901-02.json.

  2. Deploy the Kinesis Firehose Sink Connector

Submit the above configuration to Kafka Connect using the following CURL command:

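For example:

  curl -s -X POST -H "Content-Type: application/json" \
    --data @kafka-2-kinesis-sink-test-2901-02.json \
    http://<connect-host>:8083/connectors
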
  3. Verify Connector Deployment

Check the status of the connector with a GET request to the /connectors/<connector-name>/status endpoint, as shown in the Pubsub example. The connector and its task should report a RUNNING state.

  4. List All Active Connectors

To see a list of all active connectors in your Kafka Connect cluster, issue a GET request to the /connectors endpoint, as shown in the Pubsub example.

  5. View Connector Properties

To review the detailed configuration and properties of the Kinesis Firehose Sink Connector, fetch /connectors/<connector-name>/config from the Kafka Connect REST API.

  6. Publish Data to Kinesis Firehose

Start the Kafka console producer to send data to the topic configured for the Kinesis Firehose connector:

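For example, using the illustrative topic from the sketch above:

  /usr/odp/current/kafka-broker/bin/kafka-console-producer.sh \
    --bootstrap-server <broker-host>:9093 \
    --topic kinesis-demo-topic \
    --producer.config /path/to/client-ssl.properties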

Enter a few sample records into the producer console.

  7. Verify Data in Amazon S3

Monitor the Kinesis Firehose dashboard to observe incoming bytes and their delivery to S3. Verify the data has been stored in S3 by accessing the corresponding S3 bucket:

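For example (Firehose delivers objects under a date-based prefix by default; the bucket and prefix are placeholders):

  aws s3 ls s3://<firehose-destination-bucket>/<prefix>/ --recursive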

Download and view the data to confirm the integrity and correctness of the records:

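For example:

  aws s3 cp s3://<firehose-destination-bucket>/<prefix>/<object-key> ./firehose-output.txt
  cat ./firehose-output.txt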

The downloaded files contain the records that were produced to the Kafka topic.

The Kinesis Firehose connector does not support functioning as a source connector.
