Introduction

The Grails RabbitMQ AMQP plugin provides an annotation-driven way to integrate RabbitMQ messaging into Grails applications. Unlike the standard grails-rabbitmq plugin, it supports multiple connection factories, annotation-based consumer configuration at the method level, and exchange/topic subscriptions, all with minimal boilerplate.

Key Features

  • Multiple Connection Factories: Connect to multiple RabbitMQ hosts simultaneously

  • Annotation-Based Consumers: Configure queue and exchange listeners using @Queue and @Subscriber annotations on service methods

  • Exchange Support: Full support for topic, direct, fanout, and headers exchanges

  • SendRabbitMessage Trait: Automatically adds sendRabbitMessage() methods to all services and controllers

  • Runtime Listener Creation: Dynamically create queue consumers and topic subscribers at runtime

  • Retry Support: Built-in configurable retry policies with backoff

  • SSL/TLS Support: Secure connections to RabbitMQ brokers

  • Container Management: Start, stop, and manage listener containers programmatically

Supported Exchange Types

  • Topic - Route messages by matching routing keys against binding patterns (* matches exactly one word, # matches zero or more words)

  • Direct - Route messages by exact routing key match

  • Fanout - Broadcast messages to all bound queues

  • Headers - Route messages based on header attributes
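
The routing rules above can be sketched in plain Groovy. This illustrates standard AMQP semantics and is not plugin code: routes and topicMatches are hypothetical helper names, and headers exchanges are left out because they match on message headers rather than routing keys.

```groovy
// Hypothetical helpers illustrating AMQP routing rules; not part of the plugin.

// Match a topic binding pattern word by word:
// '*' stands for exactly one word, '#' for zero or more words.
boolean topicMatches(List<String> pattern, List<String> key) {
    if (pattern.isEmpty()) {
        return key.isEmpty()
    }
    String head = pattern.head()
    List<String> rest = pattern.tail()
    if (head == '#') {
        // Try letting '#' absorb 0, 1, ... key.size() words
        return (0..key.size()).any { int n -> topicMatches(rest, key.drop(n)) }
    }
    if (key.isEmpty()) {
        return false
    }
    return (head == '*' || head == key.head()) && topicMatches(rest, key.tail())
}

// Decide whether a message sent with routingKey reaches a queue bound with bindingKey.
boolean routes(String exchangeType, String bindingKey, String routingKey) {
    switch (exchangeType) {
        case 'fanout': return true                      // binding key is ignored
        case 'direct': return bindingKey == routingKey  // exact match only
        case 'topic':  return topicMatches(bindingKey.tokenize('.'), routingKey.tokenize('.'))
        default: throw new IllegalArgumentException("Unsupported exchange type: ${exchangeType}")
    }
}

println routes('topic', 'notification.email.#', 'notification.email.welcome')  // true
println routes('topic', '*.sms', 'notification.push')                          // false
println routes('direct', 'orders.new', 'orders.cancel')                        // false
```

Because # can match zero words, the binding notification.email.# also receives messages sent with the routing key notification.email.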

Installation

Grails 7

Add the plugin to your build.gradle:

dependencies {
    implementation 'cloud.wondrify:grails-rabbit-amqp-plugin:7.0.0'
}

Grails 6.x and Earlier

For Grails 6.x and earlier versions, use the previous artifact coordinates:

dependencies {
    implementation 'com.bertramlabs.plugins:rabbit-amqp:6.x.x'
}

Configuration

All RabbitMQ configuration is placed under the rabbitmq key in your Grails configuration.

Connection Factories

Connection factories define how your application connects to RabbitMQ brokers. You can configure one or more connections.

YAML Configuration (application.yml)

rabbitmq:
  connectionFactories:
    - name: main
      hostname: localhost
      port: 5672
      username: guest
      password: guest
      virtualHost: /
      channelCacheSize: 10
      heartBeatDelay: 580
    - name: secondary
      hostname: rabbitmq2.example.com
      port: 5672
      username: myuser
      password: mypassword

Groovy Configuration (application.groovy)

rabbitmq {
    connectionFactories {
        factory(
            name: 'main',
            hostname: 'localhost',
            port: 5672,
            username: 'guest',
            password: 'guest',
            virtualHost: '/',
            channelCacheSize: 10,
            heartBeatDelay: 580
        )
        factory(
            name: 'secondary',
            hostname: 'rabbitmq2.example.com',
            port: 5672,
            username: 'myuser',
            password: 'mypassword'
        )
    }
}

Connection Factory Options

  • name: Alias for this connection (used to reference it in annotations and when sending messages)

  • hostname: RabbitMQ server hostname

  • port: RabbitMQ server port (default: 5672)

  • username: Authentication username

  • password: Authentication password

  • virtualHost: RabbitMQ virtual host

  • channelCacheSize: Number of cached channels (default: 10)

  • heartBeatDelay: Heartbeat interval in seconds (default: 580)

  • addresses: Comma-separated list of host:port addresses for cluster connections

  • useSsl: Enable SSL/TLS connection (default: false)

  • sslProtocol: SSL protocol version (default: TLSv1.2)
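
As an untested sketch, a clustered TLS connection might combine the addresses, useSsl, and sslProtocol options as follows. The host names, the credentials, and port 5671 (RabbitMQ's conventional TLS port) are placeholders, and whether hostname can be omitted when addresses is set is an assumption to verify against your plugin version.

```groovy
// application.groovy (illustrative values only)
rabbitmq {
    connectionFactories {
        factory(
            name: 'cluster',
            // Comma-separated host:port pairs, one per cluster node
            addresses: 'rabbit1.example.com:5671,rabbit2.example.com:5671',
            username: 'myuser',
            password: 'mypassword',
            virtualHost: '/',
            useSsl: true,
            sslProtocol: 'TLSv1.2'
        )
    }
}
```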

Retry Policy

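Configure the global retry policy for message consumption. maxAttempts limits how many times a failing message is attempted, and backOffPeriod sets the delay between attempts: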

rabbitmq:
  retryPolicy:
    maxAttempts: 3
    backOffPeriod: 5000
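
To make the two settings concrete, the sketch below shows one plausible reading of them: call the handler up to maxAttempts times and pause for backOffPeriod between failures. It assumes a fixed backoff measured in milliseconds, which the setting names suggest but this guide does not state, and withRetry is a hypothetical helper rather than a plugin method.

```groovy
// Hypothetical helper; the plugin applies its retry policy internally.
def withRetry(int maxAttempts, long backOffPeriodMs, Closure action) {
    if (maxAttempts < 1) {
        throw new IllegalArgumentException('maxAttempts must be at least 1')
    }
    Exception lastFailure = null
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return action.call(attempt)
        } catch (Exception e) {
            lastFailure = e
            // Pause only if another attempt will follow
            if (attempt < maxAttempts) {
                Thread.sleep(backOffPeriodMs)
            }
        }
    }
    // Every attempt failed: surface the most recent error
    throw lastFailure
}

// Simulated handler that fails twice, then succeeds on the third attempt.
// A short pause keeps the demo fast; the configuration above would use 5000.
def result = withRetry(3, 10) { int attempt ->
    if (attempt < 3) {
        throw new IllegalStateException("attempt ${attempt} failed")
    }
    "processed on attempt ${attempt}"
}
println result
```

Under this reading, maxAttempts counts the first delivery as well, so a value of 3 means one initial attempt plus at most two retries.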

Shutdown Timeout

Configure the shutdown timeout for listener containers:

rabbitmq:
  shutdownTimeout: 5000

Quick Start

Here’s a minimal example to get started with queue consumption:

// application.groovy
rabbitmq {
    connectionFactories {
        factory(
            name: 'main',
            hostname: 'localhost',
            username: 'guest',
            password: 'guest'
        )
    }
}

// MyMessageService.groovy
import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Queue

@RabbitConsumer
class MyMessageService {

    @Queue(name = 'my.queue', durable = true)
    def handleMessage(Map message) {
        println "Received message: ${message}"
    }
}

Consuming Messages

Services that consume RabbitMQ messages must be annotated with @RabbitConsumer at the class level. Individual methods are then annotated with either @Queue (for direct queue consumption) or @Subscriber (for exchange subscriptions).

Queue Consumers

Use the @Queue annotation to consume messages from a specific queue:

import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Queue

@RabbitConsumer
class OrderService {

    @Queue(name = 'orders.new', durable = true, consumers = 3, prefetchCount = 10)
    def processOrder(Map message) {
        // Process incoming order
        println "Processing order: ${message.orderId}"
    }

    @Queue(name = 'orders.cancel', durable = true)
    def cancelOrder(Map message) {
        // Handle order cancellation
        println "Cancelling order: ${message.orderId}"
    }
}

@Queue Annotation Options

  • name: Queue name (required)

  • durable: Whether the queue survives broker restarts (default: true)

  • autoDelete: Whether the queue is deleted when no longer in use (default: false)

  • exclusive: Whether the queue is exclusive to the connection (default: true)

  • consumers: Number of concurrent consumers (default: 1)

  • prefetchCount: Number of messages to prefetch per consumer (default: 0 — broker default)

  • conAlias: Connection factory alias to use (default: first configured factory)

  • autoStartup: Whether to start consuming on application boot (default: true)

  • params: Additional AMQP parameters as @AMQPParameter annotations

Exchange Subscribers

Use the @Subscriber annotation to subscribe to messages from an exchange:

import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Subscriber
import com.bertram.rabbitmq.conf.ExchangeParams
import com.bertram.rabbitmq.conf.QueueParams

@RabbitConsumer
class NotificationService {

    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'notifications',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.topic,
            durable = true,
            autoDelete = false
        ),
        routingKey = 'notification.email.#',
        consumers = 2
    )
    def handleEmailNotification(Map message) {
        // Handle email notifications
        println "Sending email to: ${message.recipient}"
    }

    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'notifications',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.topic,
            durable = true
        ),
        routingKey = 'notification.sms.#',
        multiSubscriber = true
    )
    def handleSmsNotification(Map message) {
        // Handle SMS notifications
        println "Sending SMS to: ${message.phoneNumber}"
    }
}

@Subscriber Annotation Options

  • routingKey: Routing key pattern for binding (default: # — matches all)

  • consumers: Number of concurrent consumers (default: 1)

  • prefetchCount: Number of messages to prefetch (default: 0)

  • conAlias: Connection factory alias

  • autoStartup: Whether to start on boot (default: true)

  • multiSubscriber: Whether multiple methods can subscribe to the same exchange (default: false)

  • exchangeParams: Exchange configuration via @ExchangeParams

  • queueParams: Queue configuration via @QueueParams

@ExchangeParams Options

  • name: Exchange name (required)

  • exchangeType: Type of exchange — topic, direct, fanout, or headers (default: topic)

  • durable: Whether the exchange survives broker restarts (default: true)

  • autoDelete: Whether the exchange is deleted when no longer in use (default: false)

@QueueParams Options

  • name: Queue name (empty for auto-generated anonymous queue)

  • autoNodeName: Whether to generate a queue name based on the hostname (default: false)

  • durable: Queue durability (default: false)

  • autoDelete: Auto-delete queue (default: true)

  • exclusive: Exclusive queue (default: true)

Message Types

Handler methods can accept several payload types. The message converter deserializes each incoming message according to the handler's parameter type:

@RabbitConsumer
class MessageHandlerService {

    @Queue(name = 'queue.map')
    def handleMapMessage(Map message) {
        // Handle Map messages
    }

    @Queue(name = 'queue.string')
    def handleStringMessage(String message) {
        // Handle String messages
    }

    @Queue(name = 'queue.bytes')
    def handleByteMessage(byte[] message) {
        // Handle raw byte messages
    }

    @Queue(name = 'queue.list')
    def handleListMessage(List message) {
        // Handle List messages
    }
}
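
The idea of choosing a conversion from the handler's parameter type can be sketched outside Grails. This is a rough illustration only: it assumes message bodies are UTF-8 JSON, which this guide does not state, and convertBody is a hypothetical stand-in for whatever converter the plugin actually uses.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical stand-in for the plugin's message converter (assumes JSON bodies).
Object convertBody(byte[] body, Class targetType) {
    if (targetType.isArray()) {
        return body                           // byte[] handlers get the raw payload
    }
    String text = new String(body, 'UTF-8')
    if (targetType == String) {
        return text                           // String handlers get the decoded text
    }
    return new JsonSlurper().parseText(text)  // Map and List handlers get parsed JSON
}

byte[] payload = JsonOutput.toJson([orderId: '12345', quantity: 10]).getBytes('UTF-8')

println convertBody(payload, Map).orderId   // 12345
println convertBody(payload, String)        // {"orderId":"12345","quantity":10}
```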

Sending Messages

Using the SendRabbitMessage Trait

The plugin automatically enhances all Grails services and controllers with the SendRabbitMessage trait, providing sendRabbitMessage() methods:

class OrderController {

    def createOrder() {
        def orderData = [orderId: '12345', item: 'Widget', quantity: 10]

        // Send to default exchange/queue
        sendRabbitMessage(orderData)

        // Send to a specific connection
        sendRabbitMessage('main', orderData)

        // Send with a routing key
        sendRabbitMessage('main', 'orders.new', orderData)

        // Send to a specific exchange with routing key
        sendRabbitMessage('main', 'order-exchange', 'orders.new', orderData)

        render "Order submitted"
    }
}

Using RabbitMQService Directly

For more control, inject and use RabbitMQService directly:

class OrderProcessingService {
    def rabbitMQService

    def submitOrder(Map orderData) {
        // Simple send
        rabbitMQService.convertAndSend(orderData)

        // Send to a specific connection
        rabbitMQService.convertAndSend('main', orderData)

        // Send with routing key
        rabbitMQService.convertAndSend('main', 'orders.new', orderData)

        // Send to exchange with routing key
        rabbitMQService.convertAndSend('main', 'order-exchange', 'orders.new', orderData)
    }
}

Sending with a MessagePostProcessor

You can modify message properties before sending using a MessagePostProcessor:

import org.springframework.amqp.core.MessagePostProcessor
import org.springframework.amqp.core.Message

class PriorityMessageService {
    def rabbitMQService

    def sendHighPriorityMessage(Map data) {
        MessagePostProcessor processor = { Message message ->
            message.messageProperties.priority = 10
            message.messageProperties.setHeader('X-Custom-Header', 'urgent')
            return message
        }

        rabbitMQService.convertAndSend('main', 'priority-exchange', 'high', data, processor)
    }
}

Container Management

The plugin provides methods on RabbitMQService to manage listener containers at runtime.

Starting and Stopping Containers

class AdminService {
    def rabbitMQService

    def pauseQueueProcessing(String queueName) {
        rabbitMQService.stopRabbitQueueContainer(queueName)
    }

    def resumeQueueProcessing(String queueName) {
        rabbitMQService.startRabbitQueueContainer(queueName)
    }

    def pauseExchangeSubscription(String exchangeName) {
        rabbitMQService.stopRabbitSubscriberContainer(exchangeName)
    }

    def resumeExchangeSubscription(String exchangeName) {
        rabbitMQService.startRabbitSubscriberContainer(exchangeName)
    }

    def stopAll() {
        rabbitMQService.stopAllRabbitContainers()
    }

    def startAll() {
        rabbitMQService.startAllRabbitContainers()
    }
}

Listing Active Containers

def containers = rabbitMQService.getRabbitListenerContainers()
containers.each { beanName, container ->
    println "${beanName}: running=${container.running}"
}

Checking Queue Depth

def messageCount = rabbitMQService.getQueueMessageCount('main', 'orders.new')
println "Messages in queue: ${messageCount}"
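
One use for the queue depth is a backlog check before publishing more work. The sketch below relies only on getQueueMessageCount as documented here. isBacklogged and the threshold values are hypothetical, the count is assumed to be numeric, and an Expando stands in for the injected service so the example runs outside a Grails application.

```groovy
// Hypothetical helper: true when the queue holds more than maxBacklog messages.
boolean isBacklogged(def rabbitMQService, String conAlias, String queueName, int maxBacklog) {
    rabbitMQService.getQueueMessageCount(conAlias, queueName) > maxBacklog
}

// Stand-in for the injected service; it always reports 250 queued messages.
def stubService = new Expando(
    getQueueMessageCount: { String conAlias, String queueName -> 250 }
)

println isBacklogged(stubService, 'main', 'orders.new', 100)   // true
println isBacklogged(stubService, 'main', 'orders.new', 500)   // false
```

In a real service, the injected rabbitMQService would be passed in place of the stub.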

Runtime Listeners

You can create queue consumers and exchange subscribers dynamically at runtime.

Dynamic Queue Consumer

class DynamicListenerService {
    def rabbitMQService

    def createDynamicConsumer() {
        rabbitMQService.createQueueConsumer([
            queueName: 'dynamic.queue',
            connection: 'main',
            consumers: 1,
            durable: true,
            autoDelete: false,
            startOnLoad: true
        ]) { message ->
            println "Dynamic consumer received: ${message}"
        }
    }
}

Dynamic Topic Subscriber

class DynamicSubscriberService {
    def rabbitMQService

    def createDynamicSubscriber() {
        rabbitMQService.createSubscriber([
            exchangeName: 'events',
            exchangeType: com.bertram.rabbitmq.conf.ExchangeType.topic,
            routingKey: 'event.user.#',
            connection: 'main'
        ]) { message ->
            println "Dynamic subscriber received: ${message}"
        }
    }
}

RabbitMQService API

Message Sending Methods

  • convertAndSend(Object message) — Send a message to the default template

  • convertAndSend(String connectionAlias, Object message) — Send to a specific connection

  • convertAndSend(String connectionAlias, String routingKey, Object message) — Send with a routing key

  • convertAndSend(String connectionAlias, String exchange, String routingKey, Object message) — Send to a specific exchange

  • convertAndSend(String connectionAlias, String exchange, String routingKey, Object message, MessagePostProcessor processor) — Send with a post-processor

Container Management Methods

  • stopRabbitQueueContainer(container) — Stop a queue listener container

  • startRabbitQueueContainer(container) — Start a queue listener container

  • stopRabbitSubscriberContainer(container) — Stop an exchange subscriber container

  • startRabbitSubscriberContainer(container) — Start an exchange subscriber container

  • stopAllRabbitContainers() — Stop all listener containers

  • startAllRabbitContainers() — Start all listener containers

  • getRabbitListenerContainers() — Get all listener container beans

Utility Methods

  • getQueueMessageCount(String conAlias, String queueName) — Get the number of messages in a queue

  • createSubscriber(Map config, Closure adapter) — Create a runtime topic subscriber

  • createQueueConsumer(Map config, Closure adapter) — Create a runtime queue consumer

Advanced Usage

For detailed information on advanced features and patterns, see Advanced Usage.

Topics covered in the advanced guide:

  • SSL/TLS Configuration

  • Multiple Connection Factories

  • Fanout Exchange Patterns

  • Error Handling and Retry

  • Performance Tuning

  • Testing

License

Grails RabbitMQ AMQP Plugin is open source software licensed under the Apache License 2.0.