Class: Basquiat::Adapters::RabbitMq

Inherits:
Base
  • Object
show all
Defined in:
lib/basquiat/adapters/rabbitmq_adapter.rb,
lib/basquiat/adapters/rabbitmq/events.rb,
lib/basquiat/adapters/rabbitmq/message.rb,
lib/basquiat/adapters/rabbitmq/session.rb,
lib/basquiat/adapters/rabbitmq/connection.rb,
lib/basquiat/adapters/rabbitmq/configuration.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/base_strategy.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/dead_lettering.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/auto_acknowledge.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/basic_acknowledge.rb

Overview

The RabbitMQ adapter for Basquiat

Defined Under Namespace

Classes: AutoAcknowledge, BaseStrategy, BasicAcknowledge, Configuration, Connection, DeadLettering, DelayedDelivery, Events, Message, Session

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#adapter_options, #default_options, register_strategy, #strategies, strategies, strategy

Constructor Details

#initialize(procs: Events.new) ⇒ RabbitMq

Initializes the superclass using a Events object as the procs instance variable



23
24
25
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 23

def initialize(procs: Events.new)
  super(procs: procs)
end

Instance Attribute Details

#procsObject (readonly)

Returns the value of attribute procs.



12
13
14
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 12

def procs
  @procs
end

Instance Method Details

#base_optionsObject

Since the RabbitMQ configuration options are quite vast and it’s interations with the requeue strategies a bit convoluted it uses a Configuration object to handle it all



29
30
31
32
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 29

def base_options
  @configuration ||= Configuration.new
  @configuration.merge_user_options(Basquiat.configuration.adapter_options)
end

#listen(block: true, rescue_proc: Basquiat.configuration.rescue_proc) ⇒ Object

Binds the queues and start the event lopp.

Parameters:

  • block (Boolean) (defaults to: true)

    block the thread



56
57
58
59
60
61
62
63
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 56

def listen(block: true, rescue_proc: Basquiat.configuration.rescue_proc)
  procs.keys.each { |key| session.bind_queue(key) }
  session.subscribe(block: block) do |message|
    strategy.run(message) do
      process_message(message, rescue_proc)
    end
  end
end

#process_message(message, rescue_proc) ⇒ Object



65
66
67
68
69
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 65

def process_message(message, rescue_proc)
  procs[message.routing_key].call(message)
rescue StandardError => ex
  rescue_proc.call(ex, message)
end

#publish(event, message, props: {}) ⇒ Object

Publishes the event to the exchange configured.

Parameters:

  • event (String)

    routing key to be used

  • message (Hash)

    the message to be publish

  • props (Hash) (defaults to: {})

    other properties you wish to publish with the message, such as custom headers etc.



45
46
47
48
49
50
51
52
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 45

def publish(event, message, props: {})
  if options[:publisher][:session_pool]
    session_pool.with { |session| session.publish(event, message, props) }
  else
    session.publish(event, message, props)
  end
  disconnect unless options[:publisher][:persistent]
end

#reset_connectionObject Also known as: disconnect

Reset the connection to RabbitMQ.



72
73
74
75
76
77
78
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 72

def reset_connection
  connection.disconnect
  @connection   = nil
  @session      = nil
  @session_pool = nil
  @strategy     = nil
end

#sessionSession

Lazy initializes and return the session

Returns:



90
91
92
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 90

def session
  @session ||= Session.new(connection.create_channel, @configuration.session_options)
end

#session_poolConnectionPool<Session>

Lazy initializes and return the session pool

Returns:



96
97
98
99
100
101
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 96

def session_pool
  @session_pool ||= ConnectionPool.new(size: options[:publisher][:session_pool].fetch(:size, 1),
                                       timeout: options[:publisher][:session_pool].fetch(:timeout, 5)) do
    Session.new(connection.create_channel, @configuration.session_options)
  end
end

#strategyBaseStrategy

Lazy initializes the requeue strategy configured for the adapter

Returns:



84
85
86
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 84

def strategy
  @strategy ||= @configuration.strategy.new(session)
end

#subscribe_to(event_name, proc) ⇒ Object

Adds the subscription and register the proc to the event.

Parameters:

  • event_name (String)

    routing key to be matched (and bound to) when listening

  • proc (#call)

    callable object to be run when a message with the said routing_key is received



37
38
39
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 37

def subscribe_to(event_name, proc)
  procs[event_name] = proc
end