This topic describes the steps and code necessary for consuming records (messages) from an Apache® Kafka® cluster. Consumer builder configuration is managed by using the SetConsumerOption() method to set values for Kafka properties. This is similar to the approach for a Kafka producer builder, described in Construct a client producer using strongly-typed methods.

These are the main steps for constructing a Kafka consumer:
  1. Use the OpenEdge.Messaging API.
  2. Create a consumer builder.
  3. Configure consumer builder properties using strongly-typed methods.
  4. Identify topic(s) to consume.
  5. Build the consumer.
  6. Read and process records.
    1. Poll the consumer.
    2. Read a record.
    3. Acknowledge the record.
    4. Process the record.
  7. Handle any errors.
  8. Clean up.

A breakdown of each step is provided next, followed by a complete code example.

Use the OpenEdge.Messaging API

The OpenEdge.Messaging API provides a common set of messaging-focused interfaces. With this API, you can instantiate a message consumer and consume messages.

OpenEdge.Messaging.Kafka.KafkaConsumerBuilder is the consumer builder class for the OpenEdge Kafka consumer implementation. This class inherits from OpenEdge.Messaging.ConsumerBuilder and extends the default set of setters with a number of Kafka specific properties.

using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.KafkaConsumerBuilder.

Create a consumer builder

To create a Kafka ConsumerBuilder, call the Create() method with the "progress-kafka" argument, to get the Progress-provided implementation of the Kafka client. Then cast to KafkaConsumerBuilder.

var KafkaConsumerBuilder cb.
...
cb = cast(ConsumerBuilder:Create("progress-kafka"), KafkaConsumerBuilder).

Configure consumer properties using the SetConsumerOption() method

Once the consumer builder is created, properties can be set using methods that accept strongly-typed values. Method names correspond to Kafka consumer builder property names. For example, to set the bootstrap.servers property, you call SetBootstrapServers(). See the OpenEdge.Messaging.ConsumerBuilder class documentation in the OpenEdge ABL API Reference for the properties you can set using strongly-typed methods.

Note: Not all Kafka properties have strongly-typed methods in OpenEdge. If a method is not available for a property you need, use the generic SetConsumerOption() method, where you can supply a name-value pair for any Kafka consumer builder property. See Construct a generic client consumer using property name-value pairs for more information.

This example code uses the strongly-typed methods of the ConsumerBuilder class to set properties for the consumer builder:

cb:SetBootstrapServers("localhost:9092").
cb:SetEnableAutoCommit(false).   
cb:SetGroupId("my.consumer.group").
cb:SetAutoDeleteConsumerGroup(true).
cb:SetClientId("my.consumer.client").
cb:SetBodyDeserializer(new OpenEdge.Messaging.JsonDeserializer()).
cb:SetAutoOffsetReset(AutoOffsetReset:Latest).

The SetBootstrapServers() method takes a comma-delimited list of servers in the Kafka cluster. This setting is required.

The SetEnableAutoCommit() method allows you to control when offsets are committed. The default (true) is to automatically commit offsets in the background. Setting the value to false allows the application to control when commits occur.

The SetGroupId() method identifies the consumer group. The consumer group allows multiple clients to coordinate the consumption of multiple topics and partitions. This setting is required.

The SetAutoDeleteConsumerGroup() method is a setting specific to OpenEdge, that specifies whether the consumer group should automatically be deleted when the consumer is garbage collected. The default value is false.

The SetClientId() method sets the client identifier for the Kafka consumer.

The SetBodyDeserializer() method specifies the deserializer to use to convert from bytes into the body of a message. In the example above, OpenEdge.Messaging.JsonDeserializer is specified. For more information on deserialization, see Consumer deserialization.

The SetAutoOffsetReset() method specifies the action to take when there is no initial offset in the offset store or the desired offset is out of range. A value of Latest sets the starting position to the most recent (latest) message as the starting point.

Identify topic(s) to consume

