This table lists the OpenEdge.Messaging classes and interfaces most relevant to the transactional producer.
Class or interface | Description
OpenEdge.Messaging.IProducer interface | An interface that provides a way to construct and send records.
OpenEdge.Messaging.ITransactionalProducer interface | An interface that is an extension to the OpenEdge.Messaging.IProducer interface, for sending transactional records.
OpenEdge.Messaging.Kafka.KafkaProducer class | A class that implements a Kafka-specific record producer.
OpenEdge.Messaging.Kafka.KafkaTransactionalProducer class | A class that inherits from OpenEdge.Messaging.Kafka.KafkaProducer and implements a Kafka-specific transactional record producer.
OpenEdge.Messaging.TopicPartitionOffset class | A class that is used to commit the offset information for a message that has been consumed to the transaction of the producer. Applies to a Consume → Produce delivery pattern.

See the OpenEdge.Messaging package in the OpenEdge ABL API Reference for more detail on the classes, interfaces, methods, and properties of the transactional producer.

OpenEdge.Messaging.ITransactionalProducer interface

The transactional producer interface, OpenEdge.Messaging.ITransactionalProducer, is an extension of the OpenEdge.Messaging.IProducer interface, and includes additional methods for managing transactions:
  • InitTransactions() — Ensures that any outstanding transactions for the given transactional.id are completed properly. Gets the internal producer ID and epoch, which are used in future transactional messages sent by the producer.
  • BeginTransaction() — Begins a transaction, so that message production and offset acknowledgment happen in the same transaction.
  • SendOffsetToTransaction() — Records a single offset for a consumer within the transaction used to produce messages. Applies to a Consume → Produce message delivery pattern.
  • SendOffsetsToTransaction() — Records a list of offsets for a consumer within the transaction used to produce messages. Applies to a Consume → Produce message delivery pattern.
  • CommitTransaction() — Commits the transaction.
  • AbortTransaction() — Aborts the transaction.
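
The following is a minimal sketch of how these methods might fit together when sending one record. It is not taken from the product documentation: the procedure name SendInTransaction and the parameter names are hypothetical, and it assumes that the producer was built elsewhere with a transactional.id, that InitTransactions() has already been called once on it, and that Send() accepts a record of type OpenEdge.Messaging.IProducerRecord built by the caller.

```abl
using OpenEdge.Messaging.ITransactionalProducer.
using OpenEdge.Messaging.IProducerRecord.

// Hypothetical helper (not part of OpenEdge.Messaging): sends one record
// inside its own transaction. Assumes InitTransactions() was already called
// once on this producer and that outRecord was built by the caller.
procedure SendInTransaction:
    define input parameter producer  as ITransactionalProducer no-undo.
    define input parameter outRecord as IProducerRecord        no-undo.

    do on error undo, leave:
        producer:BeginTransaction().
        producer:Send(outRecord).
        producer:CommitTransaction().

        catch e as Progress.Lang.Error:
            // Discard everything sent since BeginTransaction().
            producer:AbortTransaction().
        end catch.
    end.
end procedure.
```

The sketch aborts on any error for simplicity. A real application would likely inspect the error first, because some failures may not be recoverable by aborting the transaction.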

SendOffsetToTransaction() and SendOffsetsToTransaction()

The SendOffsetToTransaction() and SendOffsetsToTransaction() methods are used in a Consume → Produce delivery pattern. In this scenario, the producer ensures that the offset acknowledgments for consumed records are stored with the broker as part of the current transaction, so that they take effect only if the transaction is successfully committed. Call either method before the transaction is completed, passing in the partition metadata (topic name and partition ID) and the offset or offsets of the consumed messages.

SendOffsetToTransaction() sends a single offset, while SendOffsetsToTransaction() sends a list of offsets. Consumer offsets can be recorded only once per transaction, using either SendOffsetToTransaction() or SendOffsetsToTransaction().

Syntax
SendOffsetToTransaction (topic-partition-offset, consumer)
SendOffsetsToTransaction (topic-partition-offsets, consumer)
topic-partition-offset
Information of type OpenEdge.Messaging.TopicPartitionOffset, consisting of:
  • TopicName — A CHARACTER value containing the topic name of the consumed message.
  • PartitionId — A non-negative INTEGER value containing the partition ID of the consumed message.
  • Offset — A non-negative INTEGER value containing the offset of the consumed message.
    Note: The value of the committed offset should be the offset of the next message that the application will consume (for example, lastProcessedMessageOffset + 1).
topic-partition-offsets
A list of topic, partition, and offset information. See the topic-partition-offset description above for more detail on TopicPartitionOffset. For information on lists, see Progress.Collections.List<T> class in the ABL Reference.
consumer
Consumer information of type OpenEdge.Messaging.IConsumer. The consumer must be configured with isolation.level=read_committed and enable.auto.commit=false, and the application must not call CommitOffset on the consumer.

Example

// Record the next offset to consume (last processed + 1) in the current transaction.
producer:SendOffsetToTransaction
    (new TopicPartitionOffset(consumerRecord:TopicName, consumerRecord:PartitionId,
     consumerRecord:Offset + 1), consumer).
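
For the list form, a comparable sketch follows. It is illustrative only: the procedure name SendBatchOffsets and the parameter names are hypothetical, and it assumes that SendOffsetsToTransaction() accepts a Progress.Collections.List<TopicPartitionOffset>, that the consumed record is of type OpenEdge.Messaging.IConsumerRecord, and that a transaction has already been started with BeginTransaction().

```abl
using OpenEdge.Messaging.IConsumer.
using OpenEdge.Messaging.IConsumerRecord.
using OpenEdge.Messaging.ITransactionalProducer.
using OpenEdge.Messaging.TopicPartitionOffset.
using Progress.Collections.*.

// Hypothetical helper (not part of OpenEdge.Messaging): records the offset
// that follows lastRecord in the producer's current transaction.
procedure SendBatchOffsets:
    define input parameter producer   as ITransactionalProducer no-undo.
    define input parameter consumer   as IConsumer              no-undo.
    define input parameter lastRecord as IConsumerRecord        no-undo.

    define variable offsets as List<TopicPartitionOffset> no-undo.

    offsets = new List<TopicPartitionOffset>().

    // One entry per topic partition; each entry holds the offset of the
    // next message to consume (last processed offset + 1).
    offsets:Add(new TopicPartitionOffset(lastRecord:TopicName,
                                         lastRecord:PartitionId,
                                         lastRecord:Offset + 1)).

    producer:SendOffsetsToTransaction(offsets, consumer).
end procedure.
```

When records from several partitions are processed in one transaction, the list would typically hold one entry for each partition, added before the single call to SendOffsetsToTransaction().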