Kafka / RabbitMQ connectors

The Bonita Kafka / RabbitMQ connectors let you produce messages to, and consume messages from, Apache Kafka and RabbitMQ brokers directly from your Bonita processes.

The Bonita Kafka / RabbitMQ connectors are available for Bonita 10.2 Community (2024.3) and later.

This connector is currently in Beta. It has not yet been fully validated in production environments.

We welcome your feedback — please report testing results or issues using the beta feedback form on GitHub.

We are eager to collaborate with early adopters to bring this connector to General Availability.

Overview

The Kafka / RabbitMQ connector provides six operations:

  • Kafka Produce — produce a message to a Kafka topic

  • Kafka Consume — consume messages from a Kafka topic

  • RabbitMQ Publish — publish a message to a RabbitMQ exchange

  • RabbitMQ Consume — consume messages from a RabbitMQ queue

  • Event Publish JSON — publish a JSON payload to either broker type

  • Get Consumer Status — check consumer group lag and connectivity

Getting started

Add the connector as an extension dependency to your Bonita project. In Bonita Studio, import the .jar file with the Import from file option.

Kafka connection configuration

| Parameter | Required | Description | Default |
|---|---|---|---|
| bootstrapServers | Yes | Kafka bootstrap servers (host:port) | (none) |
| securityProtocol | No | Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) | PLAINTEXT |
| saslMechanism | No | SASL mechanism (e.g., PLAIN, SCRAM-SHA-256) | (none) |
| saslUsername | No | SASL username | (none) |
| saslPassword | No | SASL password | (none) |
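To make the connection parameters above more concrete, the following sketch shows how they could map onto standard Apache Kafka client configuration keys (`bootstrap.servers`, `security.protocol`, `sasl.mechanism`, `sasl.jaas.config`). The class `KafkaConnectionSketch` and its method are hypothetical names used for illustration. Whether the connector builds its client configuration this way is an assumption, not something this page states.

```java
import java.util.Properties;

// Hypothetical helper (not part of the connector): maps the documented
// connection parameters onto standard Kafka client configuration keys.
class KafkaConnectionSketch {

    static Properties toClientProperties(String bootstrapServers,
                                         String securityProtocol,
                                         String saslMechanism,
                                         String saslUsername,
                                         String saslPassword) {
        if (bootstrapServers == null || bootstrapServers.isBlank()) {
            throw new IllegalArgumentException("bootstrapServers is required");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);

        // PLAINTEXT is the documented default when no protocol is given.
        String protocol = (securityProtocol == null || securityProtocol.isBlank())
                ? "PLAINTEXT" : securityProtocol;
        props.setProperty("security.protocol", protocol);

        boolean sasl = protocol.startsWith("SASL_");
        if (sasl && saslMechanism != null && !saslMechanism.isBlank()) {
            props.setProperty("sasl.mechanism", saslMechanism);

            // Kafka ships one login module for PLAIN and one for the SCRAM family.
            // Other mechanisms need their own JAAS setup and are left untouched here.
            String loginModule = null;
            if (saslMechanism.equals("PLAIN")) {
                loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule";
            } else if (saslMechanism.startsWith("SCRAM-")) {
                loginModule = "org.apache.kafka.common.security.scram.ScramLoginModule";
            }
            if (loginModule != null) {
                // Note: quotes inside the username or password are not escaped in this sketch.
                props.setProperty("sasl.jaas.config", String.format(
                        "%s required username=\"%s\" password=\"%s\";",
                        loginModule, saslUsername, saslPassword));
            }
        }
        return props;
    }
}
```

For example, `toClientProperties("broker:9092", null, null, null, null)` yields a configuration with `security.protocol` set to `PLAINTEXT` and no SASL keys.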

RabbitMQ connection configuration

| Parameter | Required | Description | Default |
|---|---|---|---|
| bootstrapServers | Yes | RabbitMQ server address (host:port) | (none) |
| rabbitmqUsername | No | RabbitMQ username | (none) |
| rabbitmqPassword | No | RabbitMQ password | (none) |
| rabbitmqVirtualHost | No | RabbitMQ virtual host | / |
| rabbitmqSsl | No | Whether to use SSL | false |
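As an illustration of how these settings relate to each other, the sketch below assembles them into an AMQP URI of the form `amqp://user:password@host:port/vhost` (or `amqps://` when SSL is enabled). The class `RabbitMqUriSketch` is a hypothetical name, and whether the connector connects through a URI internally is an assumption. The point is only that the virtual host and credentials must be percent-encoded, so the default virtual host `/` becomes `%2F`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of the connector): builds an AMQP URI
// from the documented RabbitMQ connection parameters.
class RabbitMqUriSketch {

    // URLEncoder produces form encoding, so spaces come out as '+';
    // URIs need '%20' instead. A literal '+' is already encoded as '%2B'.
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static String toAmqpUri(String hostAndPort,   // the bootstrapServers parameter
                            String username,
                            String password,
                            String virtualHost,
                            boolean ssl) {
        if (hostAndPort == null || hostAndPort.isBlank()) {
            throw new IllegalArgumentException("bootstrapServers is required");
        }
        String scheme = ssl ? "amqps" : "amqp";
        // "/" is the documented default virtual host.
        String vhost = (virtualHost == null || virtualHost.isEmpty()) ? "/" : virtualHost;

        StringBuilder uri = new StringBuilder(scheme).append("://");
        if (username != null && !username.isEmpty()) {
            uri.append(encode(username));
            if (password != null) {
                uri.append(':').append(encode(password));
            }
            uri.append('@');
        }
        uri.append(hostAndPort).append('/').append(encode(vhost));
        return uri.toString();
    }
}
```

With the default virtual host, `toAmqpUri("rabbit.example.com:5672", "guest", "guest", "/", false)` gives `amqp://guest:guest@rabbit.example.com:5672/%2F`.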

Kafka Produce (kafka-produce-message)

Produce a message to a Kafka topic.

Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| topic | Yes | Kafka topic name | (none) |
| messageKey | No | Message key for partitioning | (none) |
| messageValue | Yes | Message payload | (none) |
| timeoutSeconds | No | Send timeout in seconds | 30 |
| maxRetries | No | Maximum retry attempts | 3 |

Output parameters

| Parameter | Type | Description |
|---|---|---|
| destination | String | Topic the message was sent to |
| partition | Integer | Partition the message was written to |
| offset | Long | Offset of the produced message |
| timestamp | Long | Timestamp of the produced message |
| success | Boolean | Whether the operation succeeded |
| errorMessage | String | Error message if the operation failed |
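A process typically branches on `success` before it uses the other outputs. The sketch below shows one way to turn the outputs into a log line. `ProduceResultSketch` is a hypothetical name, and the code assumes that `timestamp` is expressed in milliseconds since the Unix epoch, as in Kafka record metadata. This page does not confirm the unit.

```java
import java.time.Instant;

// Hypothetical helper (not part of the connector): summarizes the
// Kafka Produce outputs for logging or for a process variable.
class ProduceResultSketch {

    static String describe(boolean success,
                           String destination,
                           Integer partition,
                           Long offset,
                           Long timestamp,
                           String errorMessage) {
        if (!success) {
            // On failure only errorMessage is meaningful.
            return "Produce failed: " + errorMessage;
        }
        // Assumption: timestamp is epoch milliseconds.
        Instant when = Instant.ofEpochMilli(timestamp);
        return String.format("%s-%d@%d at %s", destination, partition, offset, when);
    }
}
```

The `topic-partition@offset` notation identifies a single Kafka record, which makes the line useful when you trace a message back to the broker.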

Kafka Consume (kafka-consume-message)

Consume messages from a Kafka topic.

Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| topic | Yes | Kafka topic to consume from | (none) |
| consumerGroup | Yes | Consumer group ID | (none) |
| pollDurationMs | No | Poll duration in milliseconds | 5000 |
| maxRecords | No | Maximum records to consume | 10 |
| autoOffsetReset | No | Offset reset behavior: earliest or latest | earliest |
| timeoutSeconds | No | Overall timeout in seconds | 30 |
| maxRetries | No | Maximum retry attempts | 3 |
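Several of these inputs have direct counterparts among the standard Kafka consumer settings: `group.id`, `max.poll.records`, and `auto.offset.reset`. The following sketch applies the documented defaults and validates the values. `KafkaConsumerSettingsSketch` is a hypothetical name, and the mapping itself is an assumption about how the inputs might be used, not a description of the connector's internals.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper (not part of the connector): applies the documented
// defaults and maps the consume inputs onto Kafka consumer settings.
class KafkaConsumerSettingsSketch {

    static Properties toConsumerProperties(String consumerGroup,
                                           Integer maxRecords,
                                           String autoOffsetReset) {
        if (consumerGroup == null || consumerGroup.isBlank()) {
            throw new IllegalArgumentException("consumerGroup is required");
        }
        // Documented default: 10 records.
        int records = (maxRecords == null) ? 10 : maxRecords;
        if (records < 1) {
            throw new IllegalArgumentException("maxRecords must be at least 1");
        }
        // Documented default: earliest. Accepting mixed case is a choice made for this sketch.
        String reset = (autoOffsetReset == null || autoOffsetReset.isBlank())
                ? "earliest" : autoOffsetReset.trim().toLowerCase(Locale.ROOT);
        if (!reset.equals("earliest") && !reset.equals("latest")) {
            throw new IllegalArgumentException("autoOffsetReset must be earliest or latest");
        }

        Properties props = new Properties();
        props.setProperty("group.id", consumerGroup);
        props.setProperty("max.poll.records", Integer.toString(records));
        props.setProperty("auto.offset.reset", reset);
        return props;
    }
}
```

Keep in mind that in Kafka, `auto.offset.reset` only takes effect when the consumer group has no committed offset for a partition. A group that has consumed before resumes from its committed position regardless of this setting.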

Output parameters

| Parameter | Type | Description |
|---|---|---|
| messageCount | Integer | Number of messages consumed |
| messagesJson | String | JSON array of consumed messages |
| source | String | Topic consumed from |
| consumerGroupOut | String | Consumer group used |
| success | Boolean | Whether the operation succeeded |
| errorMessage | String | Error message if the operation failed |

RabbitMQ Publish (rabbitmq-publish-message)

Publish a message to a RabbitMQ exchange.

Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| exchange | No | Exchange name (empty for default exchange) | |
| routingKey | Yes | Routing key | (none) |
| messageBody | Yes | Message body content | (none) |
| contentType | No | Message content type | application/json |
| timeoutSeconds | No | Send timeout in seconds | 30 |
| maxRetries | No | Maximum retry attempts | 3 |

Output parameters

| Parameter | Type | Description |
|---|---|---|
| destination | String | Routing destination |
| partition | Integer | Partition info |
| offset | Long | Offset info |
| timestamp | Long | Timestamp of the published message |
| success | Boolean | Whether the operation succeeded |
| errorMessage | String | Error message if the operation failed |

RabbitMQ Consume (rabbitmq-consume-message)

Consume messages from a RabbitMQ queue.

Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| queue | Yes | Queue name to consume from | (none) |
| autoAck | No | Whether to auto-acknowledge messages | true |
| timeoutSeconds | No | Consume timeout in seconds | 30 |
| maxRetries | No | Maximum retry attempts | 3 |

Output parameters

| Parameter | Type | Description |
|---|---|---|
| messageCount | Integer | Number of messages consumed |
| messagesJson | String | JSON array of consumed messages |
| source | String | Queue consumed from |
| success | Boolean | Whether the operation succeeded |
| errorMessage | String | Error message if the operation failed |

Event Publish JSON (event-publish-json)

Publish a JSON payload to either a Kafka topic or RabbitMQ exchange, abstracting the broker type.

Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| brokerType | Yes | Broker type: kafka or rabbitmq | (none) |
| destination | Yes | Topic or routing key | (none) |
| jsonPayload | Yes | JSON message payload | (none) |
| timeoutSeconds | No | Send timeout in seconds | 30 |
| maxRetries | No | Maximum retry attempts | 3 |
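Because `destination` means a topic for Kafka and a routing key for RabbitMQ, it can help to validate `brokerType` before calling the operation. The sketch below shows one way to do that. `EventPublishSketch` and its members are hypothetical names. Trimming the value and ignoring case are choices made for this example, and the connector may be stricter.

```java
import java.util.Locale;

// Hypothetical helper (not part of the connector): validates brokerType
// and explains how destination will be interpreted.
class EventPublishSketch {

    enum BrokerType { KAFKA, RABBITMQ }

    static BrokerType parseBrokerType(String value) {
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("brokerType is required");
        }
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "kafka":
                return BrokerType.KAFKA;
            case "rabbitmq":
                return BrokerType.RABBITMQ;
            default:
                throw new IllegalArgumentException("Unsupported brokerType: " + value);
        }
    }

    // Describes what the destination input refers to for the given broker type.
    static String describeTarget(String brokerType, String destination) {
        switch (parseBrokerType(brokerType)) {
            case KAFKA:
                return "Kafka topic '" + destination + "'";
            default:
                return "RabbitMQ routing key '" + destination + "'";
        }
    }
}
```

A check like this in a script task lets the process fail early with a clear message instead of waiting for the broker call to report the problem.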

Output parameters

| Parameter | Type | Description |
|---|---|---|
| brokerTypeOut | String | Broker type used |
| destinationOut | String | Destination used |
| sent | Boolean | Whether the message was sent |
| timestamp | Long | Timestamp |
| success | Boolean | Whether the operation succeeded |
| errorMessage | String | Error message if the operation failed |

Get Consumer Status (event-get-consumer-status)

Check consumer group lag and connectivity status.

Input parameters

| Parameter | Required | Description | Default |
|---|---|---|---|
| brokerType | Yes | Broker type: kafka or rabbitmq | (none) |
| topicOrQueue | Yes | Topic or queue name to check | (none) |
| consumerGroup | No | Consumer group to check (Kafka only) | (none) |
| timeoutSeconds | No | Timeout in seconds | 30 |
| maxRetries | No | Maximum retry attempts | 3 |

Output parameters

| Parameter | Type | Description |
|---|---|---|
| brokerTypeOut | String | Broker type checked |
| source | String | Topic or queue checked |
| consumerGroupOut | String | Consumer group checked |
| lagOrDepth | Long | Consumer lag (Kafka) or queue depth (RabbitMQ) |
| partitionsOrConsumers | Integer | Number of partitions or consumers |
| connected | Boolean | Whether the broker is reachable |
| success | Boolean | Whether the operation succeeded |
| errorMessage | String | Error message if the operation failed |
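For Kafka, consumer lag is conventionally the sum, over all partitions of a topic, of the log end offset minus the group's committed offset. The sketch below computes that figure from two offset maps and adds a simple health check on the operation's outputs. `ConsumerLagSketch`, the threshold parameter, and the treatment of partitions without a committed offset are assumptions made for illustration. This page does not describe how the connector derives `lagOrDepth`.

```java
import java.util.Map;

// Hypothetical helper (not part of the connector): illustrates the usual
// definition of Kafka consumer lag and a threshold check on the outputs.
class ConsumerLagSketch {

    // Total lag = sum over partitions of (log end offset - committed offset).
    static long totalLag(Map<Integer, Long> endOffsets,
                         Map<Integer, Long> committedOffsets) {
        long lag = 0L;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // Assumption: a partition with no committed offset counts from offset 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            // Guard against a committed offset that is ahead of a stale end offset.
            lag += Math.max(0L, entry.getValue() - committed);
        }
        return lag;
    }

    // Treats the consumer as healthy when the broker is reachable, the call
    // succeeded, and the lag or queue depth stays within the chosen limit.
    static boolean isHealthy(boolean connected, boolean success,
                             long lagOrDepth, long maxAcceptable) {
        return connected && success && lagOrDepth <= maxAcceptable;
    }
}
```

For instance, with end offsets of 100 and 50 on two partitions and committed offsets of 90 and 50, the total lag is 10.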

Error handling

All operations set success=false and populate errorMessage on failure. Error messages are truncated to 1000 characters to prevent database column overflow in Bonita.

Common errors include broker connectivity failures, authentication errors, topic/queue not found, and serialization issues.
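The truncation rule can be pictured with a few lines of Java. This is a minimal sketch of the behavior described above, not the connector's actual code. `ErrorMessageSketch` and `truncate` are hypothetical names, and the sketch counts length in Java `char` units, which is an assumption.

```java
// Hypothetical helper (not part of the connector): caps an error message
// at 1000 characters, as described in this section.
class ErrorMessageSketch {

    static final int MAX_ERROR_LENGTH = 1000;

    static String truncate(String message) {
        if (message == null) {
            return null;
        }
        // Length is measured in UTF-16 code units (Java chars).
        return message.length() <= MAX_ERROR_LENGTH
                ? message
                : message.substring(0, MAX_ERROR_LENGTH);
    }
}
```

Because long stack traces or broker responses may be cut off, consult the Bonita engine logs when `errorMessage` appears incomplete.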

Source code

The connector source code is available on GitHub: bonita-connector-kafka