Kafka Micro-batch Queue
Note:  For general information about Queue Steps, see Queue Step.
Kafka is a distributed, partitioned, and replicated commit log service. Kafka maintains feeds of messages in categories called topics. The processes that publish messages to a Kafka topic are called producers. The processes that subscribe to topics and process the feed of published messages are called consumers. Kafka runs as a cluster of one or more servers, each of which is called a broker. Producers send messages over the network to the Kafka cluster, which in turn serves them to consumers.
The Kafka writer provides the ability to send data to Kafka from batch or real-time invoked processes without the expectation of handling big data volumes. Kafka provides a client Producer API for the user to write data to Kafka. The Producer API version used is 2.2.1.
The following features are supported:
Reading messages from a Kafka topic: The GetMessage action of the component and its associated action properties, such as Group ID and Topic, are used to read messages from the Kafka topic. The messages read from the topic are inserted into the body of the DJMessage object and into the JSON file (if one is specified in the File Name property). The JSON format or structure is as shown.
where:
recordCount: Number of messages or records fetched from the Kafka topic.
topic: Name of the topic from which the messages are fetched.
records: Array of message objects. Each message object consists of the key, offset, partition, timestamp, and value of a message fetched from the Kafka topic.
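As a minimal sketch of the structure described above, the following Python snippet parses a document of this shape and summarizes it. The field names come from the list above; the sample values, the value types, and the helper name summarize_batch are assumptions made for illustration and are not output captured from the component.

```python
import json

# Hypothetical sample shaped after the fields described above; all values are made up.
SAMPLE = """
{
  "recordCount": 2,
  "topic": "orders",
  "records": [
    {"key": "k1", "offset": 41, "partition": 0, "timestamp": 1700000000000, "value": "first"},
    {"key": "k2", "offset": 42, "partition": 0, "timestamp": 1700000000500, "value": "second"}
  ]
}
"""


def summarize_batch(text):
    """Return (topic, message values, highest offset seen per partition)."""
    doc = json.loads(text)
    records = doc["records"]
    # recordCount is expected to agree with the length of the records array.
    if doc["recordCount"] != len(records):
        raise ValueError("recordCount does not match the number of records")
    last_offsets = {}
    for rec in records:
        part = rec["partition"]
        last_offsets[part] = max(rec["offset"], last_offsets.get(part, -1))
    return doc["topic"], [rec["value"] for rec in records], last_offsets


topic, values, last_offsets = summarize_batch(SAMPLE)
print(topic, values, last_offsets)
```

A script of this kind could be pointed at the file named in the File Name property to inspect a fetched batch outside the process.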
Writing messages to Kafka topic: The PutMessage action of the component is used to write the message to a topic in a Kafka queue. The messages to be written must be attached to the body of the DJMessage or must be provided in the JSON file (specified in File Name property). If the JSON file is specified, then this file will be used to write the messages to the Kafka topic. Otherwise, the body or contents of the DJMessage object will be used.
The JSON file or the contents (body) of the DJMessage object must be in the same format as shown in the preceding image. The only required property is Value, which contains the message to be written to the Kafka topic. The other properties are optional. The structure is as shown.
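The following Python sketch builds a minimal payload for PutMessage in which each record carries only the message value. It assumes that the records array is the wrapper used for writing, as it is for reading, and that the property name is spelled in lowercase (value) as in the field list for GetMessage; both points should be checked against the component before relying on them. The helper name build_put_payload is hypothetical.

```python
import json


def build_put_payload(messages):
    """Wrap plain message strings in a records array, setting only the value field."""
    return {"records": [{"value": str(m)} for m in messages]}


payload = build_put_payload(["order created", "order shipped"])
body = json.dumps(payload, indent=2)
print(body)
```

The resulting text can either be saved to the file named in the File Name property or attached to the body of the DJMessage object.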
Kafka Micro-batch Queue Properties
| Property | Default Value | Description |
| --- | --- | --- |
| Bootstrap Servers | | List of host:port pairs used to establish the initial connection to the Kafka cluster. The format is host1:port1,host2:port2. |
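To make the expected format concrete, here is a small Python sketch that splits a Bootstrap Servers value into host and port pairs and rejects malformed entries. The function name parse_bootstrap_servers and the example host names are hypothetical, and the component may validate the value differently.

```python
def parse_bootstrap_servers(value):
    """Split 'host1:port1,host2:port2' into a list of (host, port) tuples."""
    pairs = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        # Split on the last colon so the host part is kept whole.
        host, sep, port = item.rpartition(":")
        if not sep or not host or not (port.isascii() and port.isdigit()):
            raise ValueError(f"expected host:port, got {item!r}")
        port_number = int(port)
        if not 1 <= port_number <= 65535:
            raise ValueError(f"port out of range in {item!r}")
        pairs.append((host, port_number))
    if not pairs:
        raise ValueError("at least one host:port pair is required")
    return pairs


servers = parse_bootstrap_servers("broker1.example.com:9092, broker2.example.com:9093")
print(servers)
```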
Supported Actions
| Action | Description |
| --- | --- |
| Connect | Establishes a connection to the server. |
| Disconnect | Disconnects from the server. |
| GetMessage | Reads a message object from the queue. |
| PutMessage | Writes a message object into the queue. |
Supported Action Parameters
| Action | Parameter | Description |
| --- | --- | --- |
| GetMessage | Message | The message that must be read. |
| PutMessage | Message | The message that must be written. |
Supported Action Properties
Action
Property
Description
GetMessage
Group ID
Consumer group ID to which the consumer belongs.
Topic
Kafka message topic name from where the messages must be read.
Partition Key
Partition key in the topic from which the messages must be read. If you do not know the key, leave this property blank.
Poll Duration
Maximum time to block in milliseconds. This is the time required to reach the specified Kafka Server. It blocks the process execution until the fetch request reaches the Kafka Server and records are fetched. The value should not be greater than Long.MAX_VALUE milliseconds. Default value is 5000.
Note:  Long.MAX_VALUE is equal to 2^63 minus 1, which is 9,223,372,036,854,775,807 milliseconds.
Max Poll Records
Maximum number of records to read. Default value is 1000.
Auto Offset Reset
Offset from which to start reading messages when the specified Group ID has no committed offset on the Kafka server. The options are:
earliest (default): Starts reading messages from the beginning of the queue.
latest: Starts reading messages from the end of the queue (if the queue is not receiving any messages, the records or messages fetched will always be 0 in this case).
none: Throws an exception if no previous offset is found for the given Group ID.
Enable Auto Commit
If set to TRUE, then the latest offset of messages fetched will be committed at the interval specified by the Auto Commit Interval. Default is TRUE.
Auto Commit Interval
Interval, in milliseconds, at which the offset of the last read message or record is automatically committed to the Kafka server. Default is 5000.
Key Deserializer
Format in which the bytes returned by the server for the key are deserialized. Default value is String Deserializer.
Value Deserializer
Format in which the bytes returned by the server for the value are deserialized. The options are:
String Deserializer
Kafka Avro Deserializer
JSON Deserializer
Default value is String Deserializer.
Schema Registry URL
Specify this option only if you are using Confluent Stream Platform as Kafka Server and the selected Value Deserializer is Kafka Avro Deserializer.
File Name
Path and file name of the JSON file to populate with the messages pulled from the Kafka queue. The file uses the format described above.
PutMessage
Topic
Kafka message topic name to where the messages must be sent or written.
Partition Key
Partition key in the Topic to where the messages must be sent.
Acks
Number of acknowledgments the producer requires the leader to have received before considering a request complete.
The options are:
0: Producer does not wait for any acknowledgment from the server.
1: Leader writes the record to its local log but responds without awaiting full acknowledgement from all followers.
all: Leader waits for the full set of in-sync replicas to acknowledge the record.
Default value is 0.
Buffer Memory
Total bytes of memory the producer can use to buffer records waiting to be sent to the server. Default value is 33554432.
Retries
If the value is greater than zero, the client resends any record when the previous send has failed with a potentially transient error. Default value is 1.
Batch Size
Producer attempts to batch records together into fewer requests whenever multiple records are being sent to the same partition. Default value is 16384.
Key Serializer
Format in which the key of the message to be written to the server must be serialized. Default value is String Serializer.
Value Serializer
Format in which the message written to the server must be serialized. The options are:
String Serializer
JSON Serializer
Default value is String Serializer.
File Name
Path and file name of the JSON file from which the messages must be read. The file must use the format described above.
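For readers who know the Kafka client settings, the following Python sketch collects the defaults listed above under the standard Kafka configuration keys they most plausibly correspond to, and checks the documented limits. The mapping from property names to keys is an assumption, since this guide does not state how the component passes the values to the Kafka client, and the helper names consumer_settings and check_poll_duration are hypothetical.

```python
LONG_MAX = 2**63 - 1  # Java Long.MAX_VALUE, the upper bound for Poll Duration

# Defaults from the table above, keyed by standard Kafka client settings
# (assumed mapping; the component may name or pass them differently).
GET_MESSAGE_DEFAULTS = {
    "max.poll.records": 1000,
    "auto.offset.reset": "earliest",
    "enable.auto.commit": True,
    "auto.commit.interval.ms": 5000,
}
PUT_MESSAGE_DEFAULTS = {
    "acks": "0",
    "buffer.memory": 33554432,  # 32 MiB
    "retries": 1,
    "batch.size": 16384,        # 16 KiB
}
DEFAULT_POLL_DURATION_MS = 5000


def consumer_settings(group_id, overrides=None):
    """Merge the GetMessage defaults with a Group ID and optional overrides."""
    if not group_id:
        raise ValueError("Group ID is required")
    settings = dict(GET_MESSAGE_DEFAULTS)
    settings["group.id"] = group_id
    settings.update(overrides or {})
    if settings["auto.offset.reset"] not in ("earliest", "latest", "none"):
        raise ValueError("Auto Offset Reset must be earliest, latest, or none")
    return settings


def check_poll_duration(milliseconds):
    """Return the value unchanged if it lies within 0..Long.MAX_VALUE."""
    if not 0 <= milliseconds <= LONG_MAX:
        raise ValueError("Poll Duration must be between 0 and Long.MAX_VALUE")
    return milliseconds


settings = consumer_settings("reporting-group", {"auto.offset.reset": "latest"})
print(settings)
print(check_poll_duration(DEFAULT_POLL_DURATION_MS))
```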
 
Error Conditions
The following table describes the error codes generated while connecting to Kafka or while sending or receiving messages.
| Error Condition | Code | Description |
| --- | --- | --- |
| No error | 0 | The status is OK. There is no error. The information is returned successfully. |
| Missing license | 46 | Component Kafka micro-batch component is not licensed. The component requires Actian DataConnect professional license. Contact your Actian sales representative to obtain the licensing information. |
| Missing broker list | 19 | Error detected when establishing a connection. The Bootstrap Servers property is required. |
| Error during connect | 19 | Exception 'xxx' occurred while attempting to connect to Kafka server 'xxx'. |
| Missing topic | 33 | Error detected when sending or receiving a message to or from a topic. A topic is required. |
| Error when sending a message | 8 | Exception 'xxx' occurred while attempting to put message to a topic 'xxx'. |
| Missing Group ID | 33 | Error in GetMessage. Group ID is required. |
| Missing Schema Registry URL | 33 | Error in GetMessage. Schema Registry URL is required when using 'Kafka Avro Deserializer' as 'Value Deserializer'. |
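Several conditions in the table share a code (19 and 33), so error handling that branches on the code alone cannot tell them apart and also needs the message text. The following Python sketch encodes the table and looks up the conditions for a given code; the structure and the function name conditions_for are illustrative only and are not part of the product.

```python
# (condition, code) pairs copied from the table above.
ERROR_CONDITIONS = [
    ("No error", 0),
    ("Missing license", 46),
    ("Missing broker list", 19),
    ("Error during connect", 19),
    ("Missing topic", 33),
    ("Error when sending a message", 8),
    ("Missing Group ID", 33),
    ("Missing Schema Registry URL", 33),
]


def conditions_for(code):
    """Return every error condition that reports the given code."""
    return [name for name, c in ERROR_CONDITIONS if c == code]


print(conditions_for(19))
print(conditions_for(33))
```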
Note:  For more information about Kafka, see the Kafka documentation available at http://kafka.apache.org.
Last modified date: 02/09/2024