Class: Dakwak::Channel
- Inherits: Object
- Includes: Logger
- Defined in: lib/dakwak/messaging/channel.rb
Overview
Channels bind a number of queues which are consumed for messages sent by communicators.
Dakwak::Communicator instances can subscribe to channel queues to be notified of messages received, and can publish messages through channels directly.
Note: Channels should not be managed directly; see Dakwak::Station for more info.
Note: A channel is internally bound to an AMQP exchange entity.
Instance Attribute Summary
- #channel ⇒ Object (readonly)
  Returns the value of attribute channel.
- #exchange ⇒ Object (readonly)
  Returns the value of attribute exchange.
- #session ⇒ Object (readonly)
  Returns the value of attribute session.
Instance Method Summary
- #close(&on_close) ⇒ Object
  Terminates the connection with the AMQP broker and destroys all queue consumers.
- #consume(key, opts = {}) ⇒ Object
  Binds the specified queue and consumes any messages routed to it.
- #consuming?(queue) ⇒ Boolean
  Is this queue being consumed?
- #initialize(name) ⇒ Channel (constructor)
  Creates a new channel with name as the AMQP exchange name.
- #name ⇒ Object
- #open(opts = { }, &on_open) ⇒ Object
  Initiates connection with the AMQP broker using connection info stored in the Dakwak::Station and binds to a direct exchange identified by the name of this channel.
- #opened? ⇒ Boolean
- #publish(msg, queue, &callback) ⇒ Object
  Sends a message over this channel to the destination queue.
- #subscribe(comm, queue, opts = {}) ⇒ Object
  Subscribes a Communicator instance to messages received by a queue.
- #unsubscribe(comm, queue) ⇒ Object
  Messages received by this queue will no longer be dispatched to the Communicator.
- #unsubscribe_all(comm) ⇒ Object
  Removes all queue subscriptions for this Communicator.
Methods included from Logger
included, #log, #log_indent, #log_indent_dec, #log_indent_inc, #logging_context
Methods included from Silencable
Constructor Details
#initialize(name) ⇒ Channel
Creates a new channel with name as the AMQP exchange name. Do not create channel objects directly; use Dakwak::Station instead.
    # File 'lib/dakwak/messaging/channel.rb', line 187
    def initialize(name)
      @__name = name # the name of the exchange this channel represents
      logging_context("Channel[#{@__name}]")

      @queues = {}
      super()
    end
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
    # File 'lib/dakwak/messaging/channel.rb', line 18
    def channel
      @channel
    end
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
    # File 'lib/dakwak/messaging/channel.rb', line 18
    def exchange
      @exchange
    end
#session ⇒ Object (readonly)
Returns the value of attribute session.
    # File 'lib/dakwak/messaging/channel.rb', line 18
    def session
      @session
    end
Instance Method Details
#close(&on_close) ⇒ Object
Terminates the connection with the AMQP broker and destroys all queue consumers.
Attempting to close an already closed channel will log a warning and reject the request. However, if a block was passed, it will still be called.
Note: This is an asynchronous method.
    # File 'lib/dakwak/messaging/channel.rb', line 66
    def close(&on_close)
      unless opened?
        log_warn "Attempting to close a closed channel."
        on_close.call(self) if on_close
        return nil
      end

      EventMachine.next_tick do
        @session.close {
          # Reset to an empty Hash (as in the constructor) so that
          # consuming? keeps working after the channel is closed.
          @queues = {}
          @session = nil

          on_close.call(self) if on_close
        }
      end

      nil
    end
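The guard described above (warn, still run the callback, return nil) can be illustrated without a broker. The following is a minimal sketch only: `ClosableSketch` and its `warnings` array are hypothetical stand-ins, not part of Dakwak, and the sketch runs synchronously, whereas the real method defers the close through EventMachine.

```ruby
# Hypothetical stand-in for Dakwak::Channel; no broker or EventMachine involved.
class ClosableSketch
  attr_reader :warnings

  def initialize
    @opened = true
    @warnings = [] # collects what the real class would pass to log_warn
  end

  def opened?
    @opened
  end

  # Mirrors the documented guard: warn and bail out when already closed,
  # but still invoke the callback so the caller is always notified.
  def close(&on_close)
    unless opened?
      @warnings << "Attempting to close a closed channel."
      on_close.call(self) if on_close
      return nil
    end

    @opened = false
    on_close.call(self) if on_close
    nil
  end
end

calls = 0
sketch = ClosableSketch.new
sketch.close { calls += 1 } # normal close
sketch.close { calls += 1 } # already closed: warns, callback still runs
```

Because the callback fires in both cases, callers should not treat it as proof that a connection was actually torn down by that particular call.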
#consume(key, opts = {}) ⇒ Object
Binds the specified queue and consumes any messages routed to it.
Note: There is little point in consuming a queue that has no Dakwak::Communicator instances subscribed to it. To pull messages from a queue, use Communicator::subscribe.
Params:
- key
  The AMQP routing key to use when binding the new queue to the exchange.
- opts
  - :exclusive The queue consumption will be exclusive to this consumer. Defaults to false.
  - :nowait Does not wait for the broker to reply. If a binding error occurs, a Channel exception will be thrown. Defaults to true.
  - :queue_name Overrides the default queue name, which is the key followed by _queue.
    # File 'lib/dakwak/messaging/channel.rb', line 151
    def consume(key, opts = {})
      return if consuming?(key)

      queue_name = "#{key}_queue"
      if opts.has_key?(:queue_name) then
        queue_name = opts[:queue_name]
        opts.delete(:queue_name)
      end

      log_info "Consuming '#{key}##{queue_name}'"

      # assign some defaults
      opts = {
        :exclusive => false,
        :nowait => true,
        :durable => true,
        :passive => true
      }.merge(opts)

      @queues[key] = { :queue => nil, :subscribers => [], :consumer => nil }
      entry = @queues[key]
      entry[:queue] = @channel.queue(queue_name, opts)
      # entry[:queue].bind(@exchange, { :routing_key => key })
      entry[:queue].subscribe { |, payload|
        msg = Message.new(payload, , key)
        dispatch(msg)
      }
    end
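The queue naming and option defaults used by #consume can be examined in isolation. This is a rough sketch, assuming a hypothetical helper named `resolve_consume_options` that is not part of Dakwak; it reproduces only the hash handling and, unlike the original, works on a copy of the caller's options.

```ruby
# Hypothetical helper, not part of Dakwak: reproduces the option handling
# seen in #consume without touching a broker.
def resolve_consume_options(key, opts = {})
  defaults = {
    :exclusive => false,
    :nowait    => true,
    :durable   => true,
    :passive   => true
  }

  opts = opts.dup # work on a copy so the caller's hash is left untouched

  # An explicit :queue_name wins; otherwise the name is derived from the key.
  queue_name = opts.key?(:queue_name) ? opts.delete(:queue_name) : "#{key}_queue"

  # Caller-supplied options override the defaults.
  [queue_name, defaults.merge(opts)]
end

default_name, default_opts = resolve_consume_options("reports")
custom_name, custom_opts =
  resolve_consume_options("reports", { :queue_name => "nightly", :exclusive => true })
```

Here `default_name` is derived from the routing key, while `custom_name` comes from the caller; `:queue_name` is stripped before the remaining options are merged, so it is never forwarded with the queue options.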
#consuming?(queue) ⇒ Boolean
Is this queue being consumed?
    # File 'lib/dakwak/messaging/channel.rb', line 181
    def consuming?(queue)
      return @queues.has_key?(queue)
    end
#name ⇒ Object
    # File 'lib/dakwak/messaging/channel.rb', line 20
    def name()
      @__name
    end
#open(opts = { }, &on_open) ⇒ Object
Initiates connection with the AMQP broker using connection info stored in the Dakwak::Station and binds to a direct exchange identified by the name of this channel.
Params:
- &on_open
  A proc that will be called if and when the connection with the broker succeeds and the exchange is bound.
Note: This is an asynchronous method.
    # File 'lib/dakwak/messaging/channel.rb', line 39
    def open(opts = { }, &on_open)
      if opened?
        on_open.call(self) if on_open
        return
      end

      opts = { :passive => true, :durable => true }.merge(opts)

      AMQP.connect(Station.instance.) do |session|
        @session = session
        AMQP::Channel.new(@session, :auto_recovery => true) do |channel|
          @channel = channel
          @exchange = @channel.direct(@__name, opts)
          on_open.call(self) if on_open
        end
      end
    end
#opened? ⇒ Boolean
    # File 'lib/dakwak/messaging/channel.rb', line 24
    def opened?
      @session && @session.connected?
    end
#publish(msg, queue, &callback) ⇒ Object
Sends a message over this channel to the destination queue.
Note: Messages published to a queue that this application is itself consuming will be ignored. The effect is analogous to AMQP's no-local queue binding option, so you will not receive your own messages.
Note: Messages are automatically stamped with the current application's id, stored in message.meta.app_id, so there is no need to set it manually.
    # File 'lib/dakwak/messaging/channel.rb', line 97
    def publish(msg, queue, &callback)
      if not opened? then
        # Forward the callback so it is not lost when the publish is deferred
        # until the channel has opened.
        return open { |c| c.publish(msg, queue, &callback) }
      end

      @exchange.publish(msg.payload, msg..merge!({
        :routing_key => queue,
        :app_id => Dakwak.app_fqn(),
        :timestamp => Time.now.to_i })) {
        callback.call(msg) if callback
      }
    end
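The stamping step can be pictured as a plain hash merge. The sketch below is illustrative only: `stamp_metadata` is a hypothetical helper, the application id is passed in rather than read from Dakwak.app_fqn(), and it returns a new hash instead of mutating the message's metadata in place as the original merge! does.

```ruby
# Hypothetical helper, not part of Dakwak: shows which keys are added to a
# message's metadata before it is handed to the exchange.
def stamp_metadata(meta, queue, app_id, now = Time.now)
  meta.merge(
    :routing_key => queue,     # the destination queue doubles as the routing key
    :app_id      => app_id,    # the real method uses Dakwak.app_fqn()
    :timestamp   => now.to_i   # seconds since the epoch
  )
end

original = { :content_type => "application/json" }
stamped  = stamp_metadata(original, "reports", "sketch-app", Time.at(0))
```

Any `:routing_key`, `:app_id`, or `:timestamp` already present in the metadata is overwritten by the merge, which is consistent with the note that there is no need to set the application id manually.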
#subscribe(comm, queue, opts = {}) ⇒ Object
Subscribes a Communicator instance to messages received by a queue. The queue will automatically be consumed if it has not already been marked for consumption.
Remark: The subscriber does not have to be an instance of a Communicator; it only needs to respond to a method with the following signature: on_message_received(msg)
    # File 'lib/dakwak/messaging/channel.rb', line 117
    def subscribe(comm, queue, opts = {})
      consume(queue, opts)
      @queues[queue][:subscribers] << comm
    end
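Since a subscriber only needs to respond to on_message_received(msg), any plain object can take that role. The sketch below assumes two hypothetical classes, `AuditLog` and `SubscriberTable`; the latter is an in-memory stand-in for the channel's subscriber bookkeeping, and its `dispatch` method is an assumption about how messages are fanned out, since the real dispatch code is not shown here.

```ruby
# Hypothetical subscriber: not a Communicator, but it responds to
# on_message_received(msg), which is all the remark above requires.
class AuditLog
  attr_reader :seen

  def initialize
    @seen = []
  end

  def on_message_received(msg)
    @seen << msg
  end
end

# Hypothetical in-memory stand-in for the per-queue subscriber lists.
class SubscriberTable
  def initialize
    @queues = {}
  end

  def subscribe(comm, queue)
    @queues[queue] ||= { :subscribers => [] }
    @queues[queue][:subscribers] << comm
  end

  # Assumed behaviour: hand the message to every subscriber of the queue.
  def dispatch(queue, msg)
    entry = @queues[queue]
    return unless entry

    entry[:subscribers].each { |s| s.on_message_received(msg) }
  end
end

audit = AuditLog.new
table = SubscriberTable.new
table.subscribe(audit, "reports")
table.dispatch("reports", "quarterly totals ready")
table.dispatch("alerts", "nobody is listening") # no subscribers, nothing happens
```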
#unsubscribe(comm, queue) ⇒ Object
Messages received by this queue will no longer be dispatched to the Communicator.
    # File 'lib/dakwak/messaging/channel.rb', line 124
    def unsubscribe(comm, queue)
      return unless consuming?(queue)

      @queues[queue][:subscribers].delete_if { |v| v == comm }
    end
#unsubscribe_all(comm) ⇒ Object
Removes all queue subscriptions for this Communicator. Messages over this channel will no longer be dispatched to it.
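    # File 'lib/dakwak/messaging/channel.rb', line 132
    def unsubscribe_all(comm)
      # @queues is a Hash keyed by queue key, so iterate over the keys;
      # a single-parameter each block would receive [key, entry] pairs.
      @queues.each_key { |q| unsubscribe(comm, q) }
    end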
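When walking a Hash of queue entries like the one this class keeps, it matters which iterator is used. The sketch below uses made-up queue keys and symbols in place of real subscribers; it only demonstrates standard Ruby Hash behaviour and is not taken from Dakwak itself.

```ruby
# Illustrative data only: queue keys mapped to entries with subscriber lists.
queues = {
  "reports" => { :subscribers => [:comm_a, :comm_b] },
  "alerts"  => { :subscribers => [:comm_a] }
}

# With a single block parameter, Hash#each yields [key, value] pairs...
yielded_by_each = []
queues.each { |q| yielded_by_each << q.class }

# ...whereas Hash#each_key yields only the keys.
yielded_by_each_key = []
queues.each_key { |q| yielded_by_each_key << q.class }

# Removing one subscriber from every queue by looking each entry up by key.
queues.each_key do |q|
  queues[q][:subscribers].delete_if { |v| v == :comm_a }
end
```

A lookup such as `has_key?` only matches when it is given the key itself, so passing the whole pair yielded by `each` would never find an entry.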