Introduction
The Grails RabbitMQ AMQP plugin provides an annotation-driven approach to integrating RabbitMQ messaging into Grails applications. Unlike the standard grails-rabbitmq plugin, it supports multiple connection factories, method-level annotation-based consumer configuration, and exchange/topic subscriptions, all with minimal boilerplate.
Key Features
- Multiple Connection Factories: Connect to multiple RabbitMQ hosts simultaneously
- Annotation-Based Consumers: Configure queue and exchange listeners using @Queue and @Subscriber annotations on service methods
- Exchange Support: Full support for topic, direct, fanout, and headers exchanges
- SendRabbitMessage Trait: Automatically adds sendRabbitMessage() methods to all services and controllers
- Runtime Listener Creation: Dynamically create queue consumers and topic subscribers at runtime
- Retry Support: Built-in configurable retry policies with backoff
- SSL/TLS Support: Secure connections to RabbitMQ brokers
- Container Management: Start, stop, and manage listener containers programmatically
Supported Exchange Types
- Topic: Route messages by pattern-matching routing keys
- Direct: Route messages by exact routing key match
- Fanout: Broadcast messages to all bound queues
- Headers: Route messages based on header attributes
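Topic patterns follow the AMQP convention: words are separated by dots, an asterisk (*) stands for exactly one word, and a hash (#) stands for zero or more words. The self-contained Groovy sketch below mimics that rule locally so a pattern can be checked before it is used as a routing key. The helper names topicMatches and matchWords are hypothetical and not part of the plugin; the broker, not this code, performs the real routing.

```groovy
// Illustrative only: a local re-implementation of AMQP topic matching.
// topicMatches and matchWords are hypothetical helpers, not plugin API.
boolean topicMatches(String pattern, String routingKey) {
    // An empty string is treated as zero words.
    List<String> patternWords = pattern ? pattern.split(/\./) as List : []
    List<String> keyWords = routingKey ? routingKey.split(/\./) as List : []
    matchWords(patternWords, keyWords)
}

boolean matchWords(List<String> patternWords, List<String> keyWords) {
    if (patternWords.isEmpty()) {
        return keyWords.isEmpty()
    }
    String head = patternWords[0]
    List<String> rest = patternWords.drop(1)
    if (head == '#') {
        // '#' may absorb zero or more words: try every possible split point.
        return (0..keyWords.size()).any { int n -> matchWords(rest, keyWords.drop(n)) }
    }
    if (keyWords.isEmpty()) {
        return false
    }
    // '*' matches any single word; otherwise the words must be equal.
    return (head == '*' || head == keyWords[0]) && matchWords(rest, keyWords.drop(1))
}

println topicMatches('notification.email.#', 'notification.email.welcome')
println topicMatches('event.*.created', 'event.user.profile.created')
```

The recursive form keeps the rule for # explicit at the cost of some efficiency, which is acceptable for the short routing keys typical of application messaging.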
Installation
Grails 7
Add the plugin to your build.gradle:
dependencies {
    implementation 'cloud.wondrify:grails-rabbit-amqp-plugin:7.0.0'
}
Grails 6.x and Earlier
For Grails 6.x and earlier versions, use the previous artifact coordinates:
dependencies {
    implementation 'com.bertramlabs.plugins:rabbit-amqp:6.x.x'
}
Configuration
All RabbitMQ configuration is placed under the rabbitmq key in your Grails configuration.
Connection Factories
Connection factories define how your application connects to RabbitMQ brokers. You can configure one or more connections.
YAML Configuration (application.yml)
rabbitmq:
  connectionFactories:
    - name: main
      hostname: localhost
      port: 5672
      username: guest
      password: guest
      virtualHost: /
      channelCacheSize: 10
      heartBeatDelay: 580
    - name: secondary
      hostname: rabbitmq2.example.com
      port: 5672
      username: myuser
      password: mypassword
Groovy Configuration (application.groovy)
rabbitmq {
    connectionFactories {
        factory(
            name: 'main',
            hostname: 'localhost',
            port: 5672,
            username: 'guest',
            password: 'guest',
            virtualHost: '/',
            channelCacheSize: 10,
            heartBeatDelay: 580
        )
        factory(
            name: 'secondary',
            hostname: 'rabbitmq2.example.com',
            port: 5672,
            username: 'myuser',
            password: 'mypassword'
        )
    }
}
Connection Factory Options
- name: Alias for this connection (used to reference it in annotations and when sending messages)
- hostname: RabbitMQ server hostname
- port: RabbitMQ server port (default: 5672)
- username: Authentication username
- password: Authentication password
- virtualHost: RabbitMQ virtual host
- channelCacheSize: Number of cached channels (default: 10)
- heartBeatDelay: Heartbeat interval in seconds (default: 580)
- addresses: Comma-separated list of host:port addresses for cluster connections
- useSsl: Enable SSL/TLS connection (default: false)
- sslProtocol: SSL protocol version (default: TLSv1.2)
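As a sketch of how the cluster and SSL options above might be combined, the following application.groovy fragment defines a single factory that uses addresses, useSsl, and sslProtocol. The alias, host names, and credentials are placeholders, and port 5671 is only the conventional AMQP-over-TLS port. How addresses interacts with hostname and port when both are set is not described here, so this example sets only addresses.

```groovy
// application.groovy
// Hypothetical cluster connection: host names and credentials are placeholders.
rabbitmq {
    connectionFactories {
        factory(
            name: 'cluster',
            // Comma-separated host:port pairs, one per cluster node
            addresses: 'rabbit1.example.com:5671,rabbit2.example.com:5671',
            username: 'myuser',
            password: 'mypassword',
            virtualHost: '/',
            useSsl: true,
            sslProtocol: 'TLSv1.2'
        )
    }
}
```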
Retry Policy
Configure the global retry policy for message consumption:
rabbitmq:
  retryPolicy:
    maxAttempts: 3
    backOffPeriod: 5000
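To make the two settings concrete, here is a generic fixed back-off retry loop written as a standalone Groovy script. It is not the plugin's implementation, which is not shown in this document. The helper name withRetry is hypothetical, and the sketch assumes that backOffPeriod is expressed in milliseconds and that maxAttempts counts the first delivery as attempt one.

```groovy
// Generic fixed back-off retry loop for illustration; not the plugin's code.
// Assumes maxAttempts >= 1 and a back-off period in milliseconds.
def withRetry(int maxAttempts, long backOffPeriodMs, Closure action) {
    Exception lastError = null
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return action.call(attempt)        // success: stop retrying
        } catch (Exception e) {
            lastError = e
            if (attempt < maxAttempts) {
                Thread.sleep(backOffPeriodMs)  // wait before the next attempt
            }
        }
    }
    throw lastError                            // every attempt failed
}

// Usage: a handler that fails twice and succeeds on the third attempt.
int calls = 0
def result = withRetry(3, 10L) { attempt ->
    calls++
    if (attempt < 3) {
        throw new IllegalStateException("attempt ${attempt} failed")
    }
    "handled on attempt ${attempt}"
}
println result
```

With the values from the configuration above, a handler that keeps failing would be tried three times with a five-second pause between attempts under these assumptions.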
Shutdown Timeout
Configure the shutdown timeout for listener containers:
rabbitmq:
  shutdownTimeout: 5000
Quick Start
Here’s a minimal example to get started with queue consumption:
// application.groovy
rabbitmq {
    connectionFactories {
        factory(
            name: 'main',
            hostname: 'localhost',
            username: 'guest',
            password: 'guest'
        )
    }
}

// grails-app/services/MyMessageService.groovy
import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Queue

@RabbitConsumer
class MyMessageService {

    @Queue(name = 'my.queue', durable = true)
    def handleMessage(Map message) {
        println "Received message: ${message}"
    }
}
Consuming Messages
Services that consume RabbitMQ messages must be annotated with @RabbitConsumer at the class level. Individual methods are then annotated with either @Queue (for direct queue consumption) or @Subscriber (for exchange subscriptions).
Queue Consumers
Use the @Queue annotation to consume messages from a specific queue:
import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Queue

@RabbitConsumer
class OrderService {

    @Queue(name = 'orders.new', durable = true, consumers = 3, prefetchCount = 10)
    def processOrder(Map message) {
        // Process incoming order
        println "Processing order: ${message.orderId}"
    }

    @Queue(name = 'orders.cancel', durable = true)
    def cancelOrder(Map message) {
        // Handle order cancellation
        println "Cancelling order: ${message.orderId}"
    }
}
@Queue Annotation Options
- name: Queue name (required)
- durable: Whether the queue survives broker restarts (default: true)
- autoDelete: Whether the queue is deleted when no longer in use (default: false)
- exclusive: Whether the queue is exclusive to the connection (default: true)
- consumers: Number of concurrent consumers (default: 1)
- prefetchCount: Number of messages to prefetch per consumer (default: 0, meaning the broker default)
- conAlias: Connection factory alias to use (default: first configured factory)
- autoStartup: Whether to start consuming on application boot (default: true)
- params: Additional AMQP parameters as @AMQPParameter annotations
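The conAlias and autoStartup options can be combined to bind a consumer to a non-default connection and leave it idle until the application decides to start it. The sketch below assumes a connection factory named secondary exists, and that startRabbitQueueContainer accepts the queue name, as the Container Management section suggests. ReportService, the queue name, and the reportId field are hypothetical.

```groovy
import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Queue

@RabbitConsumer
class ReportService {

    // Injected by Grails; used to start the container on demand.
    def rabbitMQService

    // Bound to the 'secondary' connection and not started at boot.
    @Queue(name = 'reports.nightly', durable = true, conAlias = 'secondary', autoStartup = false)
    def buildReport(Map message) {
        println "Building report: ${message.reportId}"
    }

    // Call this later, for example from a scheduled job or an admin action.
    def enableReports() {
        rabbitMQService.startRabbitQueueContainer('reports.nightly')
    }
}
```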
Exchange Subscribers
Use the @Subscriber annotation to subscribe to messages from an exchange:
import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Subscriber
import com.bertram.rabbitmq.conf.ExchangeParams
import com.bertram.rabbitmq.conf.QueueParams

@RabbitConsumer
class NotificationService {

    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'notifications',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.topic,
            durable = true,
            autoDelete = false
        ),
        routingKey = 'notification.email.#',
        consumers = 2
    )
    def handleEmailNotification(Map message) {
        // Handle email notifications
        println "Sending email to: ${message.recipient}"
    }

    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'notifications',
            exchangeType = com.bertram.rabbitmq.conf.ExchangeType.topic,
            durable = true
        ),
        routingKey = 'notification.sms.#',
        multiSubscriber = true
    )
    def handleSmsNotification(Map message) {
        // Handle SMS notifications
        println "Sending SMS to: ${message.phoneNumber}"
    }
}
@Subscriber Annotation Options
- routingKey: Routing key pattern for binding (default: #, which matches all)
- consumers: Number of concurrent consumers (default: 1)
- prefetchCount: Number of messages to prefetch (default: 0)
- conAlias: Connection factory alias
- autoStartup: Whether to start on boot (default: true)
- multiSubscriber: Whether multiple methods can subscribe to the same exchange (default: false)
- exchangeParams: Exchange configuration via @ExchangeParams
- queueParams: Queue configuration via @QueueParams
@ExchangeParams Options
- name: Exchange name (required)
- exchangeType: Type of exchange, one of topic, direct, fanout, or headers (default: topic)
- durable: Whether the exchange survives broker restarts (default: true)
- autoDelete: Whether the exchange is deleted when no longer in use (default: false)
@QueueParams Options
- name: Queue name (empty for auto-generated anonymous queue)
- autoNodeName: Whether to generate a queue name based on the hostname (default: false)
- durable: Queue durability (default: false)
- autoDelete: Auto-delete queue (default: true)
- exclusive: Exclusive queue (default: true)
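By default a subscriber's queue is anonymous, non-durable, auto-deleted, and exclusive, so messages published while the application is down are not retained for it. The sketch below shows how queueParams might be used to bind a named, durable queue instead, using only the options listed above. AuditService, the audit exchange, and the audit.archive queue are hypothetical names.

```groovy
import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Subscriber
import com.bertram.rabbitmq.conf.ExchangeParams
import com.bertram.rabbitmq.conf.ExchangeType
import com.bertram.rabbitmq.conf.QueueParams

@RabbitConsumer
class AuditService {

    @Subscriber(
        exchangeParams = @ExchangeParams(
            name = 'audit',
            exchangeType = ExchangeType.topic,
            durable = true
        ),
        // Named, durable, shared queue instead of the anonymous default
        queueParams = @QueueParams(
            name = 'audit.archive',
            durable = true,
            autoDelete = false,
            exclusive = false
        ),
        routingKey = 'audit.#'
    )
    def archiveEvent(Map message) {
        println "Archiving audit event: ${message}"
    }
}
```

A named, non-exclusive queue also lets several application instances share the same queue and split the work, whereas anonymous exclusive queues give each instance its own copy of every message.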
Message Types
Handler methods can accept several message types. The message converter will automatically deserialize based on the method parameter type:
import com.bertram.rabbitmq.conf.RabbitConsumer
import com.bertram.rabbitmq.conf.Queue

@RabbitConsumer
class MessageHandlerService {

    @Queue(name = 'queue.map')
    def handleMapMessage(Map message) {
        // Handle Map messages
    }

    @Queue(name = 'queue.string')
    def handleStringMessage(String message) {
        // Handle String messages
    }

    @Queue(name = 'queue.bytes')
    def handleByteMessage(byte[] message) {
        // Handle raw byte messages
    }

    @Queue(name = 'queue.list')
    def handleListMessage(List message) {
        // Handle List messages
    }
}
Sending Messages
Using the SendRabbitMessage Trait
The plugin automatically enhances all Grails services and controllers with the SendRabbitMessage trait, providing sendRabbitMessage() methods:
class OrderController {

    def createOrder() {
        def orderData = [orderId: '12345', item: 'Widget', quantity: 10]

        // Send to default exchange/queue
        sendRabbitMessage(orderData)

        // Send to a specific connection
        sendRabbitMessage('main', orderData)

        // Send with a routing key
        sendRabbitMessage('main', 'orders.new', orderData)

        // Send to a specific exchange with routing key
        sendRabbitMessage('main', 'order-exchange', 'orders.new', orderData)

        render "Order submitted"
    }
}
Using RabbitMQService Directly
For more control, inject and use RabbitMQService directly:
class OrderProcessingService {

    def rabbitMQService

    def submitOrder(Map orderData) {
        // Simple send
        rabbitMQService.convertAndSend(orderData)

        // Send to a specific connection
        rabbitMQService.convertAndSend('main', orderData)

        // Send with routing key
        rabbitMQService.convertAndSend('main', 'orders.new', orderData)

        // Send to exchange with routing key
        rabbitMQService.convertAndSend('main', 'order-exchange', 'orders.new', orderData)
    }
}
Sending with a MessagePostProcessor
You can modify message properties before sending using a MessagePostProcessor:
import org.springframework.amqp.core.MessagePostProcessor
import org.springframework.amqp.core.Message

class PriorityMessageService {

    def rabbitMQService

    def sendHighPriorityMessage(Map data) {
        MessagePostProcessor processor = { Message message ->
            message.messageProperties.priority = 10
            message.messageProperties.setHeader('X-Custom-Header', 'urgent')
            return message
        }
        rabbitMQService.convertAndSend('main', 'priority-exchange', 'high', data, processor)
    }
}
Container Management
The plugin provides methods on RabbitMQService to manage listener containers at runtime.
Starting and Stopping Containers
class AdminService {

    def rabbitMQService

    def pauseQueueProcessing(String queueName) {
        rabbitMQService.stopRabbitQueueContainer(queueName)
    }

    def resumeQueueProcessing(String queueName) {
        rabbitMQService.startRabbitQueueContainer(queueName)
    }

    def pauseExchangeSubscription(String exchangeName) {
        rabbitMQService.stopRabbitSubscriberContainer(exchangeName)
    }

    def resumeExchangeSubscription(String exchangeName) {
        rabbitMQService.startRabbitSubscriberContainer(exchangeName)
    }

    def stopAll() {
        rabbitMQService.stopAllRabbitContainers()
    }

    def startAll() {
        rabbitMQService.startAllRabbitContainers()
    }
}
Listing Active Containers
def containers = rabbitMQService.getRabbitListenerContainers()
containers.each { beanName, container ->
    println "${beanName}: running=${container.running}"
}
Checking Queue Depth
def messageCount = rabbitMQService.getQueueMessageCount('main', 'orders.new')
println "Messages in queue: ${messageCount}"
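Queue depth and container control can be combined into a simple form of backpressure: pause one consumer while a downstream queue is backed up and resume it once the backlog clears. The sketch below is one possible arrangement, not a feature of the plugin. QueueDepthGuard is a hypothetical helper, and it assumes that getQueueMessageCount returns a number and that starting an already running container is harmless. Because the service is accessed dynamically, a stub is used in the usage example so the code can be run outside a Grails application.

```groovy
// Illustrative helper, not part of the plugin.
class QueueDepthGuard {

    // In a Grails application this would be the injected RabbitMQService.
    def rabbitMQService

    /**
     * Stops the consumer of sourceQueue while downstreamQueue holds more than
     * maxDepth messages, and starts it otherwise.
     * Returns true when the consumer was paused.
     */
    boolean apply(String conAlias, String downstreamQueue, String sourceQueue, int maxDepth) {
        def depth = rabbitMQService.getQueueMessageCount(conAlias, downstreamQueue)
        if (depth > maxDepth) {
            rabbitMQService.stopRabbitQueueContainer(sourceQueue)
            return true
        }
        rabbitMQService.startRabbitQueueContainer(sourceQueue)
        return false
    }
}

// Usage with a stub standing in for RabbitMQService (for local experimentation).
def calls = []
def stub = new Expando(
    getQueueMessageCount: { String alias, String queue -> 500 },
    stopRabbitQueueContainer: { String queue -> calls << "stop:${queue}".toString() },
    startRabbitQueueContainer: { String queue -> calls << "start:${queue}".toString() }
)
def guard = new QueueDepthGuard(rabbitMQService: stub)
def paused = guard.apply('main', 'orders.shipped', 'orders.new', 100)
println "paused=${paused}, calls=${calls}"
```

In a real application the check would typically run on a timer, and a gap between the pause and resume thresholds would avoid rapid toggling.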
Runtime Listeners
You can create queue consumers and exchange subscribers dynamically at runtime.
Dynamic Queue Consumer
class DynamicListenerService {

    def rabbitMQService

    def createDynamicConsumer() {
        rabbitMQService.createQueueConsumer([
            queueName: 'dynamic.queue',
            connection: 'main',
            consumers: 1,
            durable: true,
            autoDelete: false,
            startOnLoad: true
        ]) { message ->
            println "Dynamic consumer received: ${message}"
        }
    }
}
Dynamic Topic Subscriber
class DynamicSubscriberService {

    def rabbitMQService

    def createDynamicSubscriber() {
        rabbitMQService.createSubscriber([
            exchangeName: 'events',
            exchangeType: com.bertram.rabbitmq.conf.ExchangeType.topic,
            routingKey: 'event.user.#',
            connection: 'main'
        ]) { message ->
            println "Dynamic subscriber received: ${message}"
        }
    }
}
RabbitMQService API
Message Sending Methods
- convertAndSend(Object message): Send a message to the default template
- convertAndSend(String connectionAlias, Object message): Send to a specific connection
- convertAndSend(String connectionAlias, String routingKey, Object message): Send with a routing key
- convertAndSend(String connectionAlias, String exchange, String routingKey, Object message): Send to a specific exchange
- convertAndSend(String connectionAlias, String exchange, String routingKey, Object message, MessagePostProcessor processor): Send with a post-processor
Container Management Methods
- stopRabbitQueueContainer(container): Stop a queue listener container
- startRabbitQueueContainer(container): Start a queue listener container
- stopRabbitSubscriberContainer(container): Stop an exchange subscriber container
- startRabbitSubscriberContainer(container): Start an exchange subscriber container
- stopAllRabbitContainers(): Stop all listener containers
- startAllRabbitContainers(): Start all listener containers
- getRabbitListenerContainers(): Get all listener container beans
Utility Methods
- getQueueMessageCount(String conAlias, String queueName): Get the number of messages in a queue
- createSubscriber(Map config, Closure adapter): Create a runtime topic subscriber
- createQueueConsumer(Map config, Closure adapter): Create a runtime queue consumer
Advanced Usage
For detailed information on advanced features and patterns, see Advanced Usage.
Topics covered in the advanced guide:
- SSL/TLS Configuration
- Multiple Connection Factories
- Fanout Exchange Patterns
- Error Handling and Retry
- Performance Tuning
- Testing
Getting Help
- GitHub Issues: https://github.com/wondrify/grails-rabbit-amqp-plugin
- Source Code: https://github.com/wondrify/grails-rabbit-amqp-plugin
License
Grails RabbitMQ AMQP Plugin is open source software licensed under the Apache License 2.0.