The following code example shows how to construct and send multiple records using a transactional producer. By using a transaction, the client ensures that all the records are sent successfully, or none of the records are sent.

The producer is configured with a transactional identifier (through a call to SetTransactionalId()), which enables a transactional producer. After the transactional producer is created, the transactional state is initialized with a call to InitTransactions(). Transaction scope is managed through calls to BeginTransaction(), CommitTransaction(), and AbortTransaction(). The same Send() method is used for sending records, as with a non-transactional producer.

using OpenEdge.Messaging.*.
using Progress.Json.ObjectModel.*.
using OpenEdge.Messaging.Kafka.KafkaProducerBuilder from propath.
 
block-level on error undo, throw.
 
var RecordBuilder recordBuilder.
var KafkaProducerBuilder kpb.
var ITransactionalProducer producer.
var JsonObject msgbody.
var IProducerRecord record.
     
kpb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).
kpb:SetBootstrapServers("localhost:9092").
kpb:SetBodySerializer(new JsonSerializer()).

// To indicate the need for a transactional producer, 
//   set the TransactionalId producer option.
kpb:SetTransactionalId("some.unique.value").
 
// In order to use the transactional producer methods, 
//   cast the newly built producer to ITransactionalProducer.
producer = cast(kpb:Build(), ITransactionalProducer).
 
// Call InitTransactions() to ensure any outstanding transactions for the given
//   transactional id are completed properly.
producer:InitTransactions().
 
recordBuilder = producer:RecordBuilder.
 
recordBuilder:SetTopicName("my-topic").
     
msgBody = new JsonObject().
msgBody:Add("name", "Lift Line Skiing").
msgBody:Add("address", "1 Main Street").
     
recordBuilder:SetBody(msgBody).
record = recordBuilder:Build().
 
// Begin a transction before sending any records
producer:BeginTransaction().

producer:Send(record).
producer:Send(record).
 
// Commit any outstanding records.
producer:CommitTransaction().
     
        
catch e as Progress.Lang.Error :
    message e:GetMessage(1) view-as alert-box.
    producer:AbortTransaction() no-error.
end catch.