Construct a client consumer using strongly-typed methods
- Last Updated: July 5, 2024
- OpenEdge
- Version 12.8
This topic describes the steps and code necessary for consuming records (messages) from an Apache® Kafka® cluster. Consumer builder configuration is managed by calling strongly-typed setter methods that set values for Kafka properties. This is similar to the approach for a Kafka producer builder, described in Construct a client producer using strongly-typed methods.
- Use the OpenEdge.Messaging API.
- Create a consumer builder.
- Configure consumer builder properties using strongly-typed methods.
- Identify topic(s) to consume.
- Build the consumer.
- Read and process records.
  - Poll the consumer.
  - Read a record.
  - Acknowledge the record.
  - Process the record.
- Handle any errors.
- Clean up.
A breakdown of each step is provided next, followed by a complete code example.
Use the OpenEdge.Messaging API
The OpenEdge.Messaging API provides a common set of messaging-focused interfaces. With this API, you can instantiate a message consumer and consume messages.
OpenEdge.Messaging.Kafka.KafkaConsumerBuilder is the consumer builder class for the OpenEdge Kafka consumer implementation. This class inherits from OpenEdge.Messaging.ConsumerBuilder and extends the default set of setters with a number of Kafka-specific properties.
Create a consumer builder
To create a Kafka consumer builder, call the ConsumerBuilder Create() method with the "progress-kafka" argument to get the Progress-provided implementation of the Kafka client. Then cast the result to KafkaConsumerBuilder.
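A minimal ABL sketch of this step follows. It assumes that Create() is a static method on OpenEdge.Messaging.ConsumerBuilder; the variable name cb is illustrative only.

```abl
using OpenEdge.Messaging.ConsumerBuilder.
using OpenEdge.Messaging.Kafka.KafkaConsumerBuilder.

define variable cb as KafkaConsumerBuilder no-undo.

/* Ask for the Progress-provided Kafka implementation (assumed to be a
   static factory call), then cast the generic builder to the
   Kafka-specific subclass so the Kafka setters are available. */
cb = cast(ConsumerBuilder:Create("progress-kafka"), KafkaConsumerBuilder).
```

The cast is what exposes the Kafka-specific setters; without it, only the setters defined on the base ConsumerBuilder class would be visible to the compiler.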
Configure consumer builder properties using strongly-typed methods
Once the consumer builder is created, properties can be set using methods that
accept strongly-typed values. Method names correspond to Kafka consumer builder
property names. For example, to set the bootstrap.servers property, you call SetBootstrapServers(). See the OpenEdge.Messaging.ConsumerBuilder class
documentation in the OpenEdge ABL API Reference for the properties you
can set using strongly-typed methods.
Alternatively, you can use the generic SetConsumerOption() method, where you can supply a name-value pair for any Kafka consumer builder property. See Construct a generic client consumer using property name-value pairs for more information.
This example code uses the strongly-typed methods of the ConsumerBuilder class to set properties for the consumer builder:
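The configuration step could look roughly like the following sketch. The method names come from this topic, but the argument forms are assumptions: the server address, group ID, and client ID are placeholder values, and the AutoOffsetReset enum name (with its Latest member) is assumed rather than confirmed. Check the OpenEdge ABL API Reference for the exact signatures.

```abl
using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

define variable cb as KafkaConsumerBuilder no-undo.

cb = cast(ConsumerBuilder:Create("progress-kafka"), KafkaConsumerBuilder).

/* Required settings (placeholder values). */
cb:SetBootstrapServers("localhost:9092").
cb:SetGroupId("my-consumer-group").

/* Optional settings. */
cb:SetClientId("my-test-client").
cb:SetEnableAutoCommit(true).              /* true is the documented default */
cb:SetAutoDeleteConsumerGroup(false).      /* false is the documented default */
cb:SetBodyDeserializer(new JsonDeserializer()).

/* Assumption: the offset reset policy is passed as an enum member. */
cb:SetAutoOffsetReset(AutoOffsetReset:Latest).
```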
The SetBootstrapServers() method takes a
comma-delimited list of servers in the Kafka cluster. This setting is required.
The SetEnableAutoCommit() method allows you
to control when offsets are committed. The default (true) is to
automatically commit offsets in the background. Setting the value to false allows the application to control when commits
occur.
The SetGroupId() method identifies the
consumer group. The consumer group allows multiple clients to coordinate the
consumption of multiple topics and partitions. This setting is required.
The SetAutoDeleteConsumerGroup() method is an OpenEdge-specific setting that specifies whether the consumer group should be deleted automatically when the consumer is garbage collected. The default value is false.
The SetClientId() method sets the client
identifier for the Kafka consumer.
The SetBodyDeserializer() method specifies
the deserializer to use to convert from bytes into the body of a message. In the
example above, OpenEdge.Messaging.JsonDeserializer
is specified. For more information on deserialization, see Consumer deserialization.
The SetAutoOffsetReset() method specifies the action to take when there is no initial offset in the offset store or the desired offset is out of range. A value of Latest sets the starting position to the most recent (latest) message.
Identify topic(s) to consume
Use the AddSubscription() method to identify one or more topics to consume:
Build the consumer
Call the Build() method to build the consumer:
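The subscription and build steps might be sketched as follows. This assumes that AddSubscription() accepts a topic name as a character value and that Build() returns an object typed as OpenEdge.Messaging.IConsumer; the topic name "mytopic" and the connection values are placeholders.

```abl
using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

define variable cb       as KafkaConsumerBuilder no-undo.
define variable consumer as IConsumer            no-undo.

cb = cast(ConsumerBuilder:Create("progress-kafka"), KafkaConsumerBuilder).
cb:SetBootstrapServers("localhost:9092").  /* placeholder */
cb:SetGroupId("my-consumer-group").        /* placeholder */

/* Identify the topic to consume (assumed signature: topic name). */
cb:AddSubscription("mytopic").

/* Build the consumer from the configured builder. */
consumer = cb:Build().
```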
Read and process records
Use a loop to consume records indefinitely. Within the loop, call Poll() to retrieve a record. Acknowledge receipt of
the record by calling CommitOffset(). If the
consumer is configured for auto acknowledgment (the default), then you do not need
to call CommitOffset(); the messaging framework
automatically acknowledges the previously retrieved message the next time Poll() is called.
Read the Body property of the consumer record to get the body of the message. Then process the message according to the needs of the application.
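The read loop described above can be sketched as an internal procedure that receives an already-built consumer. Several details are assumptions and not taken from this topic: the IConsumer and IConsumerRecord type names, a Poll() argument giving a timeout in milliseconds, Poll() returning an invalid reference when no record arrives, and CommitOffset() taking the record as its argument. ConsumeRecords and msgBody are hypothetical names.

```abl
using OpenEdge.Messaging.*.

procedure ConsumeRecords:
    define input parameter consumer as IConsumer no-undo.

    define variable rec     as IConsumerRecord      no-undo.
    define variable msgBody as Progress.Lang.Object no-undo.

    repeat while true:
        /* Assumed: wait up to 1000 ms for the next record. */
        rec = consumer:Poll(1000).

        /* Assumed: no record within the timeout yields an invalid reference. */
        if not valid-object(rec) then
            next.

        /* Needed only when auto commit is disabled. */
        consumer:CommitOffset(rec).

        /* Application-specific processing of the body goes here. */
        msgBody = rec:Body.
    end.
end procedure.
```

Committing before processing, as here, means a record is not redelivered if processing fails; an application that needs redelivery on failure would commit after processing.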
Handle any errors
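One way to handle errors is a CATCH block around the consumer calls. This topic does not name the messaging-specific error classes, so the sketch below catches the general Progress.Lang.Error interface. The Poll() and CommitOffset() argument forms are assumptions, and PollOnce is a hypothetical procedure name.

```abl
block-level on error undo, throw.

using OpenEdge.Messaging.*.

procedure PollOnce:
    define input parameter consumer as IConsumer no-undo.

    define variable rec as IConsumerRecord no-undo.

    rec = consumer:Poll(1000).             /* assumed timeout in milliseconds */
    if valid-object(rec) then
        consumer:CommitOffset(rec).        /* assumed to take the record */

    /* Report any error raised by the calls above. */
    catch e as Progress.Lang.Error:
        message "Consumer error" e:GetMessageNum(1) ":" e:GetMessage(1).
    end catch.
end procedure.
```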
For more information on handling errors in the consumer, see Exception handling for messaging errors.
Clean up
You can delete the consumer when you are finished with it, but this is not required. If the consumer is not explicitly deleted, the object is garbage collected.
Complete example
This is the complete code example:
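The following is a reconstruction sketch that assembles the steps in this topic; it is not the original listing. Connection values and the topic name are placeholders. The IConsumer and IConsumerRecord type names, the AutoOffsetReset enum, the Poll() timeout argument, and the CommitOffset() argument are assumptions to verify against the OpenEdge ABL API Reference.

```abl
block-level on error undo, throw.

using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

define variable cb       as KafkaConsumerBuilder no-undo.
define variable consumer as IConsumer            no-undo.
define variable rec      as IConsumerRecord      no-undo.
define variable msgBody  as Progress.Lang.Object no-undo.

do on error undo, leave:
    /* Create and configure the consumer builder. */
    cb = cast(ConsumerBuilder:Create("progress-kafka"), KafkaConsumerBuilder).
    cb:SetBootstrapServers("localhost:9092").       /* placeholder */
    cb:SetGroupId("my-consumer-group").             /* placeholder */
    cb:SetClientId("my-test-client").               /* placeholder */
    cb:SetEnableAutoCommit(false).                  /* commit explicitly below */
    cb:SetBodyDeserializer(new JsonDeserializer()).
    cb:SetAutoOffsetReset(AutoOffsetReset:Latest).  /* assumed enum */

    /* Identify the topic and build the consumer. */
    cb:AddSubscription("mytopic").                  /* placeholder topic */
    consumer = cb:Build().

    /* Read and process records indefinitely. */
    repeat while true:
        rec = consumer:Poll(1000).                  /* assumed timeout in ms */
        if not valid-object(rec) then
            next.

        consumer:CommitOffset(rec).                 /* assumed signature */
        msgBody = rec:Body.                         /* process as needed */
    end.

    /* Handle any errors raised while building or consuming. */
    catch e as Progress.Lang.Error:
        message "Consumer error:" e:GetMessage(1).
    end catch.

    /* Clean up; optional, because the object is garbage collected. */
    finally:
        if valid-object(consumer) then
            delete object consumer.
    end finally.
end.
```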