Construct a generic client consumer using property name-value pairs
- Last Updated: July 5, 2024
- OpenEdge
- Version 12.8
This topic describes the steps and code needed to consume records (messages) from an Apache® Kafka® cluster. You configure the consumer by calling the SetConsumerOption() method to set values for Kafka properties. This is similar to the approach for a Kafka producer, described in Construct a generic client producer using property name-value pairs.
- Use the OpenEdge.Messaging API.
- Create a consumer builder.
- Configure consumer builder properties using the SetConsumerOption() method.
- Identify topic(s) to consume.
- Build the consumer.
- Read and process records.
  - Poll the consumer.
  - Read a record.
  - Acknowledge the record.
  - Process the record.
- Handle any errors.
- Clean up.
A breakdown of each step is provided next, followed by a complete code example.
Use the OpenEdge.Messaging API
The OpenEdge.Messaging API provides
a common set of messaging-focused interfaces. With this API, you can instantiate a
message consumer and consume messages.
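In ABL, a USING statement brings the API's types into scope so that later code can refer to them by short name. A minimal sketch, assuming the wildcard form is acceptable for your code base:

```abl
// Must appear before any other compilable statement in the file.
using OpenEdge.Messaging.*.
```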
Create a consumer builder
To create a Kafka ConsumerBuilder,
call the Create() method with the "progress-kafka" argument, to get the
Progress-provided implementation of the Kafka client.
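A minimal sketch of this step. The variable name cb is hypothetical, and the call form assumes that Create() is a static method of OpenEdge.Messaging.ConsumerBuilder:

```abl
using OpenEdge.Messaging.ConsumerBuilder.

// cb is a hypothetical variable name used for illustration.
define variable cb as ConsumerBuilder no-undo.

// "progress-kafka" selects the Progress-provided Kafka client implementation.
cb = ConsumerBuilder:Create("progress-kafka").
```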
Configure consumer properties using the SetConsumerOption() method
SetConsumerOption() is a method in
the OpenEdge.Messaging.ConsumerBuilder class which
sets a Kafka consumer configuration property using a name-value pair. Here is the
syntax:
SetConsumerOption( prop-name, value )
- prop-name
- The name of a Kafka consumer property.
- value
- The value of the property.
The following example code uses SetConsumerOption() to set some important consumer properties:
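A minimal sketch of such a configuration. It assumes that cb holds the ConsumerBuilder created in the previous step and that both arguments are passed as character strings. The server address and group name are placeholders, not values from this topic:

```abl
// cb is assumed to be the ConsumerBuilder created earlier.
// "localhost:9092" and "my-consumer-group" are placeholder values.
cb:SetConsumerOption("bootstrap.servers", "localhost:9092").
cb:SetConsumerOption("enable.auto.commit", "false").
cb:SetConsumerOption("group.id", "my-consumer-group").
cb:SetConsumerOption("auto.delete.group", "false").
cb:SetConsumerOption("value.deserializer", "OpenEdge.Messaging.JsonDeserializer").
cb:SetConsumerOption("auto.offset.reset", "latest").
```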
The bootstrap.servers property takes a
comma-delimited list of servers in the Kafka cluster. This property is required.
The enable.auto.commit property controls
when offsets are committed. The default (true)
automatically commits offsets in the background. Setting the property to "false" allows the application to control when commits
occur.
The group.id property identifies the consumer group. This property
is required.
The auto.delete.group property is an
OpenEdge property that specifies whether the consumer group should automatically be
deleted when the consumer is garbage collected. The default value is "false".
The value.deserializer property specifies
the deserializer to use to convert from bytes into the body of a message. In the
example above, "OpenEdge.Messaging.JsonDeserializer" is specified. For more
information on deserialization, see Consumer deserialization.
The auto.offset.reset property specifies
the action to take when there is no initial offset in the offset store or the
desired offset is out of range. A value of "latest" starts consumption at the most
recent message.
Identify topic(s) to consume
Use the AddSubscription() method to identify one or more topics to
consume:
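A minimal sketch, assuming cb holds the ConsumerBuilder from the earlier steps. The topic name is a placeholder, and subscribing to several topics by calling the method once per topic is an assumption:

```abl
// cb is assumed to be the ConsumerBuilder created earlier.
// "mytopic" is a placeholder topic name.
cb:AddSubscription("mytopic").
```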
Build the consumer
Use the Build() method to build the consumer:
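A minimal sketch, assuming cb holds the configured ConsumerBuilder. The variable name consumer is hypothetical, and the return type OpenEdge.Messaging.IConsumer is an assumption to check against the class reference:

```abl
// IConsumer as the type returned by Build() is an assumption.
define variable consumer as OpenEdge.Messaging.IConsumer no-undo.

consumer = cb:Build().
```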
Read and process records
Use a loop to consume records indefinitely. Within the loop, call Poll() to retrieve a record. Acknowledge receipt of
the record by calling CommitOffset(). If the
consumer is configured for auto acknowledgment (the default), then you do not need
to call CommitOffset(); the messaging framework
automatically acknowledges the previously retrieved message the next time Poll() is called.
Read the Body property of the
consumer record to get the body of the message. Then process the message according
to the needs of the application.
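A minimal sketch of the loop described above, assuming consumer holds the object returned by Build(). The record type IConsumerRecord, the millisecond timeout passed to Poll(), the record argument passed to CommitOffset(), and the Progress.Lang.Object type of Body are all assumptions to verify against the class reference:

```abl
// rec and messageBody are hypothetical variable names.
define variable rec         as OpenEdge.Messaging.IConsumerRecord no-undo.
define variable messageBody as Progress.Lang.Object               no-undo.

repeat while true:
    // Wait for the next record; the 1000 ms timeout argument is an assumption.
    rec = consumer:Poll(1000).

    // Skip the iteration if no record arrived.
    if valid-object(rec) then do:
        // Manual acknowledgment; not needed when auto acknowledgment is enabled.
        consumer:CommitOffset(rec).

        messageBody = rec:Body.
        // Application-specific processing of messageBody goes here.
    end.
end.
```

Checking the record with VALID-OBJECT before using it guards against a poll that returns without a message.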
Handle any errors
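A minimal sketch of one way to handle errors, using a general ABL CATCH block rather than any messaging-specific exception class. It assumes the enclosing block raises errors (for example, through BLOCK-LEVEL ON ERROR UNDO, THROW at the top of the file):

```abl
// Placed at the end of the block that contains the consumer code.
// Catching Progress.Lang.Error is a generic choice, not a messaging-specific one.
catch err as Progress.Lang.Error:
    message "Consumer error:" err:GetMessage(1).
end catch.
```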
For more information on handling errors, see Exception handling for messaging errors.
Clean up
You can delete the consumer when you are finished with it, but this is not required. If it is not explicitly deleted, the object is garbage collected.
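A minimal sketch of explicit cleanup, assuming consumer is the variable that holds the built consumer. Placing the statement in a FINALLY block is one option, chosen here so that it runs whether or not an error occurred:

```abl
// Placed at the end of the block that contains the consumer code, after any CATCH block.
finally:
    // NO-ERROR suppresses the error raised if consumer is not a valid object.
    delete object consumer no-error.
end finally.
```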
Complete example
This is the complete code example for the steps above:
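A sketch that strings the steps together. It is not the original listing. The variable names, server address, group name, and topic name are placeholders, and the IConsumer and IConsumerRecord types, the Poll() timeout argument, the CommitOffset() argument, and the type of Body are assumptions to verify against the class reference:

```abl
using OpenEdge.Messaging.*.

block-level on error undo, throw.

// Hypothetical variable names; IConsumer and IConsumerRecord are assumed types.
define variable cb          as ConsumerBuilder      no-undo.
define variable consumer    as IConsumer            no-undo.
define variable rec         as IConsumerRecord      no-undo.
define variable messageBody as Progress.Lang.Object no-undo.

// Create the builder for the Progress-provided Kafka client.
cb = ConsumerBuilder:Create("progress-kafka").

// Configure the consumer; server and group values are placeholders.
cb:SetConsumerOption("bootstrap.servers", "localhost:9092").
cb:SetConsumerOption("group.id", "my-consumer-group").
cb:SetConsumerOption("enable.auto.commit", "false").
cb:SetConsumerOption("auto.delete.group", "false").
cb:SetConsumerOption("value.deserializer", "OpenEdge.Messaging.JsonDeserializer").
cb:SetConsumerOption("auto.offset.reset", "latest").

// Identify the topic to consume ("mytopic" is a placeholder).
cb:AddSubscription("mytopic").

consumer = cb:Build().

// Consume records indefinitely.
repeat while true:
    rec = consumer:Poll(1000).   // timeout argument is an assumption

    if valid-object(rec) then do:
        // Acknowledge manually because enable.auto.commit is "false".
        consumer:CommitOffset(rec).

        messageBody = rec:Body.
        // Application-specific processing of messageBody goes here.
    end.
end.

catch err as Progress.Lang.Error:
    message "Consumer error:" err:GetMessage(1).
end catch.

finally:
    delete object consumer no-error.
end finally.
```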