Use the AddSubscription() method to identify one or more topics to consume:
cb:AddSubscription("mytopic").

Build the consumer

Use the Build() method to build the consumer:
var IConsumer consumer.
...
consumer = cb:Build().

Read and process records

Use a loop to consume records indefinitely. Within the loop, call Poll() to retrieve a record. Acknowledge receipt of the record by calling CommitOffset(). If the consumer is configured for auto acknowledgment (the default), then you do not need to call CommitOffset(); the messaging framework automatically acknowledges the previously retrieved message the next time Poll() is called.

Call the Body property of the consumer record to get the body of the message. Then process the message according to the needs of the application.

var IConsumerRecord record.
var JsonObject messageBody.
...
repeat while true:
    // request a record, waiting up to 1 second for some records to be available
    record = consumer:Poll(1000).
         
    if valid-object(record) then do:
             
        // acknowledge the message so the client can resume where it leaves off
        // the next time it is started
        consumer:CommitOffset(record).
             
        messageBody = cast(record:Body, JsonObject).
             
        message messageBody:GetCharacter("state").
    end.
         
end.

Handle any errors

catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

For more information on handling errors in the consumer, see Exception handling for messaging errors.

Clean up

You can delete the consumer when finished with it but this is not required. If not explicitly deleted, the object will be garbage collected.

finally:
    delete object consumer no-error.
end. 

Complete example

This is the complete code example:

/**
    This example shows how to consume a stream of messages. The messages
    themselves are JSON objects, and the example uses the JsonDeserializer
    class to convert the network messages to JSON objects that can be used
    by the application.
*/
 
block-level on error undo, throw.
 
using OpenEdge.Messaging.IConsumer from propath.
using OpenEdge.Messaging.IConsumerRecord from propath.
using Progress.Json.ObjectModel.JsonConstruct from propath.
using Progress.Json.ObjectModel.JsonObject from propath.
using OpenEdge.Messaging.Kafka.KafkaConsumerBuilder from propath.
using OpenEdge.Messaging.ConsumerBuilder from propath.
using OpenEdge.Messaging.Kafka.AutoOffsetReset from propath.
 
var KafkaConsumerBuilder cb.     
var IConsumer consumer.
var IConsumerRecord record.
var JsonObject messageBody.
 
cb = cast(ConsumerBuilder:Create("progress-kafka"), KafkaConsumerBuilder).
 
// Kafka requires at least one bootstrap server host and port.
cb:SetBootstrapServers("localhost:9092").
    
// enable auto commit which will automatically acknowledge each received message.
cb:SetEnableAutoCommit(true).
    
// Identify the consumer group. The consumer group allows multiple clients to
//  coordinate the consumption of multiple topics and partitions
cb:SetGroupId("my.consumer.group").

// Specifies whether the consumer group should automatically be deleted when
//  the consumer is garbage collected. 
cb:SetAutoDeleteConsumerGroup(true).
    
// Set the client id. This allows group rejoin to work efficiently. Each client
//  in the group needs to have a unique id.
cb:SetClientId("my.consumer.client").
    
// Configure the consumer's deserializer in order to convert MEMPTR values from
//  the network messages to JSON objects.
cb:SetBodyDeserializer(new OpenEdge.Messaging.JsonDeserializer()).

// Set the consumer starting position to the most recent (latest) message
cb:SetAutoOffsetReset(AutoOffsetReset:Latest).

// identify one or more topics to consume
cb:AddSubscription("mytopic").
     
// build the consumer
consumer = cb:Build().
     
     
// loop forever receiving and processing records.
repeat while true:
    // request a record, waiting up to 1 second for some records to be available
    record = consumer:Poll(1000).
         
    if valid-object(record) then do:                          
        messageBody = cast(record:Body, JsonObject).             
        message messageBody:GetCharacter("state").
    end.
         
end.
      
catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

finally:
    delete object consumer no-error.
end.