This topic describes the steps and code needed to consume records (messages) from an Apache® Kafka® cluster. You configure the consumer by calling the SetConsumerOption() method to set values for Kafka properties. This is similar to the approach for a Kafka producer, described in Construct a generic client producer using property name-value pairs.

These are the main steps for constructing a Kafka consumer:
  1. Use the OpenEdge.Messaging API.
  2. Create a consumer builder.
  3. Configure consumer properties using the SetConsumerOption() method.
  4. Identify topic(s) to consume.
  5. Build the consumer.
  6. Read and process records.
    1. Poll the consumer.
    2. Read a record.
    3. Acknowledge the record.
    4. Process the record.
  7. Handle any errors.
  8. Clean up.

A breakdown of each step is provided next, followed by a complete code example.

Use the OpenEdge.Messaging API

The OpenEdge.Messaging API provides a common set of messaging-focused interfaces. With this API, you can instantiate a message consumer and consume messages.

using OpenEdge.Messaging.*.

Create a consumer builder

To create a Kafka ConsumerBuilder, call the Create() method with the "progress-kafka" argument to get the Progress-provided implementation of the Kafka client.

var ConsumerBuilder cb.
...
cb = ConsumerBuilder:Create("progress-kafka").

Configure consumer properties using the SetConsumerOption() method

SetConsumerOption() is a method in the OpenEdge.Messaging.ConsumerBuilder class that sets a Kafka consumer configuration property using a name-value pair. Here is the syntax:

cb:SetConsumerOption("prop-name", "value").
prop-name
The name of a Kafka consumer property.
value
The value of the property.
Note: For the list of Kafka configuration properties, see the librdkafka documentation. Properties marked with "high importance" are the ones you are most likely to set.
Note: For information on how to configure authentication with client consumers, see Configure SSL connections with Apache Kafka.

The following example code uses SetConsumerOption() to set some important consumer properties:

// Kafka requires at least one bootstrap server host and port.
cb:SetConsumerOption("bootstrap.servers", "localhost:9092").

// Explicitly disable auto commit so commits can be controlled within the application.
cb:SetConsumerOption("enable.auto.commit", "false").

// Identify the consumer group. The consumer group allows multiple clients to coordinate
//  the consumption of multiple topics and partitions.
cb:SetConsumerOption("group.id", "my.consumer.group").

// Specify whether the consumer group should automatically be deleted when
//  the consumer is garbage collected.
cb:SetConsumerOption("auto.delete.group", "true").

// Configure the consumer's deserializer.
cb:SetConsumerOption("value.deserializer", "OpenEdge.Messaging.JsonDeserializer").
 
// Set the consumer starting position.
cb:SetConsumerOption("auto.offset.reset", "latest").

The bootstrap.servers property takes a comma-delimited list of servers in the Kafka cluster, each given as a host and port. This property is required.
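
As a minimal sketch of the comma-delimited form, the following sets two bootstrap servers on a new builder. The host names broker1.example.com and broker2.example.com are hypothetical placeholders, not values taken from this topic.

```abl
using OpenEdge.Messaging.ConsumerBuilder from propath.

var ConsumerBuilder cb.

cb = ConsumerBuilder:Create("progress-kafka").

// Hypothetical host names; substitute the brokers in your own cluster.
cb:SetConsumerOption("bootstrap.servers",
                     "broker1.example.com:9092,broker2.example.com:9092").
```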

The enable.auto.commit property controls when offsets are committed. The default (true) automatically commits offsets in the background. Setting the property to "false" allows the application to control when commits occur.

The group.id property identifies the consumer group. This property is required.

The auto.delete.group property is an OpenEdge property that specifies whether the consumer group should automatically be deleted when the consumer is garbage collected. The default value is "false".

The value.deserializer property specifies the deserializer to use to convert from bytes into the body of a message. In the example above, "OpenEdge.Messaging.JsonDeserializer" is specified. For more information on deserialization, see Consumer deserialization.

The auto.offset.reset property specifies the action to take when there is no initial offset in the offset store or the desired offset is out of range. A value of "latest" sets the starting position to the most recent message.
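
To start from the oldest retained record instead, a consumer could set the property to "earliest", which is another value that librdkafka accepts for auto.offset.reset. The sketch below assumes the OpenEdge client passes the value through unchanged, and it reuses the localhost:9092 broker and my.consumer.group group from the example above.

```abl
using OpenEdge.Messaging.ConsumerBuilder from propath.

var ConsumerBuilder cb.

cb = ConsumerBuilder:Create("progress-kafka").
cb:SetConsumerOption("bootstrap.servers", "localhost:9092").
cb:SetConsumerOption("group.id", "my.consumer.group").

// Assumption: "earliest" is a librdkafka value for this property. It applies
// only when the group has no stored offset or the stored offset is out of range.
cb:SetConsumerOption("auto.offset.reset", "earliest").
```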

Identify topic(s) to consume

Use the AddSubscription() method to identify one or more topics to consume:
cb:AddSubscription("mytopic").
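
If the consumer should read from more than one topic, one plausible approach is to call AddSubscription() once per topic. This sketch assumes that repeated calls accumulate subscriptions, which the method name suggests but this topic does not confirm. The topic names "orders" and "payments" are hypothetical.

```abl
using OpenEdge.Messaging.ConsumerBuilder from propath.

var ConsumerBuilder cb.

cb = ConsumerBuilder:Create("progress-kafka").

// Assumption: each call adds one topic to the builder's subscription list.
cb:AddSubscription("orders").
cb:AddSubscription("payments").
```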

Build the consumer

Use the Build() method to build the consumer:
var IConsumer consumer.
...
consumer = cb:Build().

Read and process records

Use a loop to consume records indefinitely. Within the loop, call Poll() to retrieve a record. Acknowledge receipt of the record by calling CommitOffset(). If the consumer is configured for auto acknowledgment (the default), then you do not need to call CommitOffset(); the messaging framework automatically acknowledges the previously retrieved message the next time Poll() is called.

Read the Body property of the consumer record to get the body of the message. Then process the message according to the needs of the application.

var IConsumerRecord record.
var JsonObject messageBody.
...
repeat while true:
    // request a record, waiting up to 1 second for some records to be available
    record = consumer:Poll(1000).
         
    if valid-object(record) then do:
             
        // acknowledge the message so the client can resume where it leaves off
        // the next time it is started
        consumer:CommitOffset(record).
             
        messageBody = cast(record:Body, JsonObject).
             
        message messageBody:GetCharacter("state").
    end.
         
end.
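
The loop above acknowledges each record before processing it, so a record whose processing fails would normally not be delivered again after a restart. Where reprocessing is preferable to skipping, a possible variant commits the offset only after processing succeeds. This is a sketch under the same assumptions as the rest of this topic (a broker at localhost:9092, a topic named mytopic, and JSON bodies with a "state" property), and it uses only the calls already shown here.

```abl
block-level on error undo, throw.

using OpenEdge.Messaging.ConsumerBuilder from propath.
using OpenEdge.Messaging.IConsumer from propath.
using OpenEdge.Messaging.IConsumerRecord from propath.
using Progress.Json.ObjectModel.JsonObject from propath.

var ConsumerBuilder cb.
var IConsumer consumer.
var IConsumerRecord record.
var JsonObject messageBody.

cb = ConsumerBuilder:Create("progress-kafka").
cb:SetConsumerOption("bootstrap.servers", "localhost:9092").
cb:SetConsumerOption("group.id", "my.consumer.group").
// Manual commits, so the application decides when a record counts as handled.
cb:SetConsumerOption("enable.auto.commit", "false").
cb:SetConsumerOption("value.deserializer", "OpenEdge.Messaging.JsonDeserializer").
cb:AddSubscription("mytopic").

consumer = cb:Build().

repeat while true:
    // request a record, waiting up to 1 second for one to be available
    record = consumer:Poll(1000).

    if valid-object(record) then do:
        // Process first. If this raises an error, the commit below is skipped.
        messageBody = cast(record:Body, JsonObject).
        message messageBody:GetCharacter("state").

        // Commit only after processing has succeeded.
        consumer:CommitOffset(record).
    end.
end.
```

The trade-off is that a record may be processed more than once if the client stops between processing and committing, so the processing step should tolerate duplicates.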

Handle any errors

Use a catch block to handle errors raised while building the consumer or consuming records:

catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

For more information on handling errors, see Exception handling for messaging errors.
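
A catch block at the end of the procedure, as in the complete example, runs only after the error has already ended the loop. If a single bad record should not stop consumption, one option is to attach the catch block to the repeat block so that each iteration handles its own errors. The sketch below assumes the same broker, topic, and JSON message shape as the rest of this topic. It also relies on ABL continuing with the next iteration once a catch block on an iterating block completes; verify that behavior for your OpenEdge release.

```abl
block-level on error undo, throw.

using OpenEdge.Messaging.ConsumerBuilder from propath.
using OpenEdge.Messaging.IConsumer from propath.
using OpenEdge.Messaging.IConsumerRecord from propath.
using Progress.Json.ObjectModel.JsonObject from propath.

var ConsumerBuilder cb.
var IConsumer consumer.
var IConsumerRecord record.
var JsonObject messageBody.

cb = ConsumerBuilder:Create("progress-kafka").
cb:SetConsumerOption("bootstrap.servers", "localhost:9092").
cb:SetConsumerOption("group.id", "my.consumer.group").
cb:SetConsumerOption("enable.auto.commit", "false").
cb:SetConsumerOption("value.deserializer", "OpenEdge.Messaging.JsonDeserializer").
cb:AddSubscription("mytopic").

consumer = cb:Build().

repeat while true:
    record = consumer:Poll(1000).

    if valid-object(record) then do:
        consumer:CommitOffset(record).
        messageBody = cast(record:Body, JsonObject).
        message messageBody:GetCharacter("state").
    end.

    // Handles errors raised during this iteration only, for example a
    // failed cast or a missing "state" property.
    catch err as Progress.Lang.Error:
        message "Skipping record:" err:GetMessage(1).
    end catch.
end.

finally:
    delete object consumer no-error.
end finally.
```

Because this loop catches every error, including connection failures raised by Poll(), a production version would likely need to decide which errors are safe to skip and which should end the procedure.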

Clean up

You can delete the consumer when you are finished with it, but this is not required. If the consumer is not explicitly deleted, it is garbage collected.

finally:
    delete object consumer no-error.
end. 

Complete example

This is the complete code example for the steps above:

/**
    This example shows how to consume a stream of messages. The messages
    themselves are JSON objects, and the example uses the JSONDeserializer
    class to convert the network messages to JSON objects that can be used
    by the application. 
*/
 
 
block-level on error undo, throw.
 
using OpenEdge.Messaging.ConsumerBuilder from propath.
using OpenEdge.Messaging.IConsumer from propath.
using OpenEdge.Messaging.IConsumerRecord from propath.
using Progress.Json.ObjectModel.JsonConstruct from propath.
using Progress.Json.ObjectModel.JsonObject from propath.
 
var ConsumerBuilder cb.
var IConsumer consumer.
var IConsumerRecord record.
var JsonObject messageBody.
 
cb = ConsumerBuilder:Create("progress-kafka").
 
// Kafka requires at least one bootstrap server host and port.
cb:SetConsumerOption("bootstrap.servers", "localhost:9092").

// Explicitly disable auto commit so commits can be controlled within the application.
cb:SetConsumerOption("enable.auto.commit", "false").

// Identify the consumer group. The consumer group allows multiple clients to
//  coordinate the consumption of multiple topics and partitions
cb:SetConsumerOption("group.id", "my.consumer.group").

// Specify whether the consumer group should automatically be deleted when
//  the consumer is garbage collected.
cb:SetConsumerOption("auto.delete.group", "true").

// Configure the consumer's deserializer in order to convert MEMPTR values from
//  the network messages to JSON objects.
cb:SetConsumerOption("value.deserializer", "OpenEdge.Messaging.JSONDeserializer").

// Set the consumer starting position to the most recent message
cb:SetConsumerOption("auto.offset.reset", "latest").
     
// identify one or more topics to consume
cb:AddSubscription("mytopic").
     
// build the consumer
consumer = cb:Build().
          
// loop forever receiving and processing records.
repeat while true:
    // request a record, waiting up to 1 second for some records to be available
    record = consumer:Poll(1000).
         
    if valid-object(record) then do:
           
        // acknowledge the message so the client can resume where it leaves off
        // the next time it is started
        consumer:CommitOffset(record).
             
        messageBody = cast(record:Body, JsonObject).
             
        message messageBody:GetCharacter("state").
    end.
         
end.
     
 
catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

finally:
    delete object consumer no-error.
end.