Class: Dakwak::PrivateChannel

Inherits:
Object
Includes:
Logger
Defined in:
lib/dakwak/messaging/private_channel.rb

Overview

Channels bind a number of queues which are consumed for messages sent by communicators.

Dakwak::Communicator instances can subscribe to channel queues to be notified of messages received, and can publish messages through channels directly.

Note: Channels should not be managed directly; see Dakwak::Station for more info.

Note: A channel is internally bound to an AMQP exchange entity.

Instance Method Summary

Methods included from Logger

included, #log, #log_indent, #log_indent_dec, #log_indent_inc, #logging_context

Methods included from Silencable

#silence, #silent?

Constructor Details

#initialize ⇒ PrivateChannel

Creates a new, unopened channel; the AMQP channel and exchange are assigned later by #open. Do not create channel objects directly; use Dakwak::Station instead.



# File 'lib/dakwak/messaging/private_channel.rb', line 134

def initialize()
  logging_context("PrivateChannel")

  @channel, @exchange, @queue, @comlink = nil, nil, nil, nil
  super()
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



# File 'lib/dakwak/messaging/private_channel.rb', line 18

def channel
  @channel
end

#exchange ⇒ Object (readonly)

Returns the value of attribute exchange.



# File 'lib/dakwak/messaging/private_channel.rb', line 18

def exchange
  @exchange
end

#session ⇒ Object (readonly)

Returns the value of attribute session.



# File 'lib/dakwak/messaging/private_channel.rb', line 18

def session
  @session
end

Instance Method Details

#close(&on_close) ⇒ Object

Terminates the connection with the AMQP broker and destroys all queue consumers.

Attempting to close an already closed channel logs a warning and does nothing else. However, if a block was passed, it is still called, with the channel as its argument.

Note: This is an asynchronous method.



# File 'lib/dakwak/messaging/private_channel.rb', line 60

def close(&on_close)
  unless opened?
    log_warn "Attempting to close a closed channel."
    on_close.call(self) if on_close
    return nil
  end

  EventMachine.next_tick do
    @session.close {
      @queue = nil
      @session = nil

      on_close.call(self) if on_close
    }
  end

  nil
end
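
To make the closed-channel case concrete, here is a minimal sketch of that early-return branch. ClosedChannelStub is a hypothetical stand-in, not part of Dakwak: it skips AMQP and EventMachine and records the warning in an array instead of calling log_warn.

```ruby
# Hypothetical stand-in that models only the "already closed" branch of #close.
class ClosedChannelStub
  attr_reader :warnings

  def initialize
    @warnings = []
  end

  # This stub represents a channel that was never opened.
  def opened?
    false
  end

  def close(&on_close)
    unless opened?
      @warnings << "Attempting to close a closed channel."
      on_close.call(self) if on_close
      return nil
    end

    nil # the real method would close the AMQP session here
  end
end

stub = ClosedChannelStub.new
called_with = nil
result = stub.close { |channel| called_with = channel }
```

The block runs even though nothing was closed, so cleanup code placed in it does not need its own opened? check.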

#consume(comm, opts = {}) ⇒ Object

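Declares a server-named queue on this channel and consumes any messages routed to it, handing each one to comm. Does nothing if a queue is already being consumed.

Note: There’s little point in consuming a queue that has no Dakwak::Communicator instances subscribed to it. To pull messages from a queue, use Communicator::subscribe.

Params:

comm

The object notified of each received message through on_message_received; in practice a Dakwak::Communicator.

opts

Queue declaration options, merged over the defaults exclusive: true, durable: false, passive: false, and auto_delete: true.

  • :exclusive The queue consumption will be exclusive to this consumer.

  • :nowait Does not wait for the broker to reply. If a binding error occurs, a Channel exception will be thrown.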
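
#consume merges the caller's opts over its defaults (exclusive: true, durable: false, passive: false, auto_delete: true) before declaring the queue, so any key passed in wins. A small sketch of that merge follows; CONSUME_DEFAULTS and effective_queue_options are hypothetical names introduced here for illustration, not part of Dakwak.

```ruby
# Defaults copied from the #consume source.
CONSUME_DEFAULTS = {
  exclusive: true,
  durable: false,
  passive: false,
  auto_delete: true
}.freeze

# Hypothetical helper: the options #consume would end up passing
# to the queue declaration for a given opts hash.
def effective_queue_options(opts = {})
  CONSUME_DEFAULTS.merge(opts)
end

untouched = effective_queue_options
custom    = effective_queue_options({ exclusive: false, nowait: true })
```

Here custom keeps durable, passive, and auto_delete at their defaults, overrides exclusive, and gains nowait.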



# File 'lib/dakwak/messaging/private_channel.rb', line 98

def consume(comm, opts = {})
  return if consuming?

  @comlink = comm

  # assign some defaults
  opts = {
    exclusive: true,
    durable: false,
    passive: false,
    auto_delete: true
  }.merge(opts)

  @channel.queue("", opts) do |queue, declare_ok|
    # TODO: handle declaration failure

    @queue = queue

    yield if block_given?

    # log_info "Consuming #{@queue.name}"
    @queue.subscribe { |meta, payload|
      msg = Message.new(payload, meta)
      # log_info "Dispatching message #{msg}"
      @comlink.on_message_received(msg)
    }
  end
end
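
Judging by the source above, the only thing #consume asks of comm is an on_message_received method. The sketch below illustrates that dispatch path without a broker. StubMessage and CollectingCommunicator are hypothetical stand-ins, and the deliver lambda plays the role of the queue subscription block.

```ruby
# Stand-in for Dakwak's Message, which the source builds from (payload, meta).
StubMessage = Struct.new(:payload, :meta)

# Hypothetical communicator that simply stores what it receives.
class CollectingCommunicator
  attr_reader :inbox

  def initialize
    @inbox = []
  end

  def on_message_received(msg)
    @inbox << msg
  end
end

comm = CollectingCommunicator.new

# Mirrors the body of the subscribe block: wrap the delivery, then dispatch it.
deliver = lambda do |meta, payload|
  comm.on_message_received(StubMessage.new(payload, meta))
end

deliver.call({ routing_key: "example" }, "hello")
```

A real Dakwak::Communicator may expect more from the channel than this; the sketch covers only the call made inside #consume.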

#consuming? ⇒ Boolean

Is the queue being consumed?

Returns:

  • (Boolean)


# File 'lib/dakwak/messaging/private_channel.rb', line 128

def consuming?
  return !@queue.nil?
end

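#name ⇒ Object

Returns the name of the queue being consumed. Raises NoMethodError if called before #consume has declared a queue.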



# File 'lib/dakwak/messaging/private_channel.rb', line 79

def name()
  @queue.name
end

#open(&on_open) ⇒ Object

Initiates a connection with the AMQP broker using the connection info stored in the Dakwak::Station, opens an AMQP channel on that connection, and uses the channel's default exchange.

Params:

&on_open

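A proc that will be called if and when the connection with the broker succeeds and the exchange is available. It receives the channel as its argument. If the channel is already open, it is called immediately.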

Note: This is an asynchronous method.



# File 'lib/dakwak/messaging/private_channel.rb', line 35

def open(&on_open)
  if opened?
    on_open.call(self) if on_open
    return
  end

  AMQP.connect(Station.instance.connection_options) do |session|
    @session = session
    AMQP::Channel.new(@session, :auto_recovery => true) do |channel|
      @channel  = channel
      @exchange = @channel.default_exchange
      on_open.call(self) if on_open
    end
  end
end
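
Since #open is asynchronous, anything that needs the channel belongs inside the on_open block. The sketch below shows that chaining pattern with StubChannel, a hypothetical stand-in that fires its callbacks synchronously so the snippet runs without a broker. With the real class the blocks would run later, once the broker has replied.

```ruby
# Hypothetical stand-in that mimics the callback contract of #open and #close.
class StubChannel
  def initialize
    @opened = false
  end

  def opened?
    @opened
  end

  # The real method connects to the broker before calling the block.
  def open(&on_open)
    @opened = true
    on_open.call(self) if on_open
  end

  def close(&on_close)
    @opened = false
    on_close.call(self) if on_close
    nil
  end
end

events = []
channel = StubChannel.new

channel.open do |opened_channel|
  events << :opened
  # Work that needs the channel goes here, not after the call to #open.
  opened_channel.close { |_closed_channel| events << :closed }
end
```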

#opened? ⇒ Boolean

Is the session with the AMQP broker connected?

Returns:

  • (Boolean)


# File 'lib/dakwak/messaging/private_channel.rb', line 20

def opened?
  @session && @session.connected?
end
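
One detail of this implementation: because it evaluates @session && @session.connected?, it returns nil rather than false when no session exists yet. The sketch below lifts the same expression into a free function to show the three cases. FakeSession is a hypothetical stand-in that models only connected?.

```ruby
# Hypothetical stand-in for the AMQP session.
FakeSession = Struct.new(:connected) do
  def connected?
    connected
  end
end

# Same expression as PrivateChannel#opened?, with the session passed in.
def opened?(session)
  session && session.connected?
end

never_opened = opened?(nil)
disconnected = opened?(FakeSession.new(false))
connected    = opened?(FakeSession.new(true))
```

Callers are therefore safer testing the result for truthiness than comparing it against false.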