At its simplest, reading and processing records from a consumer consists of an infinite loop of polling the consumer for a record, obtaining the record, and optionally acknowledging the record. This example shows the basic code needed for reading and processing records:

var IConsumerRecord record.
var JsonObject messageBody.
...
repeat while true:
    // request a record, waiting up to 1 second for some records to be available
    record = consumer:Poll(1000).
         
    if valid-object(record) then do:
             
        // acknowledge the message so the client can resume where it leaves off
        // the next time it is started
        consumer:CommitOffset(record).
             
        messageBody = cast(record:Body, JsonObject).
             
        message messageBody:GetCharacter("state").
    end.
         
end.

Polling

The Poll() method retrieves a record from a given topic. You supply a timeout value to Poll(), which is the maximum amount of time (in milliseconds) to wait for a record before returning. If a record is available immediately, the Poll() method returns immediately.

Note: Poll() does not respond to CTRL+C. This is because Poll() calls into librdkafka which is an external library. If you need to be responsive to CTRL+C handling, avoid the use of very long timeouts when calling Poll().

Record acknowledgment

When reading and processing records, it is important for an Apache® Kafka® cluster to know where in the stream a client is, so that in case of any failures, polling can resume in the right location. By default, the client automatically acknowledges records when Poll() is called. Auto acknowledgment relies on the consumer calling Poll(), processing a record, and then calling Poll() again. The last record retrieved from Poll() is queued for acknowledgment back to the broker.

If desired, you can override the default behavior and acknowledge records manually instead. You do this by first disabling auto acknowledgment on the consumer builder. You can either use the SetConsumerOption() method to set the "enable.auto.commit" property to "false", or use the strongly typed method, SetEnableAutoCommit(false):
cb:SetConsumerOption("enable.auto.commit", "false").
// or
cb:SetEnableAutoCommit(false). 
Later, within the loop that polls for records, you can acknowledge the record by invoking the CommitOffset() method for the record you want to commit:
consumer:CommitOffset(record).
CommitOffset() notifies the broker that the application has completed processing the record. The broker stores the offset associated with the consumer group assigned to the consumer client. By storing the offset with the broker, the client can recover processing at the next record within the topic if the client needs to be restarted. Note that you may still manually commit (by calling CommitOffset()), even if using auto acknowledgment.

Offset reset policy

The offset reset policy determines the action to take when there is no initial offset in the offset store (for example, when a consumer group is first created), or when the desired offset is out of range. Record consumption typically starts either at the earliest offset or the latest offset. Possible values for the offset reset policy are:
  • 'smallest','earliest'—automatically reset the offset to the smallest offset
  • 'largest','latest'—automatically reset the offset to the largest offset
  • 'error'—trigger an error
To set the offset reset policy, you can either use SetConsumerOption() to set the "auto.offset.reset" property, or use the strongly typed method, AutoOffsetReset(), on the consumer builder.
cb:SetConsumerOption("auto.offset.reset", "latest").
// or
cb:AutoOffsetReset(AutoOffsetReset:Latest).