The following code example shows how to construct a transactional processor using the Consume → Produce message delivery pattern. In this pattern, the transactional processor consumes messages from one stream, processes them, and produces messages to a separate stream. The methods of ITransactionalProducer ensure that the acknowledgment of consumed messages is committed in the same transaction as the produced messages.

In this example, both the production of messages and the offset acknowledgment for the consumer are handled within a single transaction. If the processor is restarted for any reason before a transaction commits, neither the output message nor the offset is committed, so the incoming message is consumed and processed again after the restart.

using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.
using Progress.Json.ObjectModel.*.
 
block-level on error undo, throw.
 
var RecordBuilder recordBuilder.
var KafkaProducerBuilder pb.
var ITransactionalProducer producer.
var IConsumerRecord consumerRecord.
var IProducerRecord producerRecord.
var ConsumerBuilder cb.
var KafkaConsumerBuilder kcb.
var IConsumer consumer.
var JsonObject messageBody.
var character consumerGroupId = "my.consumer.group".
var character bootstrapServers = "localhost:9092".
  
// Configure the consumer
kcb = cast(ConsumerBuilder:Create("progress-kafka"), KafkaConsumerBuilder).
kcb:SetBootstrapServers(bootstrapServers).
kcb:SetGroupId(consumerGroupId).
kcb:SetBodyDeserializer(new JsonDeserializer()).
  
// Set the consumer starting position to the most recent (latest) message
kcb:SetAutoOffsetReset(AutoOffsetReset:Latest).
 
// Must disable auto commit to use transactional processor
kcb:SetEnableAutoCommit(false).
 
// Must set isolation level to ReadCommitted to use transactional processor
kcb:SetIsolationLevel(IsolationLevel:ReadCommitted).
        
// Subscribe and build using the configured builder instance (kcb)
kcb:AddSubscription("input-topic").

consumer = kcb:Build().

// Configure the producer     
pb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).
pb:SetBootstrapServers(bootstrapServers).
pb:SetBodySerializer(new JsonSerializer()).

pb:SetTransactionalId("some.unique.value").
 
// In order to use the transactional producer methods, 
//    cast the newly built producer to ITransactionalProducer
producer = cast(pb:Build(), ITransactionalProducer).
 
recordBuilder = producer:RecordBuilder. 
recordBuilder:SetTopicName("output-topic").   
 
// Call InitTransactions() to ensure any outstanding transactions
//   for the given transactional ID are completed properly.
//   This also obtains the internal producer ID and epoch, which are used
//   in all future transactional messages issued by the producer.
producer:InitTransactions().
 
// Loop, receiving and processing records and generating new ones
repeat while true:
    consumerRecord = consumer:Poll(1000).
          
    if valid-object(consumerRecord) then do on error undo, leave:              
        // Begin the transaction to ensure that generation and acknowledgment
        //   happens in the same transaction.
        producer:BeginTransaction().
 
        messageBody = cast(consumerRecord:Body, JsonObject).
        // Modify the input message a bit, and then write out the new message
        messageBody:Add("processed", true).
        recordBuilder:SetBody(messageBody).
 
        producerRecord = recordBuilder:Build().
        producer:Send(producerRecord).
 
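        // Add the offset information to the transaction for the message just consumed.
        //   The committed offset is the position of the next message to read,
        //   which is why it is the consumed record's offset plus one.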
        producer:SendOffsetToTransaction
          (new TopicPartitionOffset(consumerRecord:TopicName, consumerRecord:PartitionId, 
           consumerRecord:Offset + 1), consumer).
  
        // Commit transaction containing both the newly produced message,
        //   and the offset information for the consumed message.
        producer:CommitTransaction().
 
        catch e as Progress.Lang.Error:
            // Abort the transaction and get out if any error occurs.
            producer:AbortTransaction().
            undo, throw e.
        end catch.
    end.
end.   
         
catch e as Progress.Lang.Error :
    message e:GetMessage(1) view-as alert-box.
end catch.