# Kafka / RabbitMQ connectors
The Bonita Kafka / RabbitMQ connectors let you produce and consume messages from Apache Kafka and RabbitMQ brokers directly from your Bonita processes.
The Bonita Kafka / RabbitMQ connectors are available for Bonita Community 2024.3 (10.2) and later versions.
> **Beta:** This connector is currently in Beta and has not yet been fully validated in production environments. We welcome your feedback: please report testing results or issues using the beta feedback form on GitHub. We are eager to collaborate with early adopters to bring this connector to General Availability.
## Overview

The Kafka / RabbitMQ connector provides six operations:

- **Kafka Produce** — produce a message to a Kafka topic
- **Kafka Consume** — consume messages from a Kafka topic
- **RabbitMQ Publish** — publish a message to a RabbitMQ exchange
- **RabbitMQ Consume** — consume messages from a RabbitMQ queue
- **Event Publish JSON** — publish a JSON payload to either broker type
- **Get Consumer Status** — check consumer group lag and connectivity
## Getting started

Add the connector as an extension dependency to your Bonita project: import the `.jar` file via *Import from file* in Bonita Studio.
## Kafka connection configuration

| Parameter | Required | Description | Default |
|---|---|---|---|
| `bootstrapServers` | Yes | Kafka bootstrap servers (host:port) | — |
| `securityProtocol` | No | Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) | PLAINTEXT |
| `saslMechanism` | No | SASL mechanism (e.g., PLAIN, SCRAM-SHA-256) | — |
| `saslUsername` | No | SASL username | — |
| `saslPassword` | No | SASL password | — |
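For example, an authenticated connection to a secured cluster might use values like the following (the host name and credentials are placeholders, not real defaults):

```properties
# Hypothetical configuration for a SASL_SSL-secured Kafka cluster
bootstrapServers=broker1.example.com:9093
securityProtocol=SASL_SSL
saslMechanism=SCRAM-SHA-256
saslUsername=bonita-app
saslPassword=<your-secret>
```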
## Kafka Produce (kafka-produce-message)

Produce a message to a Kafka topic.

### Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| `topic` | Yes | Kafka topic name | — |
| `messageKey` | No | Message key for partitioning | — |
| `messageValue` | Yes | Message payload | — |
| `timeoutSeconds` | No | Send timeout in seconds | 30 |
| `maxRetries` | No | Maximum retry attempts | 3 |
### Output parameters

| Parameter | Type | Description |
|---|---|---|
| `destination` | String | Topic the message was sent to |
| `partition` | Integer | Partition the message was written to |
| `offset` | Long | Offset of the produced message |
| `timestamp` | Long | Timestamp of the produced message |
| `success` | Boolean | Whether the operation succeeded |
| `errorMessage` | String | Error message if the operation failed |
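The output names below are taken from the table; the surrounding handling logic is only an illustrative sketch of how a process step (for instance, a Bonita script reading the connector result) might react to them:

```python
# Illustrative handling of the Kafka Produce outputs documented above.
# The dict stands in for the connector's output values (hypothetical data).
def handle_produce_result(result: dict) -> str:
    if not result["success"]:
        # Surface the connector's error message to the calling process
        raise RuntimeError(f"Kafka produce failed: {result['errorMessage']}")
    return (f"Delivered to {result['destination']} "
            f"[partition {result['partition']}, offset {result['offset']}]")

sample = {
    "destination": "orders", "partition": 2, "offset": 105,
    "timestamp": 1700000000000, "success": True, "errorMessage": None,
}
print(handle_produce_result(sample))  # Delivered to orders [partition 2, offset 105]
```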
## Kafka Consume (kafka-consume-message)

Consume messages from a Kafka topic.

### Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| `topic` | Yes | Kafka topic to consume from | — |
| `consumerGroup` | Yes | Consumer group ID | — |
| `pollDurationMs` | No | Poll duration in milliseconds | 5000 |
| `maxRecords` | No | Maximum records to consume | 10 |
| `autoOffsetReset` | No | Offset reset behavior: `earliest` or `latest` | `earliest` |
| `timeoutSeconds` | No | Overall timeout in seconds | 30 |
| `maxRetries` | No | Maximum retry attempts | 3 |
### Output parameters

| Parameter | Type | Description |
|---|---|---|
| `messageCount` | Integer | Number of messages consumed |
| `messagesJson` | String | JSON array of consumed messages |
| `source` | String | Topic consumed from |
| `consumerGroupOut` | String | Consumer group used |
| `success` | Boolean | Whether the operation succeeded |
| `errorMessage` | String | Error message if the operation failed |
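`messagesJson` is documented only as a JSON array of consumed messages; assuming each element carries at least a `key` and a `value` field (an assumption about the schema, not documented behavior), a process step could unpack it like this:

```python
import json

def parse_messages(messages_json: str) -> list[dict]:
    """Parse the connector's messagesJson output into a list of records."""
    records = json.loads(messages_json)
    if not isinstance(records, list):
        raise ValueError("messagesJson is expected to be a JSON array")
    return records

# Hypothetical payload shaped like the documented output
messages_json = '[{"key": "order-1", "value": "{\\"total\\": 42}"}]'
for record in parse_messages(messages_json):
    print(record["key"], "->", record["value"])
```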
## RabbitMQ Publish (rabbitmq-publish-message)

Publish a message to a RabbitMQ exchange.

### Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| `exchange` | No | Exchange name (empty for default exchange) | `""` |
| `routingKey` | Yes | Routing key | — |
| `messageBody` | Yes | Message body content | — |
| `contentType` | No | Message content type | `application/json` |
| `timeoutSeconds` | No | Send timeout in seconds | 30 |
| `maxRetries` | No | Maximum retry attempts | 3 |
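Since the default `contentType` is `application/json`, `messageBody` will typically hold a serialized JSON string. A minimal sketch of assembling the input values (parameter names from the table above, values hypothetical):

```python
import json

# Example input values for the rabbitmq-publish-message operation
publish_inputs = {
    "exchange": "",          # an empty string targets the default exchange
    "routingKey": "orders",  # with the default exchange, RabbitMQ routes by queue name
    "messageBody": json.dumps({"orderId": "order-1", "total": 42}),
    "contentType": "application/json",
}
print(publish_inputs["messageBody"])
```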
## RabbitMQ Consume (rabbitmq-consume-message)

Consume messages from a RabbitMQ queue.
## Event Publish JSON (event-publish-json)

Publish a JSON payload to either a Kafka topic or a RabbitMQ exchange, abstracting the broker type.
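Conceptually, the abstraction amounts to serializing the payload once and dispatching on the broker type; the sketch below illustrates the idea only and is not the connector's actual implementation:

```python
import json

def publish_json(broker_type: str, destination: str, payload: dict) -> str:
    """Conceptual sketch of a broker-agnostic JSON publish (illustration only)."""
    body = json.dumps(payload)
    if broker_type == "kafka":
        return f"produce to topic {destination}: {body}"
    elif broker_type == "rabbitmq":
        return f"publish to exchange {destination}: {body}"
    raise ValueError(f"Unsupported broker type: {broker_type}")
```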
## Get Consumer Status (event-get-consumer-status)

Check consumer group lag and connectivity status.

### Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| `brokerType` | Yes | Broker type: `kafka` or `rabbitmq` | — |
| `topicOrQueue` | Yes | Topic or queue name to check | — |
| `consumerGroup` | No | Consumer group to check (Kafka only) | — |
| `timeoutSeconds` | No | Timeout in seconds | 30 |
| `maxRetries` | No | Maximum retry attempts | 3 |
### Output parameters

| Parameter | Type | Description |
|---|---|---|
| `brokerTypeOut` | String | Broker type checked |
| `source` | String | Topic or queue checked |
| `consumerGroupOut` | String | Consumer group checked |
| `lagOrDepth` | Long | Consumer lag (Kafka) or queue depth (RabbitMQ) |
| `partitionsOrConsumers` | Integer | Number of partitions or consumers |
| `connected` | Boolean | Whether the broker is reachable |
| `success` | Boolean | Whether the operation succeeded |
| `errorMessage` | String | Error message if the operation failed |
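For Kafka, consumer lag is the gap between each partition's log-end offset and the group's committed offset, summed over partitions. A small illustration of how a `lagOrDepth` figure is derived (the per-partition offsets below are hypothetical):

```python
def total_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> int:
    """Sum (log-end offset - committed offset) across partitions."""
    return sum(end_offsets[p] - committed.get(p, 0) for p in end_offsets)

# Hypothetical per-partition offsets for a consumer group on a 2-partition topic
end_offsets = {0: 120, 1: 95}   # latest offsets on the broker
committed = {0: 100, 1: 95}     # offsets the group has committed
print(total_lag(end_offsets, committed))  # 20 + 0 = 20
```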
## Error handling

All operations set `success` to `false` and populate `errorMessage` on failure. Error messages are truncated to 1000 characters to prevent database column overflow in Bonita.

Common errors include broker connectivity failures, authentication errors, topic/queue not found, and serialization issues.
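The 1000-character cap means process logic should never assume `errorMessage` is complete. The truncation behaves like a plain slice in this sketch (whether the connector appends an ellipsis marker is not documented, so none is assumed):

```python
MAX_ERROR_LENGTH = 1000  # documented limit for errorMessage

def truncate_error(message: str) -> str:
    """Cap an error message at 1000 characters, mirroring the connector's limit."""
    return message[:MAX_ERROR_LENGTH]

long_error = "broker unreachable: " + "x" * 2000
assert len(truncate_error(long_error)) == 1000
```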
## Source code

The connector source code is available on GitHub: bonita-connector-kafka