Construct a client producer using strongly-typed methods
- Last Updated: July 18, 2024
- OpenEdge
- Version 12.8
- Use the OpenEdge.Messaging API.
- Create a producer builder.
- Configure producer builder properties using strongly-typed methods.
- Build the producer.
- Create a record builder.
- Set the topic name and message body.
- Build the record.
- Send the record.
- Wait for the response.
- Handle any errors.
- Clean up.
These steps are similar to the steps in Construct a generic client producer using property name-value pairs, except for the first three steps.
A breakdown of each step is provided next, followed by a complete code example.
Use the OpenEdge.Messaging API
The OpenEdge.Messaging API provides a common
set of messaging-focused interfaces. With this API, you can instantiate a message
producer, create messages, populate the message body, and send the message to a
destination.
OpenEdge.Messaging.Kafka.KafkaProducerBuilder is the producer builder
class for the Progress® OpenEdge® Apache® Kafka®
producer implementation. This class inherits from OpenEdge.Messaging.ProducerBuilder and extends the default set of
setters with a number of Kafka-specific properties.
Create a producer builder
To create a Kafka ProducerBuilder, call the Create() method with the "progress-kafka" argument to get the Progress-provided implementation
of the Kafka client. Then cast the result to KafkaProducerBuilder.
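As a minimal sketch of this step, the following assumes that Create() is called as a static method on OpenEdge.Messaging.ProducerBuilder. The variable names pb and kpb are illustrative.

```abl
using OpenEdge.Messaging.ProducerBuilder.
using OpenEdge.Messaging.Kafka.KafkaProducerBuilder.

block-level on error undo, throw.

var ProducerBuilder pb.
var KafkaProducerBuilder kpb.

/* Ask for the Progress-provided Kafka implementation
   (assumption: Create() is a static method). */
pb = ProducerBuilder:Create("progress-kafka").

/* Cast so that the Kafka-specific strongly-typed setters are available. */
kpb = cast(pb, KafkaProducerBuilder).
```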
Configure producer builder properties using strongly-typed methods
Once the producer builder is created, properties can be set using methods that
accept strongly-typed values. Method names correspond to Kafka producer builder
property names. For example, to set the bootstrap.servers property, you call SetBootstrapServers(). See the OpenEdge.Messaging.ProducerBuilder class in the
OpenEdge ABL API Reference for the properties you can set using
strongly-typed methods.
You can also use the SetProducerOption() method,
where you can supply a name-value pair for any Kafka producer property. See Construct a generic client producer using property name-value pairs for more information.
The following example code uses strongly-typed methods of the ProducerBuilder to set properties:
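A minimal sketch of such a configuration follows. It assumes that the serializer setters accept a serializer instance and that a broker is reachable at the placeholder address localhost:9092. Check the OpenEdge ABL API Reference for the exact signatures.

```abl
using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

block-level on error undo, throw.

var KafkaProducerBuilder kpb.

kpb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).

/* bootstrap.servers: comma-delimited host:port list (placeholder value). */
kpb:SetBootstrapServers("localhost:9092").

/* Optional, but helps when correlating logs across clients. */
kpb:SetClientId("test client").

/* Assumption: each setter takes a serializer instance. */
kpb:SetBodySerializer(new StringSerializer()).
kpb:SetKeySerializer(new StringSerializer()).
```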
In this example, SetBootstrapServers() sets the Kafka bootstrap.servers property, a comma-delimited
list of servers that the client connects to. This property is required because it is how a Kafka client
locates a cluster.
Next, SetClientId() sets the
client identifier to "test client". This property is optional, but recommended. It
helps with logging and identifying problems, especially if the application creates
multiple producers, or if log files need to be coordinated across different
clients.
Next, SetBodySerializer() sets the
serialization method for the body of the message to StringSerializer. The serializer takes care of converting the data
into bytes for transmission to a topic. OpenEdge also provides a MemptrSerializer and a JsonSerializer. You can also create your own custom serializers. For
more information on serialization, see Producer serialization.
SetKeySerializer() sets the serializer
for the Kafka partition key to StringSerializer.
The partition key is optional but, when set explicitly, is often a string. Kafka
uses a round-robin scheme if neither a partition key nor a specific partition is
assigned.
Build the producer
Build() causes the producer builder to create
a producer. At this point, the producer is completely configured. However,
additional topic configuration is allowed after the producer is created.
Create a new record builder
The record builder is used for creating the actual message for a topic. Typically, the application uses the producer to request a record builder. This allows a producer to initialize the record builder with defaults. It also allows the producer to produce a record with extra features that are not part of the default implementation.
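The build and record builder steps can be sketched as follows. The helper procedure CreateRecordBuilder and its parameter names are hypothetical. The sketch also assumes that Build() returns an IProducer and that the producer exposes its record builder through a RecordBuilder property.

```abl
using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

block-level on error undo, throw.

/* Hypothetical helper: takes a fully configured builder, builds the
   producer, and asks the producer for a record builder. */
procedure CreateRecordBuilder:
    define input  parameter kpb        as KafkaProducerBuilder no-undo.
    define output parameter producer   as IProducer            no-undo.
    define output parameter recBuilder as RecordBuilder        no-undo.

    /* The producer is completely configured once Build() returns. */
    producer = kpb:Build().

    /* Assumption: the record builder is available as a property. */
    recBuilder = producer:RecordBuilder.
end procedure.
```

Requesting the builder from the producer, rather than constructing one directly, lets the producer apply its own defaults to each record.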
Set the topic name and message body
Once a record builder is retrieved, a topic and a message body can be
assigned. All other record builder settings are optional. The topic is set using the
SetTopicName() method and the body is set
using the SetBody() method.
Next, two additional properties are set. SetPartitionKey() sets the partition key. The partition key is usually
a string, but could also be another data type. Often it may simply be a number, or a
string that has some association with the record. The AddHeader() method allows you to specify header information to send
along with the record.
If the client needs to send many messages to the same topic, the same record builder can be used.
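The settings described above might look like this in practice. PopulateRecord is a hypothetical helper, and the topic, body, key, and header values are placeholders. The character-based signatures of SetBody(), SetPartitionKey(), and AddHeader() are assumptions that fit a StringSerializer configuration.

```abl
using OpenEdge.Messaging.*.

block-level on error undo, throw.

/* Hypothetical helper: fills in a record builder obtained from a producer. */
procedure PopulateRecord:
    define input parameter recBuilder as RecordBuilder no-undo.

    /* Required: destination topic and message body. */
    recBuilder:SetTopicName("mytopic").
    recBuilder:SetBody("Hello World!").

    /* Optional: partition key and a header (assumed character arguments). */
    recBuilder:SetPartitionKey("my key").
    recBuilder:AddHeader("my header", "my header value").
end procedure.
```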
Build the record
Once the record builder is configured, call the Build() method to generate the record.
A record is usually specific to a particular producer, but it could also be used by another producer, depending on the needs of the application.
Send the record
Call Send() to send the record to the
producer. Assign the return value to a response object. The producer is responsible
for the message from this point on.
See Behavior of send response properties for more information on the properties and behavior of the response.
Flush() flushes the queue to
ensure the message is sent in a timely manner. The value supplied to Flush() is the number of milliseconds to wait for all
outstanding messages to be sent. Flush() returns TRUE if all
messages in the producer queue are delivered and returns FALSE if it does not finish
due to the specified timeout.
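A sketch of building, sending, and flushing a record is shown below. SendRecord is a hypothetical helper, the type names IProducerRecord and ISendResponse are assumed, and the 1000 ms timeout is an arbitrary example value.

```abl
using OpenEdge.Messaging.*.

block-level on error undo, throw.

/* Hypothetical helper: builds one record, sends it, and flushes the queue. */
procedure SendRecord:
    define input  parameter producer   as IProducer     no-undo.
    define input  parameter recBuilder as RecordBuilder no-undo.
    define output parameter response   as ISendResponse no-undo.

    var IProducerRecord producerRecord.
    var logical flushed.

    producerRecord = recBuilder:Build().

    /* Hand the record to the producer and keep the response object. */
    response = producer:Send(producerRecord).

    /* Wait up to 1000 ms for all outstanding messages to be delivered. */
    flushed = producer:Flush(1000).
    if not flushed then
        message "Flush timed out before all queued messages were delivered.".
end procedure.
```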
Wait for the response
Use a loop to wait until the delivery response is received and the response object properties have been populated.
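One way to write such a loop is sketched here. It assumes that the response object exposes a logical Completed property. The property name is an assumption, so see Behavior of send response properties for the actual list. WaitForResponse is a hypothetical helper.

```abl
using OpenEdge.Messaging.*.

block-level on error undo, throw.

/* Hypothetical helper: polls until the delivery response has arrived. */
procedure WaitForResponse:
    define input parameter response as ISendResponse no-undo.

    /* Assumption: Completed becomes TRUE once the response is populated. */
    repeat while not response:Completed:
        pause 0.1 no-message.
    end.
end procedure.
```

A production loop would normally also enforce an upper bound on the total wait so that an unreachable broker cannot block the session indefinitely.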
Handle any errors
Check the Success property on the
response object for any failures and handle them appropriately.
See Exception handling for messaging errors for more information on handling errors.
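A minimal check of the Success property could look like this. ReportResult is a hypothetical helper and the messages are placeholders.

```abl
using OpenEdge.Messaging.*.

block-level on error undo, throw.

/* Hypothetical helper: reports the outcome of a completed send. */
procedure ReportResult:
    define input parameter response as ISendResponse no-undo.

    if response:Success then
        message "Send succeeded.".
    else
        message "Send failed.".
end procedure.
```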
Clean up
You can explicitly delete the producer when you are finished with it, but this is not required. If it is not explicitly deleted, the object is garbage collected.
Complete example
The following code contains the complete example:
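The sketch below assembles the steps from this topic into one procedure. It is an illustration under several assumptions, not a verified listing. The broker address, topic, body, key, and header values are placeholders. The RecordBuilder and Completed property names, the type names IProducer, IProducerRecord, and ISendResponse, and the character-based setter signatures should all be checked against the OpenEdge ABL API Reference.

```abl
using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

block-level on error undo, throw.

var KafkaProducerBuilder kpb.
var IProducer producer.
var RecordBuilder recBuilder.
var IProducerRecord producerRecord.
var ISendResponse response.

/* Create the builder and cast it to the Kafka-specific type. */
kpb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).

/* Configure the builder with strongly-typed methods. */
kpb:SetBootstrapServers("localhost:9092").
kpb:SetClientId("test client").
kpb:SetBodySerializer(new StringSerializer()).
kpb:SetKeySerializer(new StringSerializer()).

/* Build the producer and request a record builder from it. */
producer = kpb:Build().
recBuilder = producer:RecordBuilder.

/* Topic and body are required; key and header are optional. */
recBuilder:SetTopicName("mytopic").
recBuilder:SetBody("Hello World!").
recBuilder:SetPartitionKey("my key").
recBuilder:AddHeader("my header", "my header value").

/* Build and send the record, then flush for up to 1000 ms. */
producerRecord = recBuilder:Build().
response = producer:Send(producerRecord).
producer:Flush(1000).

/* Wait for the delivery response (assumed Completed property). */
repeat while not response:Completed:
    pause 0.1 no-message.
end.

/* Handle the outcome. */
if response:Success then
    message "Send succeeded.".
else
    message "Send failed.".

catch err as Progress.Lang.Error:
    message "Messaging error:" err:GetMessage(1).
end catch.

finally:
    /* Optional: the producer is garbage collected if not deleted. */
    delete object producer no-error.
end finally.
```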