An Apache® Kafka® record may have headers associated with it. Headers allow you to provide metadata about the Kafka record without adding extra information to the key-value pair of the record itself. This topic contains example code for producing and consuming records with headers.

Producer-side

This example demonstrates how to produce records with multiple headers. The AddHeader() method is called on the RecordBuilder object to add a header key-value pair. Multiple headers are allowed and can be created by calling AddHeader() repeatedly.

using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

block-level on error undo, throw.

var RecordBuilder recordBuilder.
var KafkaProducerBuilder pb.
var IProducer producer.
var IProducerRecord record.
var ISendResponse response.
    
pb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).
pb:SetBootstrapServers("localhost:9092").
pb:SetBodySerializer(new StringSerializer()).

producer = pb:Build().

recordBuilder = producer:RecordBuilder.

recordBuilder:SetTopicName("headerstream").
    
recordBuilder:SetBody("this is my message body").
    
// add some random headers. The values get turned into memptr encoded as UTF-8 strings.
recordBuilder:AddHeader("foo", "bar").
recordBuilder:AddHeader("foo1", "bar1").
// headers are cumulative, and with Kafka multiple headers can share the same key
recordBuilder:AddHeader("foo2", "bar2").
     
record = recordBuilder:Build().

// send the same message 3 times
response = producer:Send(record).
response = producer:Send(record).
response = producer:Send(record).
    
// flush any queued records
producer:Flush(10000).
    
// keep flushing until the last send has completed
repeat while response:Completed eq false: 
    producer:Flush(100).
end.
        
catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

Consumer-side

This example demonstrates how to consume a stream of records, where the records have headers. An iterator is used to iterate over the headers, since there may be multiple headers per record. For more information on iterators, see Progress.Collections.IIterator<T> interface in the ABL Reference.

using OpenEdge.Messaging.ConsumerBuilder from propath.
using OpenEdge.Messaging.IConsumer from propath.
using OpenEdge.Messaging.IConsumerRecord from propath.
using OpenEdge.Messaging.JsonDeserializer from propath.
using OpenEdge.Messaging.Kafka.KafkaConsumerConfig from propath.
using OpenEdge.Messaging.StringDeserializer from propath.
using Progress.Collections.IIterator from propath.
using OpenEdge.Messaging.RecordHeader from propath.

block-level on error undo, throw.

var ConsumerBuilder cb.
var IConsumer consumer.
var IConsumerRecord record.
var Progress.Json.ObjectModel.JsonConstruct messageBody.
var character msg.
var IIterator<RecordHeader> headerIter.
var RecordHeader recHeader.
var OpenEdge.Core.Memptr headerValue.
var longchar lcHeaderValue.
var int64 endTime.

cb = ConsumerBuilder:Create("progress-kafka").

cb:SetConsumerOption("bootstrap.servers", "localhost:9092").   
cb:SetConsumerOption(KafkaConsumerConfig:AutoOffsetReset, "beginning").     
cb:SetConsumerOption(KafkaConsumerConfig:EnableAutoCommit, "false").
cb:SetConsumerOption(KafkaConsumerConfig:GroupId, "my.header.group" + string(mtime)).  
cb:SetConsumerOption(KafkaConsumerConfig:ClientId, "my.header.client" + string(mtime)).    
cb:SetBodyDeserializer(new StringDeserializer()).
    
cb:AddSubscription("headerstream").
    
consumer = cb:Build().
message "Consuming messages".

// call process events so display can update.
process events.

// set a timeout of 10 seconds (mtime is in milliseconds).
endTime = mtime + 10000.

// loop, receiving and processing records.
repeat while mtime < endTime:
    
    // request a record, waiting for some records to be available 
    record = consumer:Poll(500) no-error.
        
    if valid-object(record) then do:
            
        headerIter = record:Headers:GetIterator().
        
        // Iterate over the headers
        repeat while headerIter:MoveNext():           
            if length(msg) > 0 then
                msg += ", ".
            recHeader = headerIter:Current.
            msg += recHeader:Name.
            headerValue = cast(recHeader:Value, OpenEdge.Core.Memptr).
            // copy the memptr header value into a longchar so it can be displayed
            copy-lob headerValue:Value to lcHeaderValue.
            
            msg += "=".
            msg += string(lcHeaderValue).
        end.
        
        display msg format "x(100)" with width 110 50 down.
        msg = "".
            
        // Call process events so we can allow the screen to repaint 
        process events.
                    
    end.
       
end.

catch err as Progress.Lang.Error :
    message 
        err:GetMessage(1) 
        err:CallStack 
        view-as alert-box.
end catch.

finally:
    delete object consumer no-error.
end.