SSL/TLS Configuration
To connect to RabbitMQ over SSL/TLS, enable the useSsl flag and optionally specify the protocol:
rabbitmq {
    connectionFactories {
        factory(
            name: 'secure',
            hostname: 'rabbitmq.example.com',
            port: 5671,
            username: 'myuser',
            password: 'mypassword',
            useSsl: true,
            sslProtocol: 'TLSv1.2'
        )
    }
}
The sslProtocol defaults to TLSv1.2 when useSsl is enabled. The SSL context is configured via the ConnectionFactoryHelper, which wraps the underlying RabbitMQ ConnectionFactory.
Multiple Connection Factories
The plugin supports connecting to multiple RabbitMQ brokers simultaneously. Each factory is identified by its name alias. The first configured factory becomes the default connection (aliased as rabbitConnectionFactory in Spring).
rabbitmq {
    connectionFactories {
        factory(
            name: 'primary',
            hostname: 'rabbitmq-primary.example.com',
            username: 'user1',
            password: 'pass1'
        )
        factory(
            name: 'analytics',
            hostname: 'rabbitmq-analytics.example.com',
            username: 'user2',
            password: 'pass2'
        )
    }
}
Using a Specific Connection in Consumers
Reference the connection alias in @Queue or @Subscriber annotations:
import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Queue

@RabbitConsumer
class AnalyticsService {
    @Queue(name = 'analytics.events', conAlias = 'analytics', durable = true)
    def processAnalyticsEvent(Map event) {
        // This consumer reads from the 'analytics' RabbitMQ broker
        println "Analytics event: ${event}"
    }
}
Sending to a Specific Connection
Pass the connection alias as the first argument to convertAndSend when sending messages:
class EventPublisher {
    def rabbitMQService

    def publishToAnalytics(Map event) {
        rabbitMQService.convertAndSend('analytics', 'events.track', event)
    }

    def publishToPrimary(Map data) {
        rabbitMQService.convertAndSend('primary', 'tasks.process', data)
    }
}
Cluster and High Availability
Cluster Connections
For RabbitMQ clusters, use the addresses property to specify multiple nodes:
rabbitmq {
    connectionFactories {
        factory(
            name: 'cluster',
            username: 'guest',
            password: 'guest',
            addresses: 'rabbit1:5672,rabbit2:5672,rabbit3:5672'
        )
    }
}
When addresses is specified, the hostname and port properties are ignored. The Spring AMQP connection factory will attempt each address in order.
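To make the addresses format concrete, the following sketch splits such a string into host and port pairs in plain Groovy. The parseAddresses helper is hypothetical and not part of the plugin, which leaves the real parsing to Spring AMQP. Falling back to port 5672 when an entry has no port is an assumption of this sketch, and IPv6 literals are not handled.

```groovy
// Hypothetical helper for illustration only; the plugin and Spring AMQP
// parse the addresses string themselves.
List<Map> parseAddresses(String addresses, int defaultPort = 5672) {
    return addresses.split(',')
        .collect { it.trim() }
        .findAll { it }                      // drop empty entries such as a trailing comma
        .collect { String entry ->
            String[] parts = entry.split(':')
            int port = parts.length > 1 ? Integer.parseInt(parts[1]) : defaultPort
            [host: parts[0], port: port]
        }
}

// Nodes are listed in the order in which they would be attempted
parseAddresses('rabbit1:5672,rabbit2:5672,rabbit3:5672').each {
    println "${it.host} -> ${it.port}"
}
```

A check like this can be useful in a startup script to catch a malformed addresses value before the connection factory is created.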
Exchange Patterns
Topic Exchange
Topic exchanges route messages based on wildcard pattern matching on the routing key. Use * to match a single word and # to match zero or more words.
@RabbitConsumer
class LogService {
    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'logs',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.topic,
            durable = true
        ),
        routingKey = 'log.error.#'
    )
    def handleErrors(Map logEntry) {
        // Receives all error logs: log.error, log.error.database, etc.
    }

    @Subscriber(
        exchangeParams = @ExchangeParams(name = 'logs'),
        routingKey = 'log.*.database',
        multiSubscriber = true
    )
    def handleDatabaseLogs(Map logEntry) {
        // Receives all database logs: log.error.database, log.info.database, etc.
    }
}
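The wildcard rules for topic routing keys can be checked with a small standalone matcher. This is a minimal sketch of the matching semantics only: the broker performs the real matching, and topicMatches and matchWords are hypothetical names that do not exist in the plugin.

```groovy
// Illustrative matcher for topic binding patterns; not part of the plugin.
boolean topicMatches(String pattern, String routingKey) {
    return matchWords(pattern.tokenize('.'), routingKey.tokenize('.'))
}

boolean matchWords(List<String> pattern, List<String> key) {
    if (pattern.isEmpty()) {
        return key.isEmpty()
    }
    String head = pattern.head()
    List<String> rest = pattern.tail()
    if (head == '#') {
        // '#' may absorb zero or more words, so try every possible split point
        return (0..key.size()).any { n -> matchWords(rest, key.drop(n)) }
    }
    if (key.isEmpty()) {
        return false
    }
    // '*' matches exactly one word; any other word must match literally
    return (head == '*' || head == key.head()) && matchWords(rest, key.tail())
}

println topicMatches('log.error.#', 'log.error')             // true
println topicMatches('log.*.database', 'log.info.database')  // true
println topicMatches('log.*.database', 'log.database')       // false
```

Trying candidate routing keys against a binding pattern this way helps explain why a listener does or does not receive a given message.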
Direct Exchange
Direct exchanges route messages to queues whose binding key exactly matches the routing key:
@RabbitConsumer
class TaskService {
    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'tasks',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.direct,
            durable = true
        ),
        routingKey = 'task.email'
    )
    def handleEmailTask(Map task) {
        // Only receives messages with routing key 'task.email'
    }
}
Fanout Exchange
Fanout exchanges broadcast every message to all bound queues, ignoring routing keys:
@RabbitConsumer
class BroadcastService {
    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'broadcasts',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.fanout,
            durable = true
        ),
        queueParams = @QueueParams(
            autoNodeName = true,
            durable = false,
            autoDelete = true,
            exclusive = false
        )
    )
    def handleBroadcast(Map message) {
        // Every instance of this application receives every broadcast
        println "Broadcast received: ${message}"
    }
}
When using autoNodeName = true in @QueueParams, the plugin generates a queue name using the hostname (<exchangeName>-<hostname>), ensuring each application node gets its own queue. This is particularly useful for fanout exchanges in HA deployments where each node should receive its own copy of every message.
If the hostname resolves to localhost on every node, all nodes generate the same queue name and therefore share a single queue. In an HA environment they then compete for messages instead of each receiving its own copy.
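The naming rule and the localhost pitfall can be illustrated with a short sketch. Both helper names are hypothetical, and resolving the hostname through InetAddress is an assumption about how the plugin behaves, not something confirmed by its source.

```groovy
// Sketch only: the plugin's real hostname lookup may differ.
String nodeQueueName(String exchangeName, String hostname = InetAddress.localHost.hostName) {
    return exchangeName + '-' + hostname
}

// True when the resolved name would be identical on every node,
// which makes all nodes share one queue.
boolean isAmbiguousHostname(String hostname) {
    return hostname?.toLowerCase() in ['localhost', 'localhost.localdomain', '127.0.0.1']
}

println nodeQueueName('broadcasts', 'app-node-1')   // broadcasts-app-node-1
println isAmbiguousHostname('localhost')            // true
```

Running a check like isAmbiguousHostname at startup and logging a warning is one way to catch a misconfigured node before it silently shares a queue with its peers.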
Headers Exchange
Headers exchanges route based on message header attributes rather than routing keys:
@RabbitConsumer
class HeaderRoutingService {
    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'header-exchange',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.headers,
            durable = true
        )
    )
    def handleHeaderMatched(Map message) {
        // Process messages matching header criteria
    }
}
Error Handling and Retry
Built-in Retry Policy
The plugin configures a StatefulRetryOperationsInterceptorFactoryBean with configurable retry parameters. Failed message deliveries are automatically retried based on the configured policy:
rabbitmq:
  retryPolicy:
    maxAttempts: 5        # Maximum retry attempts (default: 3)
    backOffPeriod: 10000  # Delay between retries in ms (default: 5000)
The retry policy uses a FixedBackOffPolicy, so every retry waits the same backOffPeriod rather than an increasing delay.
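The effect of the two settings can be sketched as a plain retry loop. This is an illustration of fixed back-off semantics only: the plugin delegates retries to Spring Retry, so application code does not need such a loop, and retryWithFixedBackOff is a hypothetical helper. The sketch assumes that maxAttempts counts the first delivery as well as the retries.

```groovy
// Minimal illustration of fixed back-off; not the plugin's implementation.
def retryWithFixedBackOff(int maxAttempts, long backOffPeriodMs, Closure action) {
    if (maxAttempts < 1) {
        throw new IllegalArgumentException('maxAttempts must be at least 1')
    }
    Exception last = null
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return action(attempt)
        } catch (Exception e) {
            last = e
            if (attempt < maxAttempts) {
                Thread.sleep(backOffPeriodMs)   // same delay before every retry
            }
        }
    }
    throw last   // all attempts failed
}

// Fails twice, then succeeds on the third attempt
def result = retryWithFixedBackOff(5, 10L) { attempt ->
    if (attempt < 3) {
        throw new IllegalStateException("attempt ${attempt} failed")
    }
    "succeeded on attempt ${attempt}"
}
println result
```

Under that assumption, maxAttempts: 5 with backOffPeriod: 10000 means a message that keeps failing spends roughly 4 x 10 = 40 seconds in back-off before the retries are exhausted, not counting processing time.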
Custom Error Handling
The plugin registers a RabbitErrorHandler for each connection factory. Errors during message consumption are logged via this handler. Override it by defining your own bean:
// In resources.groovy or doWithSpring
beans = {
    rabbitErrorHandler(MyCustomErrorHandler)
}
Handling Errors in Consumer Methods
Handle exceptions inside your consumer methods so that you decide which failures are retried and which are acknowledged, and so that messages are not lost:
@RabbitConsumer
class ResilientService {
    @Queue(name = 'important.tasks', durable = true)
    def handleTask(Map message) {
        try {
            processTask(message)
        } catch (TemporaryException e) {
            log.warn("Temporary failure processing task, will be retried: ${e.message}")
            throw e // Re-throw to trigger retry
        } catch (PermanentException e) {
            log.error("Permanent failure processing task: ${e.message}", e)
            sendToDeadLetterQueue(message)
            // Don't re-throw - message is acknowledged and won't be retried
        }
    }
}
Persistence Context
The plugin automatically manages Hibernate persistence contexts for consumer methods. When a message is received:
- A persistence context is initialized via PersistenceContextInterceptor
- The consumer method is invoked
- The persistence context is flushed and destroyed
This means you can safely use GORM operations within consumer methods without manually managing sessions:
@RabbitConsumer
class UserSyncService {
    @Queue(name = 'user.sync', durable = true)
    def syncUser(Map userData) {
        def user = User.findByExternalId(userData.externalId)
        if (user) {
            user.name = userData.name
            user.email = userData.email
            user.save(flush: true)
        } else {
            new User(
                externalId: userData.externalId,
                name: userData.name,
                email: userData.email
            ).save(flush: true)
        }
    }
}
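The initialize, invoke, flush and destroy lifecycle described above can be pictured as a wrapper around the consumer call. This is a rough sketch, not the plugin's code: ContextLifecycle is a hypothetical stand-in that mirrors the init, flush and destroy methods of Grails' PersistenceContextInterceptor, and skipping the flush when the consumer throws is an assumption.

```groovy
// Hypothetical stand-in for the relevant part of PersistenceContextInterceptor
interface ContextLifecycle {
    void init()
    void flush()
    void destroy()
}

// Wraps a consumer invocation in the lifecycle described above
def withPersistenceContext(ContextLifecycle context, Closure consumer) {
    context.init()
    try {
        def result = consumer()
        context.flush()        // only reached when the consumer succeeded
        return result
    } finally {
        context.destroy()      // always release the context
    }
}

// Demonstration with a recording implementation
def events = []
def recording = [
    init   : { events << 'init' },
    flush  : { events << 'flush' },
    destroy: { events << 'destroy' }
] as ContextLifecycle

withPersistenceContext(recording) { events << 'consume' }
println events   // [init, consume, flush, destroy]
```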
Performance Tuning
Concurrency
Increase the number of concurrent consumers for high-throughput queues:
@Queue(name = 'high-volume.queue', consumers = 10, prefetchCount = 50, durable = true)
def handleHighVolume(Map message) {
    // 10 concurrent consumers, each prefetching up to 50 messages
}
Guidelines:
- Start with 1 consumer and increase based on observed throughput
- Set prefetchCount to balance throughput vs. memory usage
- For CPU-bound tasks, set consumers to the number of available cores
- For I/O-bound tasks, consumers can exceed the number of cores
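As a rough aid for applying these guidelines, the sketch below estimates how many unacknowledged messages a listener can hold in memory and proposes a starting consumer count. Both helpers are hypothetical, and the ioFactor multiplier for I/O-bound work is an arbitrary assumption to be tuned against measured throughput.

```groovy
// Upper bound on unacknowledged messages held in memory by one listener,
// given that each consumer prefetches up to prefetchCount messages.
int maxInFlight(int consumers, int prefetchCount) {
    return consumers * prefetchCount
}

// Starting point for sizing; ioFactor is an assumption, not a plugin setting.
int suggestedConsumers(boolean cpuBound, int ioFactor = 2) {
    int cores = Runtime.runtime.availableProcessors()
    return cpuBound ? cores : cores * ioFactor
}

println maxInFlight(10, 50)   // 500
println "CPU-bound starting point: ${suggestedConsumers(true)}"
println "I/O-bound starting point: ${suggestedConsumers(false)}"
```

For the listener shown earlier, 10 consumers with a prefetchCount of 50 can hold up to 500 messages in memory at once, which is worth checking against the typical message size.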
Channel Cache Size
The channelCacheSize controls how many channels are cached per connection. Increase this for applications with many concurrent publishers:
factory(
    name: 'main',
    hostname: 'localhost',
    username: 'guest',
    password: 'guest',
    channelCacheSize: 25 // Default is 10
)
Heartbeat Configuration
The heartBeatDelay controls the heartbeat interval between the client and RabbitMQ broker. A lower value detects connection failures faster but increases network traffic:
factory(
    name: 'main',
    hostname: 'localhost',
    username: 'guest',
    password: 'guest',
    heartBeatDelay: 60 // 60 seconds (default is 580)
)
Shutdown Timeout
Control how long listener containers wait for in-flight messages to complete during shutdown:
rabbitmq:
  shutdownTimeout: 10000  # 10 seconds (default: 5000)
Testing
Unit Testing Consumer Services
Test consumer methods like any regular Grails service method:
import grails.testing.services.ServiceUnitTest
import spock.lang.Specification

class OrderServiceSpec extends Specification implements ServiceUnitTest<OrderService> {

    def "test order processing"() {
        given:
        def message = [orderId: '123', item: 'Widget', quantity: 5]

        when:
        service.processOrder(message)

        then:
        // Verify expected behavior
        noExceptionThrown()
    }
}
Integration Testing
For integration tests that verify the full message flow, you will need a running RabbitMQ instance:
import grails.testing.mixin.integration.Integration
import spock.lang.Specification

@Integration
class RabbitMQIntegrationSpec extends Specification {
    def rabbitMQService

    def "test message sending and receiving"() {
        when:
        rabbitMQService.convertAndSend('main', 'test.queue', [test: true])

        then:
        // Verify message was sent (check queue depth, etc.)
        def count = rabbitMQService.getQueueMessageCount('main', 'test.queue')
        count >= 0 // Message may have already been consumed
    }
}
Using Testcontainers
For isolated integration tests, consider using Testcontainers to spin up a RabbitMQ instance:
import org.testcontainers.containers.RabbitMQContainer

class RabbitMQTestSupport {
    static RabbitMQContainer rabbitMQ = new RabbitMQContainer('rabbitmq:3-management')

    static void setup() {
        rabbitMQ.start()
        // Configure your test application to use:
        // rabbitMQ.host, rabbitMQ.amqpPort
    }

    static void cleanup() {
        rabbitMQ.stop()
    }
}
Troubleshooting
Common Issues
No rabbit configuration specified
Symptom: Log message "No rabbit configuration specified, skipping configuration"
Solution: Ensure your rabbitmq configuration block is defined in application.groovy or application.yml.
Connection factory settings must be defined
Symptom: Error log about missing factory.username, factory.password, or factory.hostname
Solution: Every connection factory must have username, password, and either hostname or addresses defined.
There is no available RabbitTemplate for alias
Symptom: RuntimeException when sending messages
Solution: Verify the connection alias matches one of your configured connection factory names.
There is no container bean for
Symptom: RuntimeException when starting/stopping containers
Solution: Verify the queue or exchange name matches a configured listener.
Debugging
Enable debug logging for the plugin:
logging:
  level:
    com.bertram.rabbitmq: DEBUG
Or in logback.groovy:
logger('com.bertram.rabbitmq', DEBUG)
This will log:
- Connection factory initialization
- Listener registration details
- Queue and exchange bean names
- Message delegation events
- Persistence context lifecycle
Monitoring
Listing All Containers
class MonitoringService {
    def rabbitMQService

    def getContainerStatus() {
        def containers = rabbitMQService.getRabbitListenerContainers()
        containers.collect { beanName, container ->
            [
                name: beanName,
                running: container.running,
                startOnLoad: container.startOnLoad
            ]
        }
    }
}
Queue Depth Monitoring
class QueueMonitorService {
    def rabbitMQService

    def checkQueueDepths(List<String> queueNames) {
        queueNames.collect { queueName ->
            [
                queue: queueName,
                messageCount: rabbitMQService.getQueueMessageCount('main', queueName)
            ]
        }
    }
}
Spring Bean Naming
The plugin generates Spring bean names following a consistent convention. Understanding these names is useful for debugging and advanced Spring configuration:
- Connection Factory: connectionFactory_<alias> (e.g., connectionFactory_main)
- RabbitMQ Connection Factory: rmqConnectionFactory_<alias>
- Rabbit Template: rabbitTemplate_<alias>
- Rabbit Admin: rabbitAdmin_<alias>
- Queue Bean: queue_<queueName>
- Exchange Bean: exchange_<exchangeName>
- Container Bean: container_<queueOrExchangeBean>
- Error Handler: errorHandler_<alias>
The first configured connection factory is also aliased as rabbitConnectionFactory.
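When debugging, it can help to derive the per-connection bean names from an alias in one place. The helper below is a hypothetical convenience that only applies the naming convention listed above; it is not provided by the plugin, and the map keys are illustrative.

```groovy
// Hypothetical helper: builds the per-alias bean names from the convention above
Map<String, String> beanNamesFor(String alias) {
    return [
        connectionFactory        : 'connectionFactory_' + alias,
        rabbitMqConnectionFactory: 'rmqConnectionFactory_' + alias,
        rabbitTemplate           : 'rabbitTemplate_' + alias,
        rabbitAdmin              : 'rabbitAdmin_' + alias,
        errorHandler             : 'errorHandler_' + alias
    ]
}

beanNamesFor('analytics').each { role, beanName ->
    println "${role}: ${beanName}"
}
```

The resulting names can then be looked up in the Spring application context, for example to inspect the template used for a given alias.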
Best Practices Summary
- Use @RabbitConsumer on service classes that contain queue/exchange listeners
- Set appropriate prefetchCount to balance throughput and memory
- Use durable = true for queues and exchanges that must survive broker restarts
- Handle exceptions in consumer methods to control retry behavior
- Use autoNodeName with fanout exchanges in clustered deployments
- Configure TTLs and dead-letter queues at the RabbitMQ broker level for production
- Monitor queue depths to detect processing bottlenecks
- Set autoStartup = false for consumers that should start conditionally
- Use multiple connections to separate high-throughput traffic from management operations
- Enable SSL/TLS for production connections