This topic shows the steps and code necessary for sending a message using strongly-typed methods. The main steps are:
  1. Use the OpenEdge.Messaging API.
  2. Create a producer builder.
  3. Configure producer builder properties using strongly-typed methods.
  4. Build the producer.
  5. Create a record builder.
  6. Set the topic name and message body.
  7. Build the record.
  8. Send the record.
  9. Wait for the response.
  10. Handle any errors.
  11. Clean up.

These steps are similar to the steps in Construct a generic client producer using property name-value pairs, except for the first three steps.

A breakdown of each step is provided next, followed by a complete code example.

Use the OpenEdge.Messaging API

using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

The OpenEdge.Messaging API provides a common set of messaging-focused interfaces. With this API, you can instantiate a message producer, create messages, populate the message body, and send the message to a destination.

OpenEdge.Messaging.Kafka.KafkaProducerBuilder is the producer builder class for the Progress® OpenEdge® Apache® Kafka® producer implementation. This class inherits from OpenEdge.Messaging.ProducerBuilder, and extends the default set of setters with a number of Kafka specific properties.

Create a producer builder

var KafkaProducerBuilder pb.
...
pb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).

To create a Kafka ProducerBuilder, call the Create() method with the "progress-kafka" argument, to get the Progress-provided implementation of the Kafka client. Then cast to KafkaProducerBuilder.

Configure producer builder properties using strongly-typed methods

Once the producer builder is created, properties can be set using methods that accept strongly-typed values. Method names correspond to Kafka producer builder property names. For example, to set the bootstrap.servers property, you call SetBootstrapServers(). See the OpenEdge.Messaging.ProducerBuilder class in the OpenEdge ABL API Reference for the properties you can set using strongly-typed methods.

Note: Not all Kafka properties have strongly-typed methods in OpenEdge. If a method is not available for a property you need, use the generic SetProducerOption() method, where you can supply a name-value pair for any Kafka producer property. See Construct a generic client producer using property name-value pairs for more information.

The following example code uses strongly-typed methods of the ProducerBuilder to set properties:

pb:SetBootstrapServers("localhost:9092,otherhost:9092").
pb:SetClientId("test client").
pb:SetBodySerializer(new StringSerializer()).
pb:SetKeySerializer(new StringSerializer()).

In this example, SetBootstrapServers() sets the Kafka bootstrap.servers property. This property contains a comma-delimited list of servers the client will connect to. It is required and is how a Kafka client knows how to locate a cluster.

Next, SetClientId() sets the client identifier to "test client". This property is optional, but recommended. It helps with logging and identifying problems, especially if the application creates multiple producers, or if log files need to be coordinated across different clients.

Next, SetBodySerializer() sets the serialization method for the body of the message to StringSerializer. The serializer takes care of converting the data into bytes for transmission to a topic. OpenEdge also provides a MemptrSerializer and a JsonSerializer. You can also create your own custom serializers. For more information on serialization, see Producer serialization.

SetKeySerializer() sets the serialization of the Kafka partition key to StringSerializer. The partition key is optional, but when set explicitly, is often a string. Kafka uses a round-robin scheme if neither the partition key, nor a specific partition is assigned.

Build the producer

var IProducer producer.
...
producer = pb:Build().

Build() causes the producer builder to create a producer. At this point, the producer is completely configured. However, additional topic configuration is allowed after the producer is created.

Create a new record builder

var RecordBuilder recordBuilder.
...
recordBuilder = producer:RecordBuilder.

The record builder is used for creating the actual message for a topic. Typically, the application uses the producer to request a record builder. This allows a producer to initialize the record builder with defaults. It also allows the producer to produce a record with extra features that are not part of the default implementation.

Set the topic name and message body

Once a record builder is retrieved, a topic and a message body can be assigned. All other record builder settings are optional. The topic is set using the SetTopicName() method and the body is set using the SetBody() method.

recordBuilder:SetTopicName("mystream"). // topic is always required
    
recordBuilder:SetBody("~{ 'state' : 'running' ~}"). // body of the record is always required

Next, two additional properties are set. SetPartitionKey() sets the partition key. The partition key is usually a string, but could also be another data type. Often it may simply be a number, or a string that has some association with the record. The AddHeader() method allows you to specify header information to send along with the record.

recordBuilder:SetPartitionKey("appstatus").

recordBuilder:AddHeader("header", "value").

If the client needs to send many messages to the same topic, the same record builder can be used.

Build the record

var IProducerRecord record.
...
record = recordBuilder:Build().

Once the record builder is set, call the Build() method to generate the record..

A record is usually specific to a particular producer, but it could also be used by another producer, depending on the needs of the application.

Send the record

var ISendResponse response.
...
response = producer:Send(record).

producer:Flush(1000).

Call Send() to send the record to the producer. Assign the return value to a response object. The producer is responsible for the message from this point on.

See Behavior of send response properties for more information on the properties and behavior of the response.

Flush() flushes the queue to ensure the message is sent in a timely manner. The value supplied to Flush() is the number of milliseconds to wait for all outstanding messages to be sent. Flush() returns TRUE if all messages in the producer queue are delivered and returns FALSE if it does not finish due to the specified timeout.

Wait for the response

Use a loop to wait until the delivery response is received and the response object properties have been populated.

repeat while not response:Completed:
    pause .1.
end.

Handle any errors

Check the Success property on the response object for any failures and handle them appropriately.

if response:Success then do:
   message "Send successful" view-as alert-box.
end.
else do:
    undo, throw new Progress.Lang.AppError("Failed to send the record: " +
        response:ErrorMessage, 0).
end.

catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

See Exception handling for messaging errors for more information on handling errors.

Clean up

finally:
    delete object producer.
end.

You can delete the producer when finished with it, as shown in the example above, but this is not required. If not explicitly deleted, the object will be garbage collected.

Complete example

The following code contains the complete example:

using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

block-level on error undo, throw.

var RecordBuilder recordBuilder.
var KafkaProducerBuilder pb.
var IProducer producer.
var IProducerRecord record.
var ISendResponse response.

pb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).

pb:SetBootstrapServers("localhost:9092,otherhost:9092").
pb:SetClientId("test client").
pb:SetBodySerializer(new StringSerializer()).
pb:SetKeySerializer(new StringSerializer()).

/* Optionally set message.timeout.ms. In case of a mistake/typo in bootstrap
  servers, this will make it time out sooner. Otherwise, it can sit for a long time 
  waiting for the Send to complete and show an error. */
pb:SetProducerOption("message.timeout.ms", "10000").

producer = pb:Build().

recordBuilder = producer:RecordBuilder.
recordBuilder:SetTopicName("mystream").   
recordBuilder:SetBody("~{ 'state' : 'running' ~}").

record = recordBuilder:Build().

response = producer:Send(record).

producer:Flush(1000).
    
repeat while not response:Completed:
    pause .1 no-message.
end.

if response:Success then do:
   message "Send successful" view-as alert-box.
end.
else do:
    undo, throw new Progress.Lang.AppError("Failed to send the record: " +
        response:ErrorMessage, 0).
end.
 

catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

finally:
    delete object producer no-error.
end.