Class: Dakwak::Communicator

Inherits:
Object
Defined in:
lib/dakwak/messaging/communicator.rb

Overview

Communicator instances can send and receive messages across the platform.

Instance Method Details

#attach_message_handler(callback) ⇒ Object



# File 'lib/dakwak/messaging/communicator.rb', line 95

def attach_message_handler(callback)
  @callbacks ||= []
  @callbacks << callback
end

#on_message_received(msg) ⇒ Object

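Handler called whenever a message is received over one of the subscribed queues. The default implementation passes the message to each callback registered through #attach_message_handler.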

Override this in your implementation.



# File 'lib/dakwak/messaging/communicator.rb', line 91

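def on_message_received(msg)
  # @callbacks stays nil until #attach_message_handler has been called,
  # so fall back to an empty list instead of raising NoMethodError.
  (@callbacks || []).each { |c| c.call(msg) }
end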
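
As a rough illustration of how #attach_message_handler and #on_message_received fit together, the sketch below uses a stand-in class. SketchCommunicator is hypothetical and not part of Dakwak; it is modeled on the two method bodies listed on this page, and its nil guard is an assumption of the sketch. Messages are plain strings here, whereas the real class receives message objects.

```ruby
# Hypothetical stand-in for Dakwak::Communicator. Only the handler
# bookkeeping is modeled; no broker connection is involved.
class SketchCommunicator
  def attach_message_handler(callback)
    @callbacks ||= []
    @callbacks << callback
  end

  def on_message_received(msg)
    # Sketch assumption: tolerate delivery before any handler is attached.
    (@callbacks || []).each { |c| c.call(msg) }
  end
end

received = []
comm = SketchCommunicator.new

# Anything that responds to #call works as a handler.
comm.attach_message_handler(->(msg) { received << "first: #{msg}" })
comm.attach_message_handler(proc { |msg| received << "second: #{msg}" })

comm.on_message_received("hello")
puts received.inspect
```

Handlers run in the order in which they were attached, and every handler sees every message.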

#private_channel ⇒ Object



# File 'lib/dakwak/messaging/communicator.rb', line 49

def private_channel
  @private_channel
end

#publish(msg, channel, queue = nil, &callback) ⇒ Object

Sends a message to a queue over a channel.

If the queue isn’t specified, it is taken from the message. This is used when acknowledging requests or sending responses. See Job::ack() for more info.



# File 'lib/dakwak/messaging/communicator.rb', line 71

def publish(msg, channel, queue = nil, &callback)
  queue ||= msg.queue

  Station.instance.get_channel(channel) { |c|
    c.publish(msg, queue, &callback)
  }
end
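
The queue fallback described for #publish can be sketched in isolation. SketchMessage and resolve_queue are hypothetical names used only for this illustration; the one detail taken from the source is that an omitted queue is read from the message's queue attribute.

```ruby
# Hypothetical message type: only the `queue` attribute matters here.
SketchMessage = Struct.new(:payload, :queue)

# Mirrors the first line of #publish: an explicit queue wins,
# otherwise the queue recorded on the message is used.
def resolve_queue(msg, queue = nil)
  queue || msg.queue
end

reply = SketchMessage.new("done", "jobs.replies")

puts resolve_queue(reply)                # queue taken from the message
puts resolve_queue(reply, "jobs.audit")  # explicit queue takes precedence
```

This is why a reply or acknowledgement can be published without naming a queue: the message prepared for the reply already carries its destination.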

#publish_and_expect_reply(payload, channel, queue = nil, &callback) ⇒ Object



# File 'lib/dakwak/messaging/communicator.rb', line 79

def publish_and_expect_reply(payload, channel, queue = nil, &callback)
  m = Message.new(payload)
  m.meta[:message_id] ||= BSON::ObjectId.new.to_s
  m.meta[:headers]["queue_name"] = private_channel.name

  publish(m, channel, queue, &callback)
end
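
Judging from the source above, #publish_and_expect_reply stamps the outgoing message with an id and records the name of the private channel under the "queue_name" header, presumably so the receiver knows where to send its reply. The sketch below reproduces only that preparation step. SketchRequest and prepare_request are hypothetical, and SecureRandom.hex stands in for BSON::ObjectId so the example runs without the bson gem.

```ruby
require "securerandom"

# Hypothetical message type with the `meta` layout the method relies on.
class SketchRequest
  attr_reader :payload, :meta

  def initialize(payload)
    @payload = payload
    @meta = { headers: {} }
  end
end

# Builds a request that tells the receiver which queue to reply to.
def prepare_request(payload, reply_queue_name)
  m = SketchRequest.new(payload)
  # Stand-in for BSON::ObjectId.new.to_s: 12 random bytes as 24 hex characters.
  m.meta[:message_id] ||= SecureRandom.hex(12)
  m.meta[:headers]["queue_name"] = reply_queue_name
  m
end

request = prepare_request("ping", "private.reply-queue")
puts request.meta[:headers]["queue_name"]
puts request.meta[:message_id].length
```

Note that the real method reads private_channel.name, so it appears to require that #subscribe_exclusively has been called first; otherwise private_channel is nil.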

#reply_to(original_message, payload = "") ⇒ Object



# File 'lib/dakwak/messaging/communicator.rb', line 53

def reply_to(original_message, payload = "")
  publish(original_message.prepare_reply(payload), "")
end

#subscribe(channel, queues, exchange_opts = {}, queue_opts = {}, &callback) ⇒ Object

Subscribes this communicator to a number of queues in a channel. Messages routed through the indicated queues will be passed on to this instance.

Params:

channel

The name of the channel to subscribe to

queues

An array of strings containing the names of the queues to consume. A single string is also accepted and is treated as a one-element array.

Warning: This method is asynchronous. When it returns, the subscription is not necessarily active yet; it is active only once the block has been called. For that reason, chain further calls to subscribe inside the block.



# File 'lib/dakwak/messaging/communicator.rb', line 20

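def subscribe(channel, queues, exchange_opts = {}, queue_opts = {}, &callback)
  return if !channel || channel.empty? || !queues

  # Accept a single queue name, and copy the list so that the caller's
  # array is not emptied by the pop below.
  queues = queues.is_a?(String) ? [queues] : queues.dup

  # Every queue has been subscribed: signal completion.
  if queues.empty?
    callback.call if callback
    return
  end

  # Subscribe to one queue, then recurse for the rest once the channel is
  # open, passing the options along so they apply to every queue.
  queue = queues.pop
  Station.instance.open_channel(channel, exchange_opts) { |c|
    c.subscribe(self, queue, queue_opts)
    subscribe(channel, queues, exchange_opts, queue_opts, &callback)
  }
end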
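
Because #subscribe is asynchronous, code that depends on an active subscription belongs inside the block. The sketch below imitates that behaviour without a broker. SketchSubscriber and its run_pending method are hypothetical: subscribe only schedules work, and run_pending plays the role of the event loop that later completes it.

```ruby
# Hypothetical stand-in: `subscribe` only schedules the subscription, and
# the block runs later, when `run_pending` drains the scheduled work.
class SketchSubscriber
  attr_reader :active

  def initialize
    @pending = []
    @active = []
  end

  def subscribe(channel, queues, &callback)
    @pending << lambda do
      Array(queues).each { |q| @active << "#{channel}/#{q}" }
      callback.call if callback
    end
    nil
  end

  # Plays the role of the event loop.
  def run_pending
    @pending.shift.call until @pending.empty?
  end
end

sub = SketchSubscriber.new
sub.subscribe("jobs", ["pending", "failed"]) do
  # Safe place to chain: the first subscription is active by now.
  sub.subscribe("events", "audit") { puts "all subscriptions active" }
end

puts sub.active.inspect  # [] because nothing is active yet
sub.run_pending
puts sub.active.inspect  # ["jobs/pending", "jobs/failed", "events/audit"]
```

Calling the second subscribe directly after the first, outside the block, would give no guarantee about which subscription becomes active first.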

#subscribe_exclusively(opts = {}, &callback) ⇒ Object



# File 'lib/dakwak/messaging/communicator.rb', line 38

def subscribe_exclusively(opts = {}, &callback)
  return @private_channel if @private_channel

  @private_channel = PrivateChannel.new
  @private_channel.open do
    @private_channel.consume(self, opts, &callback)
  end

  return @private_channel
end

#unsubscribe(channel, queue) ⇒ Object

Removes the subscription to a queue in a channel.



# File 'lib/dakwak/messaging/communicator.rb', line 58

def unsubscribe(channel, queue)
  Station.instance.get_channel(channel) { |c| c.unsubscribe(self, queue) }
end