Kafka Queue
Note:  For general information about Queue Steps, see Queue Step.
The Kafka Queue 1.0.0 component connects to a Kafka server and uses a DJMessage object to read a single message from a Kafka topic or write a single message to a Kafka topic. Kafka can run on a single server or on a group of servers (a cluster). These servers are called brokers. Messages (or records) are stored in a Kafka topic, which is a logical grouping of one or more partitions. A Kafka message consists of a key-value pair; the format of the message content can vary by application.
In Actian DataConnect, the PutMessage action is the producer and the GetMessage action is the consumer. Producers are client applications that write messages, records, or events to Kafka topics; consumers subscribe to topics and read these messages.
Kafka Queue Component Properties
| Property | Description |
| --- | --- |
| Bootstrap Servers | List of host:port pairs used to establish the initial connection to the Kafka cluster. The format is host1:port1, host2:port2 |
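As an illustration of the host1:port1, host2:port2 format, here is a minimal Python sketch that splits such a string into host and port pairs. It is not part of Actian DataConnect; the function name `parse_bootstrap_servers` and the broker names are hypothetical, and the component may validate this property differently.

```python
def parse_bootstrap_servers(value):
    """Split 'host1:port1, host2:port2' into a list of (host, port) tuples."""
    pairs = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma or doubled separators
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        pairs.append((host, int(port)))
    if not pairs:
        raise ValueError("at least one host:port pair is required")
    return pairs


# Hypothetical broker names, used only for illustration.
servers = parse_bootstrap_servers("broker1:9092, broker2:9092")
```

Listing more than one broker lets the initial connection succeed even if one of the listed brokers is unavailable.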
Supported Actions
| Action | Description |
| --- | --- |
| Connect | Establishes a connection with the Kafka queue server. |
| Disconnect | Disconnects from the Kafka queue server. |
| GetMessage | Reads a message from the Kafka topic using a DJMessage object. |
| PutMessage | Writes a message to the Kafka topic using a DJMessage object. |
Supported Actions Parameters
| Action | Parameter | Description |
| --- | --- | --- |
| GetMessage | Message | The DJMessage object that stores the message read from the topic. |
| PutMessage | Message | The message that must be written. |
 
Supported Actions Properties
Action
Property
Description
GetMessage
Group ID
Unique string that identifies the consumer group to which the consumer belongs.
For example, if you specify Group ID as Mac-1, the Kafka broker tracks the offset of the last message read by the Mac-1 group. Each time you run the process, Kafka returns the next message in the queue and the broker stores the offset of that message as the Last Message Read Offset. Kafka manages these offsets. If there are no messages left to read, No records fetched is returned.
 
Topic
Name of the Kafka topic from which the messages must be read.
 
Partition Key
Partition key in the Topic from which the messages must be read. If you do not know it, leave this property blank. If the Kafka topic is divided into multiple partitions, specify the number of the partition from which the messages must be read.
 
Poll Duration
Maximum time to block, in milliseconds. Process execution is blocked until the fetch request reaches the Kafka server and records are fetched, or until this time elapses.
Set a value large enough to account for network latency and the time needed to reach the Kafka server from the machine on which Actian DataConnect is running.
The value must not exceed Long.MAX_VALUE milliseconds.
Default value is 5000.
Note:  Long.MAX_VALUE is equal to 2^63 minus 1, which is 9,223,372,036,854,775,807 milliseconds.
 
Auto Offset Reset
Offset from which to start reading messages when the specified Group ID has no committed offset on the Kafka server. The options are:
earliest: Starts reading messages from the beginning of the queue.
latest: Starts reading messages from the end of the queue (if the queue is not receiving any messages, the records or messages fetched will always be 0 in this case).
none: Throws an exception if no previous offset is found for the given Group ID.
Default value is earliest.
 
Enable Auto Commit
When set to TRUE, the offset of the latest fetched message is committed at the interval specified by Auto Commit Interval.
Default is TRUE.
 
Auto Commit Interval
Interval, in milliseconds, at which the offset of the last read message or record is automatically committed to the Kafka server.
Default is 5000.
 
Key Deserializer
Format in which the bytes returned by the server for the key are deserialized. A Kafka message consists of a key-value pair. The key can be null. Keys stored in string format can be deserialized using the String Deserializer option.
Default value is String Deserializer.
 
Value Deserializer
Format in which the bytes returned by the server for the value are deserialized. The options are:
String Deserializer - Use this option when the value or content of the message is plain text or a string.
Kafka Avro Deserializer - Use this option if you use Confluent Stream Platform as the Kafka server and the structure of the message is defined by an Avro schema stored in a schema registry. This option works in conjunction with the Schema Registry URL property, which therefore must not be empty.
JSON Deserializer - Use this option when the value or content of the message is stored in JSON format.
Default value is String Deserializer.
 
Schema Registry URL
URL of the Schema Registry where all the Avro Schemas are stored. Specify this option only if you are using Confluent Stream Platform as Kafka Server and the selected Value Deserializer is Kafka Avro Deserializer.
PutMessage
Topic
Name of the Kafka topic to which the messages must be sent or written. If the topic does not exist, a new topic with the given name is created.
 
Partition Key
Partition key (if it exists) in the Topic to which the messages must be sent. The specified topic must already exist on the Kafka server with multiple partitions, and the specified partition number must exist in that topic.
 
Acks
Number of acknowledgments the producer requires the leader to have received before considering a request complete.
The options are:
0: Producer does not wait for any acknowledgment from the server.
1: Leader writes the record to its local log but responds without awaiting full acknowledgement from all followers.
all: Leader waits for the full set of in-sync replicas to acknowledge the record.
Default value is 0.
If the acks value is set to 0 (default option), the process runs faster than with 1 or all because the producer (PutMessage) does not wait for an acknowledgment from the Kafka server.
 
Buffer Memory
Total bytes of memory the producer can use to buffer records waiting to be sent to the server.
Default value is 33554432.
 
Retries
If the value is greater than zero, the client resends any record when the previous send has failed with a potentially transient error.
Default value is 1.
 
Batch Size
Producer attempts to batch records together into fewer requests whenever multiple records are being sent to the same partition.
Default value is 16384.
 
Key Serializer
Format in which the key of the message to be written to the server must be serialized.
Default value is String Serializer.
Use String Serializer when you use text or string as Keys of the message.
 
Value Serializer
Format in which the message written to the queue must be serialized. The options are:
String Serializer - If the string or text messages are to be written to queue, then select this option.
JSON Serializer - If the value or content of the message to be written to queue is in JSON format, then select this option.
Default value is String Serializer.
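The defaults and constraints listed in the table above can be summarized in a short Python sketch. This is only an illustration, not DataConnect code: the dictionary layout and the `resolve_get_message` helper are hypothetical, the lower bound of 0 for Poll Duration is an assumption, and the Key Serializer default is assumed to be String Serializer.

```python
LONG_MAX_VALUE = 2**63 - 1  # Java's Long.MAX_VALUE: 9,223,372,036,854,775,807

# Documented defaults; properties with no documented default start empty.
GET_MESSAGE_DEFAULTS = {
    "Group ID": "",
    "Topic": "",
    "Partition Key": "",
    "Poll Duration": 5000,          # milliseconds
    "Auto Offset Reset": "earliest",
    "Enable Auto Commit": True,
    "Auto Commit Interval": 5000,   # milliseconds
    "Key Deserializer": "String Deserializer",
    "Value Deserializer": "String Deserializer",
    "Schema Registry URL": "",
}

PUT_MESSAGE_DEFAULTS = {
    "Topic": "",
    "Partition Key": "",
    "Acks": "0",
    "Buffer Memory": 33554432,      # bytes (32 MiB)
    "Retries": 1,
    "Batch Size": 16384,
    "Key Serializer": "String Serializer",  # assumed default
    "Value Serializer": "String Serializer",
}


def resolve_get_message(overrides):
    """Merge overrides into the defaults and check the documented rules."""
    props = {**GET_MESSAGE_DEFAULTS, **overrides}
    problems = []
    if not props["Group ID"]:
        problems.append("Group ID is required")
    if not props["Topic"]:
        problems.append("A topic is required")
    # Lower bound of 0 is an assumption; only the upper bound is documented.
    if not 0 <= props["Poll Duration"] <= LONG_MAX_VALUE:
        problems.append("Poll Duration must be between 0 and Long.MAX_VALUE")
    if props["Auto Offset Reset"] not in ("earliest", "latest", "none"):
        problems.append("Auto Offset Reset must be earliest, latest, or none")
    if (props["Value Deserializer"] == "Kafka Avro Deserializer"
            and not props["Schema Registry URL"]):
        problems.append("Schema Registry URL is required for Kafka Avro Deserializer")
    if problems:
        raise ValueError("; ".join(problems))
    return props


# "orders" is a hypothetical topic name.
config = resolve_get_message({"Group ID": "Mac-1", "Topic": "orders"})
```

Only Group ID and Topic are supplied here; every other GetMessage property falls back to its documented default.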
Error Conditions
The following table describes error codes generated while sending a message to Kafka.
| Error Condition | Code | Description |
| --- | --- | --- |
| No error | 0 | The status is OK. There is no error. The information is returned successfully. |
| Missing license | 46 | Component Kafka queue component is not licensed. The component requires Actian DataConnect professional license. Contact your Actian sales representative to obtain the licensing information. |
| Missing broker list | 19 | Error detected when establishing a connection. The Bootstrap Servers property is required. |
| Error during connect | 19 | Exception 'xxx' occurred while attempting to connect to Kafka server 'xxx'. |
| Missing topic | 33 | Error detected when sending or receiving a message to or from a topic. A topic is required. |
| Error when sending a message | 8 | Exception 'xxx' occurred while attempting to put message to a topic 'xxx'. |
| Missing Group ID | 33 | Error in GetMessage. Group ID is required. |
| Missing Schema Registry URL | 33 | Error in GetMessage. Schema Registry URL is required when using 'Kafka Avro Deserializer' as 'Value Deserializer'. |
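Because codes 19 and 33 each cover more than one condition, a lookup keyed by code has to return a list. The Python sketch below shows one way to represent the table; the names `ERROR_CONDITIONS` and `conditions_for` and the "Unknown error code" fallback are hypothetical, and how a process actually obtains the code is not covered here.

```python
# Error conditions from the table above, grouped by numeric code.
ERROR_CONDITIONS = {
    0: ["No error"],
    8: ["Error when sending a message"],
    19: ["Missing broker list", "Error during connect"],
    33: ["Missing topic", "Missing Group ID", "Missing Schema Registry URL"],
    46: ["Missing license"],
}


def conditions_for(code):
    """Return every documented condition that shares the given code."""
    # The fallback text is an illustrative choice, not a product message.
    return ERROR_CONDITIONS.get(code, ["Unknown error code"])


candidates = conditions_for(33)
```

When a code maps to several conditions, the accompanying description text is what distinguishes them.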
For more information about Kafka, see the Kafka documentation available at http://kafka.apache.org.
Usage Examples for Kafka Queue Component
The following examples show how to use the Kafka Queue component:
Create Kafka Consumer using GetMessage action
To create a Kafka Consumer using GetMessage action:
1. On the Step Canvas, add a Kafka Queue component and name it Queue_1.
2. In the Step Properties for Queue_1, from the Action drop-down list, select GetMessage.
3. In the Parameters table, for the Message parameter, click within the Value column and from the drop-down list, select Add new DJMessage.
The Add New DJMessage Variable window is displayed.
4. Specify the DJMessage Name and click Create DJMessage.
Do not change the default selections of the other fields.
If you selected the Add the required initialize and destroy scripts in the Start and Stop steps option, an initialization script is added automatically in the Start step and a destroy script is added automatically in the Stop step for this new DJMessage.
A DJMessage object is required to store the content of the message returned by the Kafka queue. Without a DJMessage, the process fails validation.
5. Configure the following Property values for the GetMessage action:
Group ID
Topic
Partition Key
Note:  Configure the Partition Key to fetch messages from a specific partition number within the Topic. You can configure the Partition Key only if the Topic consists of multiple partitions.
Poll Duration
Value Deserializer
Schema Registry URL
For more information about these properties, see Supported Actions Properties.
6. Connect the Queue_1 step with the Start and Stop steps and run the process.
The message is fetched from the given Topic. Its content or value is stored in the DJMessage body and can be accessed using DJM.body.
To access the message, specify the following:
value: DJM.body
key: DJM.properties("dc_key")
offset: DJM.properties("dc_offset")
timestamp: DJM.properties("dc_timestamp")
Note:  If messages are not fetched from the Topic, then DJM.body will contain the No records fetched message.
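The behavior of repeated GetMessage runs can be pictured with a toy in-memory model in Python. This is a sketch only and not a Kafka client: `ToyTopic` is a hypothetical single-partition stand-in, it commits the offset on every read (a simplification of auto commit), and it returns plain dictionaries in place of a DJMessage.

```python
from dataclasses import dataclass, field

NO_RECORDS = "No records fetched"


@dataclass
class ToyTopic:
    """In-memory stand-in for a single-partition topic."""
    records: list = field(default_factory=list)    # (key, value) tuples
    committed: dict = field(default_factory=dict)  # group id -> next offset

    def put(self, value, key=None):
        self.records.append((key, value))

    def get(self, group_id, auto_offset_reset="earliest"):
        # A group with no committed offset falls back to Auto Offset Reset.
        if group_id not in self.committed:
            if auto_offset_reset == "earliest":
                self.committed[group_id] = 0
            elif auto_offset_reset == "latest":
                self.committed[group_id] = len(self.records)
            elif auto_offset_reset == "none":
                raise LookupError(f"no committed offset for group {group_id!r}")
            else:
                raise ValueError(f"unknown reset policy {auto_offset_reset!r}")
        offset = self.committed[group_id]
        if offset >= len(self.records):
            return {"body": NO_RECORDS, "properties": {}}
        key, value = self.records[offset]
        self.committed[group_id] = offset + 1  # simplified immediate commit
        return {"body": value, "properties": {"dc_key": key, "dc_offset": offset}}


topic = ToyTopic()
topic.put("Message Number - 1", key="key1")
topic.put("Message Number - 2", key="key2")

first = topic.get("Mac-1")   # first run of the process
second = topic.get("Mac-1")  # second run returns the next message
third = topic.get("Mac-1")   # nothing left for this group
```

Each call stands for one run of the process: the Mac-1 group receives the two messages in order, and the third call returns the No records fetched body.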
Create Kafka Producer using PutMessage action
To create a Kafka Producer using PutMessage action:
1. In the Step Properties for the Queue_1 component, from the Action drop-down list, select PutMessage action.
2. In the Parameters table, for the Message parameter, click within the Value column and from the drop-down list, select Add new DJMessage.
The Add New DJMessage Variable window is displayed.
3. Specify the DJMessage Name and click Create DJMessage.
A DJMessage object is required to hold the content of the message that is sent to the Kafka queue. Without a DJMessage, the process fails validation.
4. Configure the following Property values for the PutMessage action:
Topic
Value Serializer
For more information about Topic and Value Serializer, see Supported Actions Properties.
5. Add a Script Step before the Queue_1 Step and assign the message that must be sent to Kafka Queue to DJMessage body.
6. Add the following expression in the Script Step:
DJM.body = "Message Number - 1"
If you want to write a key to the message, then specify it as dc_key of the DJM object. For example:
DJM.properties("dc_key") = "key1"
7. Link the steps as Start > Script_1 > Queue_1 > Stop.
8. If you write string messages to a Kafka topic, then the expression in Script_1 must be in the following format:
DJM.body = "Message Number - 1"
DJM.properties("dc_key") = "key1"
9. Run the process.
The message in DJM.body is sent to the configured Kafka Topic.
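To picture what is sent, the following Python sketch turns a message body and an optional dc_key property into a key-value record of bytes. It is an illustration under assumptions, not the component's implementation: `ToyMessage` and `to_record` are hypothetical names, and string serialization is assumed to be UTF-8 encoding.

```python
from dataclasses import dataclass, field


@dataclass
class ToyMessage:
    """Hypothetical stand-in for a DJMessage: a body plus named properties."""
    body: str = ""
    properties: dict = field(default_factory=dict)


def to_record(message, encoding="utf-8"):
    """Build a (key, value) pair of bytes; the key is None when dc_key is unset."""
    key = message.properties.get("dc_key")
    key_bytes = None if key is None else key.encode(encoding)
    return key_bytes, message.body.encode(encoding)


msg = ToyMessage()
msg.body = "Message Number - 1"
msg.properties["dc_key"] = "key1"
record = to_record(msg)
```

A message without a dc_key produces a record whose key is None, which matches the fact that a Kafka key can be null.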
Last modified date: 02/09/2024