SSL/TLS Configuration

To connect to RabbitMQ over SSL/TLS, enable the useSsl flag and optionally specify the protocol:

rabbitmq {
    connectionFactories {
        factory(
            name: 'secure',
            hostname: 'rabbitmq.example.com',
            port: 5671,
            username: 'myuser',
            password: 'mypassword',
            useSsl: true,
            sslProtocol: 'TLSv1.2'
        )
    }
}

The sslProtocol defaults to TLSv1.2 when useSsl is enabled. The SSL context is configured by the ConnectionFactoryHelper, which wraps the underlying RabbitMQ ConnectionFactory.
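
If a connection fails during the TLS handshake, it can help to confirm that the JVM itself supports the configured protocol. The following is a minimal sketch that uses only the JDK's javax.net.ssl API; isProtocolSupported is a hypothetical helper and not part of the plugin.

```groovy
import javax.net.ssl.SSLContext

// Hypothetical helper (not a plugin API): reports whether this JVM's default
// SSLContext lists the given protocol name, e.g. 'TLSv1.2', as supported.
boolean isProtocolSupported(String protocol) {
    String[] supported = SSLContext.getDefault().getSupportedSSLParameters().getProtocols()
    return Arrays.asList(supported).contains(protocol)
}

println "TLSv1.2 supported: ${isProtocolSupported('TLSv1.2')}"
```

This only checks the client JVM; the broker must also be configured to accept the same protocol.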

Multiple Connection Factories

The plugin can connect to multiple RabbitMQ brokers simultaneously. Each factory is identified by its name, which serves as the connection alias. The first configured factory becomes the default connection (aliased as rabbitConnectionFactory in Spring).

rabbitmq {
    connectionFactories {
        factory(
            name: 'primary',
            hostname: 'rabbitmq-primary.example.com',
            username: 'user1',
            password: 'pass1'
        )
        factory(
            name: 'analytics',
            hostname: 'rabbitmq-analytics.example.com',
            username: 'user2',
            password: 'pass2'
        )
    }
}

Using a Specific Connection in Consumers

Reference the connection alias in the @Queue or @Subscriber annotation, as with conAlias here:

import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Queue

@RabbitConsumer
class AnalyticsService {

    @Queue(name = 'analytics.events', conAlias = 'analytics', durable = true)
    def processAnalyticsEvent(Map event) {
        // This consumer reads from the 'analytics' RabbitMQ broker
        println "Analytics event: ${event}"
    }
}

Sending to a Specific Connection

Pass the connection alias as the first argument when sending messages:

class EventPublisher {
    def rabbitMQService

    def publishToAnalytics(Map event) {
        rabbitMQService.convertAndSend('analytics', 'events.track', event)
    }

    def publishToPrimary(Map data) {
        rabbitMQService.convertAndSend('primary', 'tasks.process', data)
    }
}

Cluster and High Availability

Cluster Connections

For RabbitMQ clusters, use the addresses property to specify multiple nodes:

rabbitmq {
    connectionFactories {
        factory(
            name: 'cluster',
            username: 'guest',
            password: 'guest',
            addresses: 'rabbit1:5672,rabbit2:5672,rabbit3:5672'
        )
    }
}

When addresses is specified, the hostname and port properties are ignored. The Spring AMQP connection factory will attempt each address in order.
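
To make the expected format of the addresses string concrete, here is a small sketch that splits it into host and port pairs. parseAddresses is a hypothetical helper written for illustration; the plugin and Spring AMQP perform their own parsing, and the fallback port of 5672 for entries without a port is an assumption of this sketch.

```groovy
// Hypothetical helper (not a plugin API): splits 'host:port,host:port' into
// a list of maps. Entries without a port fall back to defaultPort.
List<Map> parseAddresses(String addresses, int defaultPort = 5672) {
    return addresses.split(',').collect { String entry ->
        String[] parts = entry.trim().split(':')
        int port = (parts.length > 1) ? parts[1].toInteger() : defaultPort
        [host: parts[0], port: port]
    }
}

def nodes = parseAddresses('rabbit1:5672,rabbit2:5672,rabbit3:5672')
assert nodes*.host == ['rabbit1', 'rabbit2', 'rabbit3']
assert nodes*.port == [5672, 5672, 5672]
```

The list order matters because the addresses are tried in the order given.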

Exchange Patterns

Topic Exchange

Topic exchanges route messages by matching the routing key against a wildcard pattern. Routing keys are split into dot-separated words: * matches exactly one word and # matches zero or more words.

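import com.bertram.rabbitmq.conf.ExchangeParams
import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Subscriber

@RabbitConsumer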
class LogService {

    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'logs',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.topic,
            durable = true
        ),
        routingKey = 'log.error.#'
    )
    def handleErrors(Map logEntry) {
        // Receives all error logs: log.error, log.error.database, etc.
    }

    @Subscriber(
        exchangeParams = @ExchangeParams(name = 'logs'),
        routingKey = 'log.*.database',
        multiSubscriber = true
    )
    def handleDatabaseLogs(Map logEntry) {
        // Receives all database logs: log.error.database, log.info.database, etc.
    }
}
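
The wildcard rules for topic routing keys can be checked with a few lines of plain Groovy. The sketch below only mimics the matching semantics for illustration; the broker performs the real matching, and topicMatches and matchWords are hypothetical names.

```groovy
// Illustrative only: returns true when a routing key would match a topic
// binding key under the '*' (exactly one word) and '#' (zero or more words) rules.
boolean topicMatches(String bindingKey, String routingKey) {
    return matchWords(bindingKey.tokenize('.'), routingKey.tokenize('.'))
}

boolean matchWords(List<String> pattern, List<String> words) {
    if (pattern.isEmpty()) {
        return words.isEmpty()
    }
    String head = pattern.head()
    List<String> rest = pattern.tail()
    if (head == '#') {
        // '#' either absorbs nothing, or absorbs one word and is tried again
        return matchWords(rest, words) || (!words.isEmpty() && matchWords(pattern, words.tail()))
    }
    if (words.isEmpty()) {
        return false
    }
    // '*' matches any single word; any other token must match literally
    return (head == '*' || head == words.head()) && matchWords(rest, words.tail())
}

assert topicMatches('log.error.#', 'log.error')
assert topicMatches('log.error.#', 'log.error.database')
assert topicMatches('log.*.database', 'log.info.database')
assert !topicMatches('log.*.database', 'log.database')
```

Note that a message with routing key log.error.database matches both bindings in the LogService example, so both handlers receive it.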

Direct Exchange

Direct exchanges route messages to queues whose binding key exactly matches the routing key:

@RabbitConsumer
class TaskService {

    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'tasks',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.direct,
            durable = true
        ),
        routingKey = 'task.email'
    )
    def handleEmailTask(Map task) {
        // Only receives messages with routing key 'task.email'
    }
}

Fanout Exchange

Fanout exchanges broadcast every message to all bound queues, ignoring routing keys:

@RabbitConsumer
class BroadcastService {

    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'broadcasts',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.fanout,
            durable = true
        ),
        queueParams = @QueueParams(
            autoNodeName = true,
            durable = false,
            autoDelete = true,
            exclusive = false
        )
    )
    def handleBroadcast(Map message) {
        // Every instance of this application receives every broadcast
        println "Broadcast received: ${message}"
    }
}

When using autoNodeName = true in @QueueParams, the plugin generates a queue name using the hostname (<exchangeName>-<hostname>), ensuring each application node gets its own queue. This is particularly useful for fanout exchanges in HA deployments where each node should receive its own copy of every message.

If the hostname resolves to localhost on every node, all nodes generate the same queue name and end up sharing one queue. In an HA environment the nodes then compete for messages instead of each receiving its own copy.
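
A quick way to see what name a node would get is to reproduce the <exchangeName>-<hostname> convention by hand. This is a sketch under the assumption that the hostname comes from the JDK's InetAddress lookup; the plugin's own lookup may differ, and both function names are hypothetical.

```groovy
// Sketch of the <exchangeName>-<hostname> convention. The default hostname
// lookup via InetAddress is an assumption, not confirmed plugin behaviour.
String nodeQueueName(String exchangeName,
                     String hostname = InetAddress.getLocalHost().getHostName()) {
    return exchangeName + '-' + hostname
}

// Flags hostnames that would make every node generate the same queue name.
boolean looksLikeLocalhost(String hostname) {
    return hostname == null ||
        hostname.toLowerCase().startsWith('localhost') ||
        hostname == '127.0.0.1'
}

assert nodeQueueName('broadcasts', 'app-node-1') == 'broadcasts-app-node-1'
assert looksLikeLocalhost('localhost')
assert !looksLikeLocalhost('app-node-1')
```

Running a check like looksLikeLocalhost at startup gives an early warning before nodes silently start sharing a queue.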

Headers Exchange

Headers exchanges route based on message header attributes rather than routing keys:

@RabbitConsumer
class HeaderRoutingService {

    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'header-exchange',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.headers,
            durable = true
        )
    )
    def handleHeaderMatched(Map message) {
        // Process messages matching header criteria
    }
}

Error Handling and Retry

Built-in Retry Policy

The plugin configures a StatefulRetryOperationsInterceptorFactoryBean with configurable retry parameters. Failed message deliveries are automatically retried based on the configured policy:

rabbitmq:
  retryPolicy:
    maxAttempts: 5        # Maximum retry attempts (default: 3)
    backOffPeriod: 10000  # Delay between retries in ms (default: 5000)

The retry policy uses a FixedBackOffPolicy, meaning each retry waits the same amount of time.
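
Because the delay is fixed, the worst-case time a failing message spends in retry is simple to estimate. The sketch below assumes that maxAttempts counts the first delivery, so there are maxAttempts - 1 waits; totalBackOffMillis is a hypothetical helper and the estimate ignores processing time.

```groovy
// Assumption: maxAttempts includes the first delivery, so a message that
// fails every time waits (maxAttempts - 1) times for backOffPeriod ms.
long totalBackOffMillis(int maxAttempts, long backOffPeriod) {
    return Math.max(0, maxAttempts - 1) * backOffPeriod
}

// Values from the configuration above: 5 attempts, 10 s apart
assert totalBackOffMillis(5, 10000) == 40000
// Documented defaults: 3 attempts, 5 s apart
assert totalBackOffMillis(3, 5000) == 10000
```

With the example configuration, a message that never succeeds is held for roughly 40 seconds before retries are exhausted.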

Custom Error Handling

The plugin registers a RabbitErrorHandler for each connection factory. Errors during message consumption are logged via this handler. Override it by defining your own bean:

// In resources.groovy or doWithSpring
beans = {
    rabbitErrorHandler(MyCustomErrorHandler)
}

Handling Errors in Consumer Methods

Handle exceptions inside consumer methods so that you decide which failures trigger a retry and which are acknowledged, rather than losing messages to unhandled errors:

@RabbitConsumer
class ResilientService {

    @Queue(name = 'important.tasks', durable = true)
    def handleTask(Map message) {
        try {
            processTask(message)
        } catch (TemporaryException e) {
            log.warn("Temporary failure processing task, will be retried: ${e.message}")
            throw e  // Re-throw to trigger retry
        } catch (PermanentException e) {
            log.error("Permanent failure processing task: ${e.message}", e)
            sendToDeadLetterQueue(message)
            // Don't re-throw - message is acknowledged and won't be retried
        }
    }
}

Persistence Context

The plugin automatically manages Hibernate persistence contexts for consumer methods. When a message is received:

  1. A persistence context is initialized via PersistenceContextInterceptor

  2. The consumer method is invoked

  3. The persistence context is flushed and destroyed

This means you can safely use GORM operations within consumer methods without manually managing sessions:

@RabbitConsumer
class UserSyncService {

    @Queue(name = 'user.sync', durable = true)
    def syncUser(Map userData) {
        def user = User.findByExternalId(userData.externalId)
        if (user) {
            user.name = userData.name
            user.email = userData.email
            user.save(flush: true)
        } else {
            new User(
                externalId: userData.externalId,
                name: userData.name,
                email: userData.email
            ).save(flush: true)
        }
    }
}
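
The three-step lifecycle listed above can be pictured as a wrapper around the consumer call. This is a simplified sketch, not the plugin's implementation: RecordingInterceptor is a stand-in for Grails' PersistenceContextInterceptor reduced to three methods, withPersistenceContext is a hypothetical name, and skipping the flush when the consumer throws is an assumption of the sketch.

```groovy
// Stand-in for PersistenceContextInterceptor; it records calls instead of
// touching Hibernate so the ordering is visible.
class RecordingInterceptor {
    List<String> calls = []
    void init()    { calls << 'init' }
    void flush()   { calls << 'flush' }
    void destroy() { calls << 'destroy' }
}

// Hypothetical wrapper: init, invoke the consumer, flush, always destroy.
def withPersistenceContext(RecordingInterceptor interceptor, Closure consumer) {
    interceptor.init()
    try {
        def result = consumer.call()
        interceptor.flush()      // assumption: only flushed when the consumer succeeds
        return result
    } finally {
        interceptor.destroy()    // the context is torn down on success and failure
    }
}

def interceptor = new RecordingInterceptor()
withPersistenceContext(interceptor) { interceptor.calls << 'consume' }
assert interceptor.calls == ['init', 'consume', 'flush', 'destroy']
```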

Performance Tuning

Concurrency

Increase the number of concurrent consumers for high-throughput queues:

@Queue(name = 'high-volume.queue', consumers = 10, prefetchCount = 50, durable = true)
def handleHighVolume(Map message) {
    // 10 concurrent consumers, each prefetching up to 50 messages
}

Guidelines:

  • Start with 1 consumer and increase based on observed throughput

  • Set prefetchCount to balance throughput vs. memory usage

  • For CPU-bound tasks, set consumers to the number of available cores

  • For I/O-bound tasks, consumers can exceed the number of cores
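
The guidelines above can be turned into a rough starting point. The sketch below is illustrative only: suggestedConsumers and maxInFlight are hypothetical helpers, and the default I/O multiplier of 2 is an arbitrary first guess to be tuned against observed throughput, not a plugin setting.

```groovy
// Hypothetical sizing helper: one consumer per core for CPU-bound work,
// a multiple of the core count for I/O-bound work.
int suggestedConsumers(boolean cpuBound, int ioMultiplier = 2) {
    int cores = Runtime.getRuntime().availableProcessors()
    return cpuBound ? cores : cores * ioMultiplier
}

// Upper bound on unacknowledged messages held by the application when
// every consumer has filled its prefetch window.
int maxInFlight(int consumers, int prefetchCount) {
    return consumers * prefetchCount
}

// The @Queue example above: 10 consumers, each prefetching up to 50 messages
assert maxInFlight(10, 50) == 500
```

The second function is the memory side of the trade-off: raising either consumers or prefetchCount raises the number of messages the application may hold at once.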

Channel Cache Size

The channelCacheSize controls how many channels are cached per connection. Increase this for applications with many concurrent publishers:

factory(
    name: 'main',
    hostname: 'localhost',
    username: 'guest',
    password: 'guest',
    channelCacheSize: 25  // Default is 10
)

Heartbeat Configuration

The heartBeatDelay controls the heartbeat interval between the client and RabbitMQ broker. A lower value detects connection failures faster but increases network traffic:

factory(
    name: 'main',
    hostname: 'localhost',
    username: 'guest',
    password: 'guest',
    heartBeatDelay: 60  // 60 seconds (default is 580)
)

Shutdown Timeout

Control how long listener containers wait for in-flight messages to complete during shutdown:

rabbitmq:
  shutdownTimeout: 10000  # 10 seconds (default: 5000)

Testing

Unit Testing Consumer Services

Test consumer methods like any regular Grails service method:

import grails.testing.services.ServiceUnitTest
import spock.lang.Specification

class OrderServiceSpec extends Specification implements ServiceUnitTest<OrderService> {

    def "test order processing"() {
        given:
        def message = [orderId: '123', item: 'Widget', quantity: 5]

        when:
        service.processOrder(message)

        then:
        // Verify expected behavior
        noExceptionThrown()
    }
}

Integration Testing

For integration tests that verify the full message flow, you will need a running RabbitMQ instance:

import grails.testing.mixin.integration.Integration
import spock.lang.Specification

@Integration
class RabbitMQIntegrationSpec extends Specification {

    def rabbitMQService

    def "test message sending and receiving"() {
        when:
        rabbitMQService.convertAndSend('main', 'test.queue', [test: true])

        then:
        // Verify message was sent (check queue depth, etc.)
        def count = rabbitMQService.getQueueMessageCount('main', 'test.queue')
        count >= 0  // Message may have already been consumed
    }
}

Using Testcontainers

For isolated integration tests, consider using Testcontainers to spin up a RabbitMQ instance:

import org.testcontainers.containers.RabbitMQContainer

class RabbitMQTestSupport {
    static RabbitMQContainer rabbitMQ = new RabbitMQContainer('rabbitmq:3-management')

    static void setup() {
        rabbitMQ.start()
        // Configure your test application to use:
        //   rabbitMQ.host, rabbitMQ.amqpPort
    }

    static void cleanup() {
        rabbitMQ.stop()
    }
}

Troubleshooting

Common Issues

No rabbit configuration specified

Symptom: Log message "No rabbit configuration specified, skipping configuration"

Solution: Ensure your rabbitmq configuration block is defined in application.groovy or application.yml.

Connection factory settings must be defined

Symptom: Error log about missing factory.username, factory.password, or factory.hostname

Solution: Every connection factory must have username, password, and either hostname or addresses defined.

There is no available RabbitTemplate for alias

Symptom: RuntimeException when sending messages

Solution: Verify the connection alias matches one of your configured connection factory names.

There is no container bean for

Symptom: RuntimeException when starting/stopping containers

Solution: Verify the queue or exchange name matches a configured listener.

Debugging

Enable debug logging for the plugin:

logging:
  level:
    com.bertram.rabbitmq: DEBUG

Or in logback.groovy:

logger('com.bertram.rabbitmq', DEBUG)

This will log:

  • Connection factory initialization

  • Listener registration details

  • Queue and exchange bean names

  • Message delegation events

  • Persistence context lifecycle

Monitoring

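Listing All Containers

getRabbitListenerContainers() returns the listener containers keyed by bean name, so a service can report the state of each one: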

class MonitoringService {
    def rabbitMQService

    def getContainerStatus() {
        def containers = rabbitMQService.getRabbitListenerContainers()
        containers.collect { beanName, container ->
            [
                name: beanName,
                running: container.running,
                startOnLoad: container.startOnLoad
            ]
        }
    }
}

Queue Depth Monitoring

Call getQueueMessageCount for each queue of interest to see how many messages are waiting:

class QueueMonitorService {
    def rabbitMQService

    def checkQueueDepths(List<String> queueNames) {
        queueNames.collect { queueName ->
            [
                queue: queueName,
                messageCount: rabbitMQService.getQueueMessageCount('main', queueName)
            ]
        }
    }
}
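
The result of checkQueueDepths can be filtered to flag queues that are falling behind. This is a minimal sketch: findBackloggedQueues is a hypothetical helper, and the sample data and threshold are made up for illustration.

```groovy
// Hypothetical helper: keeps only the entries whose messageCount exceeds
// the threshold. Expects maps shaped like the output of checkQueueDepths.
List<Map> findBackloggedQueues(List<Map> depths, int threshold) {
    return depths.findAll { Map entry ->
        entry.messageCount != null && entry.messageCount > threshold
    }
}

// Made-up sample data in the same shape as checkQueueDepths returns
def sample = [
    [queue: 'orders', messageCount: 12],
    [queue: 'emails', messageCount: 4500]
]
assert findBackloggedQueues(sample, 1000)*.queue == ['emails']
```

A suitable threshold depends on the normal depth of each queue, so it is usually better configured per queue than hard-coded.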

Spring Bean Naming

The plugin generates Spring bean names following a consistent convention. Understanding these names is useful for debugging and advanced Spring configuration:

  • Connection Factory: connectionFactory_<alias> (e.g., connectionFactory_main)

  • RabbitMQ Connection Factory: rmqConnectionFactory_<alias>

  • Rabbit Template: rabbitTemplate_<alias>

  • Rabbit Admin: rabbitAdmin_<alias>

  • Queue Bean: queue_<queueName>

  • Exchange Bean: exchange_<exchangeName>

  • Container Bean: container_<queueOrExchangeBean>

  • Error Handler: errorHandler_<alias>

The first configured connection factory is also aliased as rabbitConnectionFactory.
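
When looking up these beans by hand, a small helper that applies the convention can reduce typos. The sketch below simply concatenates the documented prefixes with the alias; beanNamesForAlias is a hypothetical helper, not something the plugin provides.

```groovy
// Hypothetical helper: builds the per-connection bean names from the
// naming convention listed above.
Map<String, String> beanNamesForAlias(String alias) {
    return [
        connectionFactory   : 'connectionFactory_' + alias,
        rmqConnectionFactory: 'rmqConnectionFactory_' + alias,
        rabbitTemplate      : 'rabbitTemplate_' + alias,
        rabbitAdmin         : 'rabbitAdmin_' + alias,
        errorHandler        : 'errorHandler_' + alias
    ]
}

def names = beanNamesForAlias('analytics')
assert names.rabbitTemplate == 'rabbitTemplate_analytics'
assert names.errorHandler == 'errorHandler_analytics'
```

The resulting names can be passed to the standard Spring ApplicationContext.getBean(String) lookup.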

Best Practices Summary

  1. Use @RabbitConsumer on service classes that contain queue/exchange listeners

  2. Set appropriate prefetchCount to balance throughput and memory

  3. Use durable = true for queues and exchanges that must survive broker restarts

  4. Handle exceptions in consumer methods to control retry behavior

  5. Use autoNodeName with fanout exchanges in clustered deployments

  6. Configure TTLs and dead-letter queues at the RabbitMQ broker level for production

  7. Monitor queue depths to detect processing bottlenecks

  8. Set autoStartup = false for consumers that should start conditionally

  9. Use multiple connections to separate high-throughput traffic from management operations

  10. Enable SSL/TLS for production connections