Class: Dakwak::PrivateChannel
- Inherits:
- Object
- Includes:
- Logger
- Defined in:
- lib/dakwak/messaging/private_channel.rb
Overview
Channels bind a number of queues which are consumed for messages sent by communicators.
Dakwak::Communicator instances can subscribe to channel queues to be notified of messages received, and can publish messages through channels directly.
Note: Channels should not be managed directly; see Dakwak::Station for more info.
Note: A channel is internally bound to an AMQP exchange entity.
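The routing idea behind the overview can be illustrated with a toy, in-memory model. This is only a sketch: ToyDirectExchange and its methods are hypothetical names invented for illustration and are not part of Dakwak or the amqp gem. It assumes direct-exchange semantics, where a message reaches only the queues bound with exactly the routing key it was published with.

```ruby
# Toy, in-memory model of direct-exchange routing (hypothetical; no broker).
class ToyDirectExchange
  def initialize
    # routing key => list of bound queues
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  # Bind a queue (any object responding to #<<) under a routing key.
  def bind(routing_key, queue)
    @bindings[routing_key] << queue
    self
  end

  # Deliver the payload to every queue bound with exactly this key.
  # Returns the number of queues that received it.
  def publish(payload, routing_key:)
    return 0 unless @bindings.key?(routing_key)

    queues = @bindings[routing_key]
    queues.each { |queue| queue << payload }
    queues.size
  end
end

inbox = []
exchange = ToyDirectExchange.new.bind("translations", inbox)
exchange.publish("hello", routing_key: "translations")  # lands in inbox
exchange.publish("dropped", routing_key: "unbound.key") # no queue bound
```

In this model an unroutable message is simply discarded; what a real broker does with it depends on publish options that this page does not cover.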
Instance Attribute Summary
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
Instance Method Summary
-
#close(&on_close) ⇒ Object
Terminates the connection with the AMQP broker and destroys all queue consumers.
-
#consume(comm, opts = {}) ⇒ Object
Binds the specified queue and consumes any messages routed to it.
-
#consuming? ⇒ Boolean
Is the queue being consumed?
-
#initialize ⇒ PrivateChannel
constructor
Creates a new channel with name as the AMQP exchange name.
- #name ⇒ Object
-
#open(&on_open) ⇒ Object
Initiates connection with the AMQP broker using connection info stored in the Dakwak::Station and binds to a direct exchange identified by the name of this channel.
- #opened? ⇒ Boolean
Methods included from Logger
included, #log, #log_indent, #log_indent_dec, #log_indent_inc, #logging_context
Methods included from Silencable
Constructor Details
#initialize ⇒ PrivateChannel
Creates a new channel with name as the AMQP exchange name. Do not create channel objects directly; use Dakwak::Station instead.
# File 'lib/dakwak/messaging/private_channel.rb', line 134
def initialize()
  logging_context("PrivateChannel")
  @channel, @exchange, @queue, @comlink = nil, nil, nil, nil
  super()
end
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
# File 'lib/dakwak/messaging/private_channel.rb', line 18
def channel
  @channel
end
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
# File 'lib/dakwak/messaging/private_channel.rb', line 18
def exchange
  @exchange
end
#session ⇒ Object (readonly)
Returns the value of attribute session.
# File 'lib/dakwak/messaging/private_channel.rb', line 18
def session
  @session
end
Instance Method Details
#close(&on_close) ⇒ Object
Terminates the connection with the AMQP broker and destroys all queue consumers.
Attempting to close an already closed channel will log a warning and reject the request. However, if a block was passed, it will still be called.
Note: This is an asynchronous method.
# File 'lib/dakwak/messaging/private_channel.rb', line 60
def close(&on_close)
  unless opened?
    log_warn "Attempting to close a closed channel."
    on_close.call(self) if on_close
    return nil
  end

  EventMachine.next_tick do
    @session.close {
      @queue = nil
      @session = nil
      on_close.call(self) if on_close
    }
  end

  nil
end
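Because the on_close block runs on both paths (a real close and a rejected close of an already closed channel), callers may want to keep their callback idempotent. The sketch below mirrors only the guard logic of #close. ToyClosable is a hypothetical stand-in with no broker or EventMachine involved, and it invokes the block synchronously, whereas the real method defers the work to the next reactor tick.

```ruby
# Hypothetical stand-in that mirrors only the guard logic of #close.
class ToyClosable
  attr_reader :warnings

  def initialize
    @opened = true
    @warnings = []
  end

  def opened?
    @opened
  end

  def close(&on_close)
    unless opened?
      # Rejected request: warn, but still honour the callback.
      @warnings << "Attempting to close a closed channel."
      on_close.call(self) if on_close
      return nil
    end

    @opened = false
    on_close.call(self) if on_close
    nil
  end
end

channel = ToyClosable.new
callbacks = 0
2.times { channel.close { callbacks += 1 } }
# The block ran for both calls; only the second call logged a warning.
```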
#consume(comm, opts = {}) ⇒ Object
Binds the specified queue and consumes any messages routed to it.
Note: There’s little point in consuming a queue that has no Dakwak::Communicator instances subscribed to it. To pull messages from a queue, use Communicator::subscribe.
Params:
- routing_key
-
The AMQP routing key to use when binding the new queue to the exchange.
- :exclusive
-
The queue consumption will be exclusive to this consumer.
- :nowait
-
Does not wait for the broker to reply. If a binding error occurs, a Channel exception will be thrown.
# File 'lib/dakwak/messaging/private_channel.rb', line 98
def consume(comm, opts = {})
  return if consuming?

  @comlink = comm

  # assign some defaults
  opts = {
    exclusive: true,
    durable: false,
    passive: false,
    auto_delete: true
  }.merge(opts)

  @channel.queue("", opts) do |queue, declare_ok|
    # TODO: handle declaration failure
    @queue = queue

    yield if block_given?

    # log_info "Consuming #{@queue.name}"
    @queue.subscribe { |, payload|
      msg = Message.new(payload, )
      # log_info "Dispatching message #{msg}"
      @comlink.(msg)
    }
  end
end
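The way caller-supplied options interact with the defaults in the listing above can be checked in isolation. This is a minimal sketch: CONSUME_DEFAULTS and effective_queue_options are hypothetical names introduced here, and only the default values themselves come from the #consume source.

```ruby
# Defaults copied from the #consume listing (the constant name is hypothetical).
CONSUME_DEFAULTS = {
  exclusive: true,
  durable: false,
  passive: false,
  auto_delete: true
}.freeze

# Hash#merge gives precedence to its argument, so caller-supplied keys
# replace the defaults and extra keys such as :nowait pass through unchanged.
def effective_queue_options(opts = {})
  CONSUME_DEFAULTS.merge(opts)
end

effective_queue_options(exclusive: false, nowait: true)
```

With these arguments :exclusive becomes false, :nowait is added, and the remaining three defaults are kept.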
#consuming? ⇒ Boolean
Is the queue being consumed?
# File 'lib/dakwak/messaging/private_channel.rb', line 128
def consuming?
  return !@queue.nil?
end
#name ⇒ Object
# File 'lib/dakwak/messaging/private_channel.rb', line 79
def name()
  @queue.name
end
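As listed, #name delegates to @queue, which is nil until #consume has declared a queue, so calling it earlier raises NoMethodError. The sketch below reproduces that behaviour with hypothetical stand-ins (ToyQueue, ToyNamedChannel) and shows one possible guard using safe navigation; name_or_nil is an illustration, not a method of the class.

```ruby
# Hypothetical queue double; only #name is modelled.
ToyQueue = Struct.new(:name)

class ToyNamedChannel
  def initialize(queue = nil)
    @queue = queue
  end

  # Mirrors #name as listed: raises NoMethodError while @queue is nil.
  def name
    @queue.name
  end

  # Hypothetical variant using safe navigation: returns nil instead.
  def name_or_nil
    @queue&.name
  end
end

ToyNamedChannel.new.name_or_nil                      # nil, no exception
ToyNamedChannel.new(ToyQueue.new("inbox")).name      # "inbox"
```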
#open(&on_open) ⇒ Object
Initiates connection with the AMQP broker using connection info stored in the Dakwak::Station and binds to a direct exchange identified by the name of this channel.
Params:
- &on_open
-
A proc that will be called if and when the connection with the broker succeeds and the exchange is bound.
Note: This is an asynchronous method.
# File 'lib/dakwak/messaging/private_channel.rb', line 35
def open(&on_open)
  if opened?
    on_open.call(self) if on_open
    return
  end

  AMQP.connect(Station.instance.) do |session|
    @session = session

    AMQP::Channel.new(@session, :auto_recovery => true) do |channel|
      @channel = channel
      @exchange = @channel.default_exchange

      on_open.call(self) if on_open
    end
  end
end
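The early return in #open means a second call on an open channel does not reconnect, yet the on_open block is still invoked. A minimal sketch of that control flow follows. ToyOpenable is a hypothetical stand-in: the connection is simulated by a counter rather than AMQP.connect, and the block runs synchronously here, whereas the real method is asynchronous.

```ruby
# Hypothetical stand-in mirroring the control flow of #open.
class ToyOpenable
  attr_reader :connect_count

  def initialize
    @session = nil
    @connect_count = 0
  end

  def opened?
    !@session.nil?
  end

  def open(&on_open)
    if opened?
      # Already open: skip connecting, but still notify the caller.
      on_open.call(self) if on_open
      return
    end

    @connect_count += 1      # stands in for the broker handshake
    @session = :connected
    on_open.call(self) if on_open
  end
end

channel = ToyOpenable.new
notified = 0
2.times { channel.open { notified += 1 } }
# One simulated connection was made, and the block ran for both calls.
```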
#opened? ⇒ Boolean
# File 'lib/dakwak/messaging/private_channel.rb', line 20
def opened?
  @session && @session.connected?
end
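Although the signature advertises a Boolean, the expression `@session && @session.connected?` evaluates to nil when no session exists. That is falsy, so conditionals behave as expected, but comparisons against false will not match. The sketch below demonstrates the difference. ToySession, opened_raw and opened_strict are hypothetical names used only for illustration.

```ruby
# Hypothetical session double; only #connected? is modelled.
ToySession = Struct.new(:connected) do
  def connected?
    connected
  end
end

# Same expression as #opened?: yields nil, not false, without a session.
def opened_raw(session)
  session && session.connected?
end

# Double negation coerces the result to a strict true or false.
def opened_strict(session)
  !!opened_raw(session)
end

opened_raw(nil)                      # nil
opened_strict(nil)                   # false
opened_strict(ToySession.new(true))  # true
```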