When creating a Bunny channel with default options set. If I try to connect to the queue with default x-dead-letter-exchange set by sneaker, it throws an error.
Here is my sneaker configuration for Bunny
require 'connection_pool'
module Sneakers
class Bunny
include Singleton
cattr_accessor :options do
{
# The number of channels to make available in the pool.
# Usually this should be equal the number of threads working in parallel
channel_pool_size: 20,
# Time to wait for a channel to become available,
# before raising a Timeout error
channel_pool_timeout: 2,
# The exchange type declared by default.
# One of :direct, :fanout, :topic, :headers
exchange_type: :topic,
# The exchange name use by default.
exchange: :events,
# Default options passed to the exchange declaration.
exchange_options: { durable: true },
# Preload bunny connection when true.
preload: false
}
end
class << self
delegate :with, to: :instance
def configure(options)
options.merge!(options)
instance.send(:ensure_connection!) if options[:preload]
end
end
# For more information on concurrency visit:
# http://rubybunny.info/articles/concurrency.html
def initialize
@mutex = Mutex.new
@bunny = ::Bunny.new(Settings.amqp_url)
@channel_pool = ConnectionPool.new(
size: options[:channel_pool_size],
timeout: options[:channel_pool_timeout]
) { create_channel }
# gracefully shutdown Bunny connection
at_exit { stop }
end
def stop
@channel_pool.shutdown(&:close)
@bunny.stop
end
def with(&block)
ensure_connection!
@channel_pool.with(&block)
end
private
def ensure_connection!
@mutex.synchronize do
@bunny.start
end
end
def create_channel
@mutex.synchronize do
@bunny.create_channel
end
end
end
end
The Queue causing error is defined below.
class IamProfileUpdateWorker < ApplicationWorker
from_queue 'lo_iam_profile_update',
ack: true,
exchange: 'iam_profile_update',
# retry_exchange: '-retry', # It works if I pass this to the queue.
exchange_options: {
type: :fanout,
durable: true
}
def process_message(message, _delivery_info, _properties)
Iam::ProfileUpdate.call(decode(message))
ack!
end
end
This is the way I am using it
require 'sneakers/bunny'
module Sneakers
module Purge
extend ActiveSupport::Concern
module ClassMethods
def purge
Rails.logger.debug { "[CLEANER] Purging #{self}!" }
new.purge
end
end
# Purge all messages from the queue.
def purge
Sneakers::Bunny.with do |channel|
handler_klass = opts[:handler] || Sneakers::CONFIG.fetch(:handler)
# Configure options if needed
if handler_klass.respond_to?(:configure_queue)
@opts[:queue_options] = handler_klass.configure_queue(
@queue.name,
@opts
)
end
channel.queue(@queue.name, @opts[:queue_options]).purge # Line generating error.
end
end
end
end
If I don't add the retry_exchange option to the worker IamProfileUpdateWorker, I get the following error
PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'lo_iam_profile_update' in vhost '/': received 'activejob-retry' but current is '-retry'
activejob-retry is the default name set for the MaxRetry queue during config but it seems Bunny isn't using it when creating the connection.
Is there something I am not doing right?
When creating a Bunny channel with default options set. If I try to connect to the queue with default
x-dead-letter-exchangeset by sneaker, it throws an error.Here is my sneaker configuration for Bunny
The Queue causing error is defined below.
This is the way I am using it
If I don't add the retry_exchange option to the worker
IamProfileUpdateWorker, I get the following erroractivejob-retryis the default name set for the MaxRetry queue during config but it seems Bunny isn't using it when creating the connection.Is there something I am not doing right?