Module: PipelineToolkit::Amqp::Abstract
Overview
Instance Attribute Summary
- #options ⇒ Object (readonly)
  Getter for the options hash.
Instance Method Summary
- #bind_queue(routing_key = '') ⇒ Object
  Binds the queue to an exchange.
- #initialize(options = {}) ⇒ Object
- #initialize_channel ⇒ Object
  Returns a new channel.
- #initialize_connection ⇒ Object
  Creates a new connection to the AMQP server.
- #initialize_exchange ⇒ Object
  Defines, initializes and returns an Exchange to act as an ingress point for all published messages.
- #initialize_queue(name = '') ⇒ Object
  Defines, initializes and returns a Queue that stores and forwards messages.
- #stop_connection ⇒ Object
  Gracefully shuts down the AMQP connection.
Instance Attribute Details
#options ⇒ Object (readonly)
Getter for the options hash.

    # File 'lib/pipeline_toolkit/amqp/abstract.rb', line 14
    def options
      @options
    end
Instance Method Details
#bind_queue(routing_key = '') ⇒ Object
Binds the queue to an exchange.

    # File 'lib/pipeline_toolkit/amqp/abstract.rb', line 94
    def bind_queue(routing_key = '')
      routing_key = options[:queue].split(":").last if options[:queue] && options[:queue].include?(":")
      DefaultLogger.debug("Amqp::Abstract#bind_queue(routing_key = '')")
      unless routing_key.nil? || routing_key.empty?
        @queue.bind(@exchange, :key => routing_key)
      else
        @queue.bind(@exchange)
      end
    end
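The routing-key selection at the top of #bind_queue can be read in isolation: when the :queue option contains a colon, the text after the last colon replaces the routing_key argument. The following is a minimal sketch of that rule only. The helper name effective_routing_key and the sample queue names are hypothetical and not part of PipelineToolkit.

```ruby
# Hypothetical helper (not part of PipelineToolkit) that mirrors the
# routing-key selection performed at the top of #bind_queue.
def effective_routing_key(options, routing_key = '')
  queue = options[:queue]
  # A "name:key" style :queue option overrides the routing_key argument.
  routing_key = queue.split(":").last if queue && queue.include?(":")
  routing_key
end

effective_routing_key({ :queue => "orders:created" })      # => "created"
effective_routing_key({ :queue => "orders" }, "fallback")  # => "fallback"
effective_routing_key({}, "")                               # => ""
```

An empty or nil result corresponds to the branch in #bind_queue that binds without a :key.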
#initialize(options = {}) ⇒ Object

    # File 'lib/pipeline_toolkit/amqp/abstract.rb', line 32
    def initialize(options = {})
      super()
      @options = options
    end
#initialize_channel ⇒ Object
Returns a new channel. A channel is a bidirectional virtual connection between the client and the AMQP server.

    # File 'lib/pipeline_toolkit/amqp/abstract.rb', line 59
    def initialize_channel
      DefaultLogger.debug("Amqp::Abstract#initialize_channel")
      @channel = MQ.new(@connection)
    end
#initialize_connection ⇒ Object
Creates a new connection to the AMQP server.

    # File 'lib/pipeline_toolkit/amqp/abstract.rb', line 41
    def initialize_connection
      DefaultLogger.debug("Amqp::Abstract#initialize_connection")
      @connection = AMQP.connect(options.select_keys(:host, :port, :user, :pass, :vhost))
    end
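select_keys is not a core Ruby Hash method, so it is presumably an extension defined elsewhere; its definition is not shown on this page. Assuming it returns a hash holding only the listed keys that are actually present, it would behave roughly like the sketch below. The name pick_keys is hypothetical and chosen so as not to suggest the real definition.

```ruby
# Hypothetical stand-in for the select_keys call used in
# #initialize_connection. Assumption: the real helper keeps only
# the listed keys that exist in the hash.
def pick_keys(hash, *keys)
  hash.select { |key, _value| keys.include?(key) }
end

options = { :host => "localhost", :port => 5672, :queue => "orders" }

# Only the connection-related keys survive; :queue is dropped, and
# :user, :pass and :vhost are simply absent because they were never set.
connection_settings = pick_keys(options, :host, :port, :user, :pass, :vhost)
```

Under that assumption, options unrelated to the connection (such as :queue or :exchange) never reach AMQP.connect.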
#initialize_exchange ⇒ Object
Defines, initializes and returns an Exchange to act as an ingress point for all published messages.

    # File 'lib/pipeline_toolkit/amqp/abstract.rb', line 68
    def initialize_exchange
      DefaultLogger.debug("Amqp::Abstract#initialize_exchange")
      # declare an exchange on the channel
      @exchange = MQ::Exchange.new(@channel, options[:type], options[:exchange], :durable => options[:durable], :passive => options[:passive])
    rescue MQ::Error => e # rescued here because main thread does not seem to see it
      DefaultLogger.error "#{e.class.name}: #{e.message}\n" << e.backtrace.join("\n")
      raise e
    end
#initialize_queue(name = '') ⇒ Object
Defines, initializes and returns a Queue that stores and forwards messages. If name is nil or empty, the :queue option is used as the queue name.

    # File 'lib/pipeline_toolkit/amqp/abstract.rb', line 82
    def initialize_queue(name = '')
      DefaultLogger.debug("Amqp::Abstract#initialize_queue(name = '')")
      name = options[:queue] if name.nil? || name.empty?
      @queue = MQ::Queue.new(@channel, name, :durable => options[:durable], :passive => options[:passive])
    end
#stop_connection ⇒ Object
Gracefully shuts down the AMQP connection. The close is scheduled on the next EventMachine tick, and the given block, if provided, is called from the connection's close callback.

    # File 'lib/pipeline_toolkit/amqp/abstract.rb', line 49
    def stop_connection
      # NB: Next tick seems to give it enough time to receive and send outstanding acks. But I don't
      # really understand timing, so keep an eye on it.
      EM.next_tick { @connection.close { yield if block_given? } }
    end