Polling and record acknowledgment
- Last Updated: January 17, 2024
- OpenEdge
- Version 12.8
At its simplest, reading and processing records with a consumer consists of an infinite loop: poll the consumer for a record, obtain the record, and optionally acknowledge it.
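The loop described above can be sketched in a language-neutral way. The following is a minimal Python illustration, not the OpenEdge API: `InMemoryConsumer`, `poll`, `acknowledge`, and `consume` are hypothetical names, and the loop stops after an empty poll only so that the sketch terminates.

```python
from collections import deque


class InMemoryConsumer:
    """Hypothetical stand-in for a consumer; not the OpenEdge API."""

    def __init__(self, records):
        self._pending = deque(records)
        self.acknowledged = []

    def poll(self, timeout_ms):
        # Return the next record, or None if nothing is available.
        # (A real consumer would wait up to timeout_ms before giving up.)
        return self._pending.popleft() if self._pending else None

    def acknowledge(self, record):
        self.acknowledged.append(record)


def consume(consumer, handle, max_empty_polls=1):
    """Poll, process, and acknowledge until the stream stays empty."""
    empty_polls = 0
    while empty_polls < max_empty_polls:  # a real loop would run forever
        record = consumer.poll(timeout_ms=100)
        if record is None:
            empty_polls += 1
            continue
        empty_polls = 0
        handle(record)                    # application-specific processing
        consumer.acknowledge(record)      # optional step in the real loop


processed = []
demo = InMemoryConsumer(["a", "b", "c"])
consume(demo, processed.append)
```

After the run, `processed` and `demo.acknowledged` both hold the three records in order.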
Polling
The Poll() method retrieves a record from a
given topic. You supply a timeout value to Poll(),
which is the maximum amount of time (in milliseconds) to wait for a record before
returning. If a record is available immediately, the Poll() method returns immediately.
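The timeout behavior can be illustrated with Python's standard `queue.Queue`, whose `get(timeout=...)` call behaves in a similar way. This is only an analogy under stated assumptions: the `poll` helper below is hypothetical and takes milliseconds to mirror the description above.

```python
import queue
import time


def poll(records, timeout_ms):
    """Return the next record, waiting at most timeout_ms; None on timeout."""
    try:
        return records.get(timeout=timeout_ms / 1000.0)
    except queue.Empty:
        return None


records = queue.Queue()
records.put("order-1")

start = time.monotonic()
first = poll(records, timeout_ms=5000)   # a record is ready: returns at once
fast_elapsed = time.monotonic() - start

start = time.monotonic()
second = poll(records, timeout_ms=200)   # nothing ready: waits about 200 ms
slow_elapsed = time.monotonic() - start
```

The first call returns the record without waiting for the 5-second limit; the second waits for the full timeout and returns `None`.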
Poll() does not respond to CTRL+C. This is
because Poll() calls into librdkafka, which is an
external library. If you need to remain responsive to CTRL+C,
avoid very long timeouts when calling
Poll().
Record acknowledgment
When reading and processing records, it is important for an Apache® Kafka® cluster to know where
in the stream a client is, so that in case of any failures, polling can resume in
the right location. By default, the client automatically acknowledges records when
Poll() is called. Auto acknowledgment relies
on the consumer calling Poll(), processing a
record, and then calling Poll() again. The last
record retrieved from Poll() is queued for
acknowledgment back to the broker.
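One way to picture auto acknowledgment is the small model below. It is a sketch built on an assumption, namely that the record returned by one poll is queued for acknowledgment when the next poll is made; `AutoAckConsumer` and `ack_queue` are hypothetical names, not part of the OpenEdge API.

```python
class AutoAckConsumer:
    """Hypothetical model of auto acknowledgment; not the OpenEdge API."""

    def __init__(self, records):
        self._records = list(records)
        self._position = 0      # index of the next record to hand out
        self._last = None       # offset returned by the most recent poll
        self.ack_queue = []     # offsets queued for acknowledgment

    def poll(self):
        # Assumption: polling again signals that the previous record was
        # processed, so its offset is queued for acknowledgment.
        if self._last is not None:
            self.ack_queue.append(self._last)
            self._last = None
        if self._position >= len(self._records):
            return None
        offset = self._position
        self._position += 1
        self._last = offset
        return offset, self._records[offset]


consumer = AutoAckConsumer(["a", "b"])
first = consumer.poll()                       # offset 0; nothing queued yet
acked_after_first = list(consumer.ack_queue)
second = consumer.poll()                      # offset 1; offset 0 is queued
acked_after_second = list(consumer.ack_queue)
```

In this model a record that was retrieved but never followed by another poll is not acknowledged, which is why the poll, process, poll cycle matters.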
To turn off auto acknowledgment, use the SetConsumerOption() method to set the "enable.auto.commit" property to "false", or use the strongly typed method, SetEnableAutoCommit(false).
To acknowledge a record manually, call the CommitOffset() method for
the record you want to commit.
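Manual acknowledgment and recovery after a restart can be sketched with a small in-memory model. Everything here is hypothetical (`Broker`, `ManualCommitConsumer`, `commit_offset`), and starting at offset 0 when no offset is stored is a simplification of this sketch, not a statement about the OpenEdge client.

```python
class Broker:
    """Hypothetical in-memory broker storing one offset per consumer group."""

    def __init__(self, records):
        self.records = list(records)
        self.committed = {}             # group id -> next offset to read


class ManualCommitConsumer:
    """Hypothetical consumer with auto acknowledgment turned off."""

    def __init__(self, broker, group):
        self.broker = broker
        self.group = group
        # Resume after the last committed record (0 if none, for simplicity).
        self.position = broker.committed.get(group, 0)

    def poll(self):
        if self.position >= len(self.broker.records):
            return None
        offset = self.position
        self.position += 1
        return offset, self.broker.records[offset]

    def commit_offset(self, offset):
        # Store the offset of the next record to read for this group.
        self.broker.committed[self.group] = offset + 1


broker = Broker(["a", "b", "c"])
first_run = ManualCommitConsumer(broker, "demo-group")
offset, value = first_run.poll()
first_run.commit_offset(offset)   # "a" is fully processed
first_run.poll()                  # "b" is read, but the client stops here

restarted = ManualCommitConsumer(broker, "demo-group")
resumed = restarted.poll()        # starts at "b", the first uncommitted record
```

Because "b" was read but never committed, the restarted consumer receives it again rather than skipping to "c".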
CommitOffset() notifies the broker that the
application has completed processing the record. The broker stores the offset
associated with the consumer group assigned to the consumer client. By storing the
offset with the broker, the client can recover processing at the next record within
the topic if the client needs to be restarted. Note that you can still commit
manually (by calling CommitOffset()) even when using auto
acknowledgment.
Offset reset policy
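The offset reset policy determines the action to take when there is no initial offset in the offset store or the desired offset is out of range:
- 'smallest', 'earliest': automatically reset the offset to the smallest offset
- 'largest', 'latest': automatically reset the offset to the largest offset
- 'error': trigger an error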
To set the offset reset policy, use SetConsumerOption() to set the "auto.offset.reset" property, or use the strongly typed method,
AutoOffsetReset(), on the consumer
builder.
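The effect of the three policy values can be sketched as a small decision function. This is an illustrative Python model under stated assumptions: `resolve_start_offset` is a hypothetical helper, `low` and `high` stand for the smallest and largest offsets currently available, and the exception types are choices made for the sketch.

```python
def resolve_start_offset(committed, low, high, policy):
    """Pick the offset to start reading from.

    committed: stored offset for the group, or None if there is none.
    low, high: smallest and largest offsets currently available.
    policy:    value of the "auto.offset.reset" property.
    """
    if committed is not None and low <= committed <= high:
        return committed                 # a valid stored offset always wins
    if policy in ("smallest", "earliest"):
        return low
    if policy in ("largest", "latest"):
        return high
    if policy == "error":
        raise LookupError("no valid offset and auto.offset.reset is 'error'")
    raise ValueError(f"unknown auto.offset.reset value: {policy!r}")


from_start = resolve_start_offset(None, 10, 50, "earliest")   # 10
from_end = resolve_start_offset(None, 10, 50, "latest")       # 50
from_stored = resolve_start_offset(25, 10, 50, "latest")      # 25
out_of_range = resolve_start_offset(3, 10, 50, "smallest")    # 10
```

The policy only comes into play when the stored offset is missing or out of range; otherwise reading resumes from the stored offset.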