This topic describes the steps and code necessary for sending a message. Producer configuration is managed through strings using Apache® Kafka® property name-value pairs. The main steps are:
  1. Use the OpenEdge.Messaging API.
  2. Create a producer builder.
  3. Configure producer builder properties using the SetProducerOption() method.
  4. Build the producer.
  5. Create a record builder.
  6. Set the topic name and message body.
  7. Build the record.
  8. Send the record.
  9. Wait for the response.
  10. Handle any errors.
  11. Clean up.

These steps are similar to the steps in Construct a client producer using strongly-typed methods, except for the first three steps.

A breakdown of each step is provided next, followed by a complete code example.

Use the OpenEdge.Messaging API

using OpenEdge.Messaging.*.

The OpenEdge.Messaging API provides a common set of messaging-focused interfaces. With this API, you can instantiate a message producer, create messages, populate the message body, and send the message to a destination.

Create a producer builder

var ProducerBuilder pb.
...
pb = ProducerBuilder:Create("progress-kafka").

To create a Kafka ProducerBuilder, call the Create() method with the "progress-kafka" argument. This returns the Progress-provided implementation of the Kafka client.

Configure producer builder properties using the SetProducerOption() method

SetProducerOption() is a method of the OpenEdge.Messaging.ProducerBuilder class that sets a Kafka producer configuration property using a name-value pair. Here is the syntax:

pb:SetProducerOption("prop-name", "value").
prop-name
The name of a Kafka producer property.
value
The value of the property.
Note: A link to Kafka configuration properties can be found in the librdkafka documentation. Properties marked with "high importance" are the ones you are most likely to set.
Note: For information on how to configure authentication with client producers, see Configure SSL connections with Apache Kafka.

The following example uses SetProducerOption() to set two properties:

pb:SetProducerOption("bootstrap.servers", "localhost:9092").

pb:SetProducerOption("value.serializer", "OpenEdge.Messaging.StringSerializer").

The bootstrap.servers property takes a comma-delimited list of servers in the Kafka cluster. This property is required.
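
As a minimal sketch of the comma-delimited form, the following sets bootstrap.servers to three brokers. The host names are hypothetical placeholders, not values from this topic; substitute the brokers in your own cluster.

```abl
using OpenEdge.Messaging.ProducerBuilder from propath.

var ProducerBuilder pb.

pb = ProducerBuilder:Create("progress-kafka").

/* Hypothetical host names: replace them with the brokers in your cluster. */
pb:SetProducerOption("bootstrap.servers",
    "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092").
```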

The value.serializer property specifies the serializer to be used to convert the body of a message into bytes for transmission to the destination topic. In the example above, "OpenEdge.Messaging.StringSerializer" is specified. For more information on serialization, see Producer serialization.

Build the producer

var IProducer producer.
...
producer = pb:Build().

Build() causes the producer builder to create a producer. At this point, the producer is completely configured.

Create a record builder

var RecordBuilder recordBuilder.
...
recordBuilder = producer:RecordBuilder.

Request the record builder from the producer. This allows the producer to initialize the record builder with defaults and to produce records with extra features that are not part of the default implementation.

Set the topic name and message body

Once a record builder is created, you assign a topic and a message body. Both are required. Use SetTopicName() to set the topic name and SetBody() to set the message body.

recordBuilder:SetTopicName("mystream").
  
recordBuilder:SetBody("~{ 'state' : 'running' ~}").

The same record builder is normally used to send many messages to the same topic.
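
The following sketch illustrates that reuse: the topic name is set once and only the body changes for each record. It assumes the record builder keeps the topic name between calls to Build(), which the statement above implies but this topic does not show explicitly. The 'sequence' field and the loop count are hypothetical.

```abl
using OpenEdge.Messaging.* from propath.

block-level on error undo, throw.

var ProducerBuilder pb.
var IProducer producer.
var RecordBuilder recordBuilder.
var IProducerRecord record.
var ISendResponse response.
var int i.

pb = ProducerBuilder:Create("progress-kafka").
pb:SetProducerOption("bootstrap.servers", "localhost:9092").
pb:SetProducerOption("value.serializer", "OpenEdge.Messaging.StringSerializer").
producer = pb:Build().

/* Set the topic once; only the body changes for each message. */
recordBuilder = producer:RecordBuilder.
recordBuilder:SetTopicName("mystream").

do i = 1 to 3:
    /* Hypothetical message body carrying a sequence number. */
    recordBuilder:SetBody(substitute("~{ 'sequence' : &1 ~}", i)).
    record = recordBuilder:Build().
    response = producer:Send(record).
end.

producer:Flush(1000).
```

For brevity, this sketch keeps only the last response and omits the wait and error handling; a real application would check each response.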

Build the record

var IProducerRecord record.
...
record = recordBuilder:Build().

Once the topic name and message body are set on the record builder, call the Build() method to generate the record.

Records are usually specific to a particular producer, but could also be used by other producers, depending on the needs of the application.

Send the record

var ISendResponse response.
...
response = producer:Send(record).

producer:Flush(1000).

The next step is to send the record. Call Send() on the producer, passing the built record as the argument. The producer is responsible for the message from this point on. To capture the outcome of the send, assign the return value of Send() to an ISendResponse variable.

See Behavior of send response properties for more information on the properties and behavior of the response object.

You can call Flush() to flush the queue to ensure the message is sent in a timely manner. The value supplied to Flush() is the number of milliseconds to wait for all outstanding messages to be sent. Flush() returns TRUE if all messages in the producer queue are delivered and returns FALSE if it does not finish due to the specified timeout.
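
Because Flush() returns a logical value, you can test the result instead of discarding it. The following is a minimal sketch under that assumption; the message text is illustrative only.

```abl
using OpenEdge.Messaging.* from propath.

block-level on error undo, throw.

var ProducerBuilder pb.
var IProducer producer.
var RecordBuilder recordBuilder.
var ISendResponse response.

pb = ProducerBuilder:Create("progress-kafka").
pb:SetProducerOption("bootstrap.servers", "localhost:9092").
pb:SetProducerOption("value.serializer", "OpenEdge.Messaging.StringSerializer").
producer = pb:Build().

recordBuilder = producer:RecordBuilder.
recordBuilder:SetTopicName("mystream").
recordBuilder:SetBody("~{ 'state' : 'running' ~}").

response = producer:Send(recordBuilder:Build()).

/* Flush() returns FALSE when the timeout expires before the queue is empty. */
if not producer:Flush(1000) then
    message "Flush timed out; messages may still be queued." view-as alert-box.
```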

Wait for the response

repeat while not response:Completed:
    pause .1 no-message.
end.

Use a loop to wait until the delivery response is received and the response object properties have been populated.
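
The loop above waits indefinitely. As a hedged variation, the sketch below bounds the wait with a counter so that the program gives up after roughly 10 seconds. The limit of 100 attempts and the error text are arbitrary choices for illustration, not values prescribed by the API.

```abl
using OpenEdge.Messaging.* from propath.

block-level on error undo, throw.

var ProducerBuilder pb.
var IProducer producer.
var RecordBuilder recordBuilder.
var ISendResponse response.
var int attempts = 0.

pb = ProducerBuilder:Create("progress-kafka").
pb:SetProducerOption("bootstrap.servers", "localhost:9092").
pb:SetProducerOption("value.serializer", "OpenEdge.Messaging.StringSerializer").
producer = pb:Build().

recordBuilder = producer:RecordBuilder.
recordBuilder:SetTopicName("mystream").
recordBuilder:SetBody("~{ 'state' : 'running' ~}").

response = producer:Send(recordBuilder:Build()).
producer:Flush(1000).

/* Poll every 0.1 seconds, for at most 100 attempts (an arbitrary limit). */
repeat while (not response:Completed) and (attempts < 100):
    pause .1 no-message.
    attempts = attempts + 1.
end.

/* Still incomplete after the limit: report it instead of waiting forever. */
if not response:Completed then
    undo, throw new Progress.Lang.AppError(
        "Timed out waiting for the delivery response", 0).
```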

Handle any errors

if response:Success then do:
   message "Send successful" view-as alert-box.
end.
else do:
    undo, throw new Progress.Lang.AppError("Failed to send the record: " + 
        response:ErrorMessage, 0).
end.
    
catch err as Progress.Lang.Error :
    message 
        err:GetMessage(1) skip(2) 
        err:CallStack 
            view-as alert-box.
end catch.

Check the Success property on the response object for any failures and handle them appropriately. See Exception handling for messaging errors for more information on handling errors.

Clean up

finally:
    delete object producer no-error.
end.

You can delete the producer when finished with it, as shown in the example above, but this is not required. If not explicitly deleted, the object will be garbage collected.

Complete example

The following code contains the complete example:

using OpenEdge.Messaging.IProducer from propath.
using OpenEdge.Messaging.ProducerBuilder from propath.
using OpenEdge.Messaging.IProducerRecord from propath.
using OpenEdge.Messaging.RecordBuilder from propath.
using OpenEdge.Messaging.ISendResponse from propath.

block-level on error undo, throw.

var RecordBuilder recordBuilder.
var ProducerBuilder pb.
var IProducer producer.
var IProducerRecord record.
var ISendResponse response.

pb = ProducerBuilder:Create("progress-kafka").

pb:SetProducerOption("bootstrap.servers", "localhost:9092").
pb:SetProducerOption("value.serializer", "OpenEdge.Messaging.StringSerializer").

/* Optionally set message.timeout.ms. If bootstrap.servers contains a mistake
  or typo, this makes the send time out sooner. Otherwise, the program can wait
  a long time for Send() to complete and report an error. */
pb:SetProducerOption("message.timeout.ms", "10000").

producer = pb:Build().

recordBuilder = producer:RecordBuilder.
recordBuilder:SetTopicName("mystream").
recordBuilder:SetBody("~{ 'state' : 'running' ~}").

record = recordBuilder:Build().

response = producer:Send(record).
    
producer:Flush(1000).
    
repeat while not response:Completed:
    pause .1 no-message.
end.

if response:Success then do:
   message "Send successful" view-as alert-box.
end.
else do:
    undo, throw new Progress.Lang.AppError("Failed to send the record: " +
        response:ErrorMessage, 0).
end.
    
catch err as Progress.Lang.Error :
    message 
        err:GetMessage(1) skip(2) 
        err:CallStack 
            view-as alert-box.
end catch.

finally:
    delete object producer no-error.
end.