A topic can have its own configuration, separate from that of the producer. For example, the same producer might send string records to one topic and JSON records to another. In that case, each topic needs its own serializer, because the content format of the messages differs. You may also want to configure other producer properties on a per-topic basis, such as acknowledgments, timeouts, and compression settings. All of this is possible using a TopicConfigurationBuilder.

To configure a topic, first create a new TopicConfigurationBuilder() and assign the name of the topic that the configuration applies to. You can also set a serializer and additional properties, if desired. Then build the topic configuration using the Build() method, and pass the result to AddTopicConfiguration() on the producer builder. This stores the topic configuration with the producer.

Producer properties that can be set at the topic level (using SetTopicOption()) are:
  • request.required.acks
  • acks
  • request.timeout.ms
  • message.timeout.ms
  • delivery.timeout.ms
  • partitioner
  • compression.codec
  • compression.type
  • compression.level
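
As a minimal sketch of how these properties might be applied, the fragment below sets an acknowledgment level, a compression type, and a message timeout for a single topic. It uses only the builder calls shown elsewhere in this section; the topic name "audit-topic" and the chosen values are assumptions for illustration, not recommended settings. Option values are passed as strings.

```
using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

var KafkaProducerBuilder pb.
var TopicConfigurationBuilder tcb.

pb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).
pb:SetBootstrapServers("localhost:9092").

/* "audit-topic" is a hypothetical topic name used only for this sketch */
tcb = new TopicConfigurationBuilder().
tcb:SetTopicName("audit-topic").

/* wait for all in-sync replicas to acknowledge each record */
tcb:SetTopicOption("acks", "-1").

/* compress records sent to this topic with gzip */
tcb:SetTopicOption("compression.type", "gzip").

/* allow up to 30 seconds for a record to be delivered */
tcb:SetTopicOption("message.timeout.ms", "30000").

pb:AddTopicConfiguration(tcb:Build()).
```

Records sent to any other topic through the same producer would continue to use the producer-level settings.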

Examples

In the following example, two different topic configurations are created. Each topic has a separate serializer appropriate for the type of object being sent.

using Progress.Lang.*.
using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.
using OpenEdge.Core.Util.ConfigBuilder from propath.
using Progress.Json.ObjectModel.JsonObject from propath.

block-level on error undo, throw.

var RecordBuilder recordBuilder.
var KafkaProducerBuilder pb.
var IProducer producer.
var IProducerRecord record.
var TopicConfigurationBuilder tcb.
var longchar stringBody = "this is a string message".
var JsonObject jsonBody.
    
/* register producer builder */
pb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).
pb:SetBootstrapServers("localhost:9092").
pb:SetBodySerializer(new StringSerializer()).
    
tcb = new TopicConfigurationBuilder().

/* configure json-topic with a json serializer */    
tcb:SetTopicName("json-topic").
tcb:SetBodySerializer(new JsonSerializer()).

pb:AddTopicConfiguration(tcb:Build()).
       
/* configure string-topic with a string serializer */
tcb:SetBodySerializer(new StringSerializer()).
tcb:SetTopicName("string-topic").
pb:AddTopicConfiguration(tcb:Build()).
    
producer = pb:Build().
    
/* Populate and send string-topic message */
recordBuilder = producer:RecordBuilder.
recordBuilder:SetTopicName("string-topic").
recordBuilder:SetBody(stringBody).
record = recordBuilder:Build().
producer:Send(record).
       
/* Populate and send json-topic message */
recordBuilder:SetTopicName("json-topic").
jsonBody = new JsonObject().
jsonBody:Add("type", "json string body").
recordBuilder:SetBody(jsonBody).
record = recordBuilder:Build().
producer:Send(record).
    
producer:Flush(100).
        
catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

In the next example, two topic configurations are created, one for a customer topic and one for an order topic. Each topic has its own serializer. In addition, the customer topic is configured to get acknowledgments when only the leader replica receives the record, while the order topic is configured to get acknowledgments when all in-sync replicas receive the record. Each topic also sets the maximum time to wait for the response to a request.

using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.
using OpenEdge.Messaging.Kafka.Acks.

// Retrieve instance of producer builder
var ProducerBuilder pb = ProducerBuilder:Create("progress-kafka").
 
// Create the topic configuration builder
var TopicConfigurationBuilder topicConfigBuilder = new TopicConfigurationBuilder().
 
var ITopicConfiguration customerTopicConfiguration.
var ITopicConfiguration orderTopicConfiguration.
 
var IProducer producer.
var RecordBuilder recordBuilder.
var IProducerRecord record.
 
// Assign the topic name to identify the destination to which the configuration applies
topicConfigBuilder:SetTopicName("customer").
 
// Assign the body serializer for the customer
topicConfigBuilder:SetBodySerializer(new CustomerSerializer()).

// Set the customer topic to get acknowledgments when the leader replica receives the record
topicConfigBuilder:SetTopicOption("request.required.acks", "1").

// Set how long the customer topic waits for the response to a request
topicConfigBuilder:SetTopicOption("request.timeout.ms", "10000").
 
// Create a topic configuration for customer
customerTopicConfiguration = topicConfigBuilder:Build().
 
// Provide the customer topic configuration to the producer builder
pb:AddTopicConfiguration(customerTopicConfiguration).


// Assign a separate serializer for the order topic
topicConfigBuilder:SetBodySerializer(new OrderSerializer()).
topicConfigBuilder:SetTopicName("order").

// Set the order topic to get acknowledgments when all in-sync replicas receive the record
topicConfigBuilder:SetTopicOption("request.required.acks", "-1").

// Set how long the order topic waits for the response to a request
topicConfigBuilder:SetTopicOption("request.timeout.ms", "10000").


orderTopicConfiguration = topicConfigBuilder:Build().
pb:AddTopicConfiguration(orderTopicConfiguration).
 
// When the configuration is complete, request the builder to create the producer
producer = pb:Build().
 
recordBuilder = producer:RecordBuilder.
 
// Create a customer object and send it
recordBuilder:SetTopicName("customer").
recordBuilder:SetBody(new CustomerObject()).
record = recordBuilder:Build().
producer:Send(record).
 
// Create an order object and send it
recordBuilder:SetTopicName("order").
recordBuilder:SetBody(new OrderObject()).
record = recordBuilder:Build().
producer:Send(record).
 
delete object producer.