Class: Dakwak::Channel

Inherits:
Object
Includes:
Logger
Defined in:
lib/dakwak/messaging/channel.rb

Overview

A channel binds a number of queues and consumes them for messages sent by communicators.

Dakwak::Communicator instances can subscribe to channel queues to be notified of messages received, and can publish messages through channels directly.

Note: Channels should not be managed directly; see Dakwak::Station for more information.

Note: A channel is internally bound to an AMQP exchange entity.

Instance Attribute Summary

Instance Method Summary

Methods included from Logger

included, #log, #log_indent, #log_indent_dec, #log_indent_inc, #logging_context

Methods included from Silencable

#silence, #silent?

Constructor Details

#initialize(name) ⇒ Channel

Creates a new channel, using name as the AMQP exchange name. Do not create channel objects directly; use Dakwak::Station instead.



# File 'lib/dakwak/messaging/channel.rb', line 187

def initialize(name)
  @__name = name # the name of the exchange this channel represents
  logging_context("Channel[#{@__name}]")

  @queues = {}
  super()
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



# File 'lib/dakwak/messaging/channel.rb', line 18

def channel
  @channel
end

#exchange ⇒ Object (readonly)

Returns the value of attribute exchange.



# File 'lib/dakwak/messaging/channel.rb', line 18

def exchange
  @exchange
end

#session ⇒ Object (readonly)

Returns the value of attribute session.



# File 'lib/dakwak/messaging/channel.rb', line 18

def session
  @session
end

Instance Method Details

#close(&on_close) ⇒ Object

Terminates the connection with the AMQP broker and destroys all queue consumers.

Attempting to close an already closed channel will log a warning and reject the request. However, if a block was passed, it will still be called.

Note: This is an asynchronous method.



# File 'lib/dakwak/messaging/channel.rb', line 66

def close(&on_close)
  unless opened?
    log_warn "Attempting to close a closed channel."
    on_close.call(self) if on_close
    return nil
  end

  EventMachine.next_tick do
    @session.close {
      @queues = {} # reset to an empty Hash; #consuming? relies on Hash#has_key?
      @session = nil

      on_close.call(self) if on_close
    }
  end

  nil
end

#consume(key, opts = {}) ⇒ Object

Binds the specified queue and consumes any messages routed to it.

Note: There is little point in consuming a queue that has no Dakwak::Communicator instances subscribed to it. To pull messages from a queue, use Communicator::subscribe.

Params:

key

The AMQP routing key to use when binding the new queue to the exchange.

opts

  • :exclusive The queue consumption will be exclusive to this consumer. Defaults to false.

  • :nowait Does not wait for the broker to reply. If a binding error occurs, a Channel exception will be thrown. Defaults to true.

  • :queue_name The name of the queue to consume. Defaults to the key followed by "_queue".



# File 'lib/dakwak/messaging/channel.rb', line 151

def consume(key, opts = {})
  return if consuming?(key)


  queue_name = "#{key}_queue"
  if opts.has_key?(:queue_name) then
    queue_name = opts[:queue_name]
    opts.delete(:queue_name)
  end
  log_info "Consuming '#{key}##{queue_name}'"

  # assign some defaults
  opts = {
    :exclusive => false,
    :nowait => true,
    :durable => true,
    :passive => true
  }.merge(opts)

  @queues[key]  = { :queue => nil, :subscribers => [], :consumer => nil }
  entry         = @queues[key]
  entry[:queue] = @channel.queue(queue_name, opts)
  # entry[:queue].bind(@exchange, { :routing_key => key })
  entry[:queue].subscribe { |meta, payload|
    msg = Message.new(payload, meta, key)
    dispatch(msg)
  }
end
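
The queue name and option handling at the top of #consume can be tried in isolation. The following is a minimal sketch, not part of Dakwak: resolve_queue_settings is a hypothetical helper that approximates the logic shown above, and unlike #consume it copies the options hash rather than deleting :queue_name from the caller's hash.

```ruby
# Hypothetical helper (not part of Dakwak) approximating how #consume
# derives the queue name and the options used to declare the queue.
def resolve_queue_settings(key, opts = {})
  opts = opts.dup # work on a copy so the caller's hash is left untouched

  # An explicit :queue_name overrides the default "<key>_queue" name.
  queue_name = opts.delete(:queue_name) || "#{key}_queue"

  # Same defaults as in #consume; caller-supplied options take precedence.
  defaults = {
    :exclusive => false,
    :nowait    => true,
    :durable   => true,
    :passive   => true
  }

  [queue_name, defaults.merge(opts)]
end

name, settings = resolve_queue_settings("pages")
puts name                 # prints "pages_queue"
puts settings[:exclusive] # prints "false"

name, settings = resolve_queue_settings("pages", { :queue_name => "custom", :exclusive => true })
puts name                 # prints "custom"
puts settings[:exclusive] # prints "true"
```

Because the defaults are merged with the caller's options last, any option passed to #consume (or forwarded from #subscribe) replaces the corresponding default.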

#consuming?(queue) ⇒ Boolean

Is this queue being consumed?

Returns:

  • (Boolean)


# File 'lib/dakwak/messaging/channel.rb', line 181

def consuming?(queue)
  return @queues.has_key?(queue)
end

#name ⇒ Object



# File 'lib/dakwak/messaging/channel.rb', line 20

def name()
  @__name
end

#open(opts = { }, &on_open) ⇒ Object

Initiates a connection with the AMQP broker using the connection info stored in Dakwak::Station, then binds to a direct exchange identified by the name of this channel. If the channel is already open, on_open is called immediately.

Params:

&on_open

A proc that will be called if and when the connection with the broker succeeds and the exchange is bound.

Note: This is an asynchronous method.



# File 'lib/dakwak/messaging/channel.rb', line 39

def open(opts = { }, &on_open)
  if opened?
    on_open.call(self) if on_open
    return
  end

  opts = { :passive => true, :durable => true }.merge(opts)

  AMQP.connect(Station.instance.connection_options) do |session|
    @session = session
    AMQP::Channel.new(@session, :auto_recovery => true) do |channel|
      @channel  = channel
      @exchange = @channel.direct(@__name, opts)
      on_open.call(self) if on_open
    end
  end
end

#opened? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/dakwak/messaging/channel.rb', line 24

def opened?
  @session && @session.connected?
end

#publish(msg, queue, &callback) ⇒ Object

Sends a message over this channel to the destination queue.

Note: Messages published over a queue that is being consumed by this application will be IGNORED. The effect is analogous to AMQP’s no-local queue binding option. What this means to you is that you won’t have to worry about receiving self-messages.

Note: Messages are automatically stamped with the current application’s id, stored in message.meta.app_id, so there is no need to set it manually.



# File 'lib/dakwak/messaging/channel.rb', line 97

def publish(msg, queue, &callback)
  if not opened? then
    # forward the callback so it still fires once the channel has opened
    return open { |c| c.publish(msg, queue, &callback) }
  end

  @exchange.publish(msg.payload, msg.meta.merge!({
    :routing_key => queue,
    :app_id => Dakwak.app_fqn(),
    :timestamp => Time.now.to_i
  })) { callback.call(msg) if callback }
end
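
The metadata stamping in #publish relies on Hash#merge!, which updates the message's meta hash in place and overwrites any keys already present. The sketch below illustrates that behaviour with plain hashes; stamp_metadata and the "example.app" id are hypothetical stand-ins for the call to Dakwak.app_fqn, whose return value is not shown here.

```ruby
# Hypothetical stand-in for the stamping step in #publish.
# app_id would come from Dakwak.app_fqn in the real method.
def stamp_metadata(meta, queue, app_id, now = Time.now)
  meta.merge!({
    :routing_key => queue,    # the destination queue doubles as the routing key
    :app_id      => app_id,   # replaces any app_id the caller had set
    :timestamp   => now.to_i  # seconds since the Unix epoch
  })
end

meta = { :content_type => "application/json", :app_id => "stale" }
stamp_metadata(meta, "pages", "example.app", Time.at(0))

puts meta[:app_id]       # prints "example.app"
puts meta[:routing_key]  # prints "pages"
puts meta[:content_type] # prints "application/json"
```

Since merge! mutates its receiver, the stamped values remain visible on the message's meta after publishing.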

#subscribe(comm, queue, opts = {}) ⇒ Object

Subscribes a Communicator instance to messages received by a queue. The queue will automatically be consumed if it has not already been marked for consumption.

Remark: The subscriber does not have to be an instance of a Communicator; it only needs to respond to a method with the following signature: on_message_received(msg)



# File 'lib/dakwak/messaging/channel.rb', line 117

def subscribe(comm, queue, opts = {})
  consume(queue, opts)
  @queues[queue][:subscribers] << comm
end
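
As the remark above says, subscribers are duck-typed. The following is a minimal in-memory sketch of that contract, with no broker involved. RecordingSubscriber and SubscriberTable are hypothetical names, and the dispatch step is an assumption, since the source of the channel's own dispatch method is not shown on this page.

```ruby
# Any object that responds to on_message_received(msg) can subscribe.
class RecordingSubscriber
  attr_reader :received

  def initialize
    @received = []
  end

  def on_message_received(msg)
    @received << msg
  end
end

# Hypothetical in-memory stand-in for the channel's per-queue subscriber
# lists. The dispatch method is an assumed delivery step for illustration.
class SubscriberTable
  def initialize
    @queues = {}
  end

  def subscribe(comm, queue)
    entry = (@queues[queue] ||= { :subscribers => [] })
    entry[:subscribers] << comm
  end

  def unsubscribe(comm, queue)
    return unless @queues.key?(queue)
    @queues[queue][:subscribers].delete_if { |s| s == comm }
  end

  def dispatch(queue, msg)
    return unless @queues.key?(queue)
    @queues[queue][:subscribers].each { |s| s.on_message_received(msg) }
  end
end

table = SubscriberTable.new
sub   = RecordingSubscriber.new

table.subscribe(sub, "pages")
table.dispatch("pages", "first")
table.unsubscribe(sub, "pages")
table.dispatch("pages", "second") # no longer delivered to sub

p sub.received # prints ["first"]
```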

#unsubscribe(comm, queue) ⇒ Object

Messages received by this queue will no longer be dispatched to the Communicator.



# File 'lib/dakwak/messaging/channel.rb', line 124

def unsubscribe(comm, queue)
  return unless consuming?(queue)

  @queues[queue][:subscribers].delete_if { |v| v == comm }
end

#unsubscribe_all(comm) ⇒ Object

Removes all queue subscriptions for this Communicator. Messages over this channel will no longer be dispatched to it.




def unsubscribe_all(comm)
  # @queues is a Hash keyed by queue name, so iterate over its keys only
  @queues.each_key { |q| unsubscribe(comm, q) }
end
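
Since @queues is a Hash keyed by queue name (see the constructor), removing a subscriber from every queue means walking its keys. A small sketch with plain data, using symbols as hypothetical stand-ins for communicators, shows the difference between Hash#each, whose single block parameter receives a [key, value] pair, and Hash#each_key, which yields only the queue name.

```ruby
# Plain-data stand-in for @queues; the symbols play the role of communicators.
queues = {
  "pages" => { :subscribers => [:a, :b] },
  "jobs"  => { :subscribers => [:a] }
}

# Hash#each with one block parameter yields [key, value] pairs.
pairs = []
queues.each { |q| pairs << q }
puts pairs.first.class # prints "Array"

# Hash#each_key yields just the queue names.
names = []
queues.each_key { |q| names << q }
p names # prints ["pages", "jobs"]

# Removing one subscriber from every queue by name.
queues.each_key { |q| queues[q][:subscribers].delete_if { |s| s == :a } }
p queues["pages"][:subscribers] # prints [:b]
p queues["jobs"][:subscribers]  # prints []
```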