Class: Dakwak::Communicator
- Inherits:
-
Object
- Object
- Dakwak::Communicator
- Defined in:
- lib/dakwak/messaging/communicator.rb
Overview
Communicator instances can send and receive messages across the platform.
Instance Method Summary collapse
- #attach_message_handler(callback) ⇒ Object
-
#on_message_received(msg) ⇒ Object
Handler called whenever a message is received over one of the subscribed queues.
- #private_channel ⇒ Object
-
#publish(msg, channel, queue = nil, &callback) ⇒ Object
Sends a message to a queue over a channel.
- #publish_and_expect_reply(payload, channel, queue = nil, &callback) ⇒ Object
- #reply_to(original_message, payload = "") ⇒ Object
-
#subscribe(channel, queues, exchange_opts = {}, queue_opts = {}, &callback) ⇒ Object
Subscribes this communicator to a number of queues in a channel.
- #subscribe_exclusively(opts = {}, &callback) ⇒ Object
-
#unsubscribe(channel) ⇒ Object
Removes the subscription to all queues in a channel.
Instance Method Details
#attach_message_handler(callback) ⇒ Object
95 96 97 98 |
# File 'lib/dakwak/messaging/communicator.rb', line 95 def (callback) @callbacks ||= [] @callbacks << callback end |
#on_message_received(msg) ⇒ Object
Handler called whenever a message is received over one of the subscribed queues.
Override this in your implementation.
91 92 93 |
# File 'lib/dakwak/messaging/communicator.rb', line 91 def (msg) @callbacks.each { |c| c.call(msg) } end |
#private_channel ⇒ Object
49 50 51 |
# File 'lib/dakwak/messaging/communicator.rb', line 49 def private_channel @private_channel end |
#publish(msg, channel, queue = nil, &callback) ⇒ Object
Sends a message to a queue over a channel.
If the queue isn’t specified, it is taken from the message. This is used when acknowledging requests or sending responses. See Job::ack() for more info.
71 72 73 74 75 76 77 |
# File 'lib/dakwak/messaging/communicator.rb', line 71 def publish(msg, channel, queue = nil, &callback) queue ||= msg.queue Station.instance.get_channel(channel) { |c| c.publish(msg, queue, &callback) } end |
#publish_and_expect_reply(payload, channel, queue = nil, &callback) ⇒ Object
79 80 81 82 83 84 85 |
# File 'lib/dakwak/messaging/communicator.rb', line 79 def publish_and_expect_reply(payload, channel, queue = nil, &callback) m = Message.new(payload) m.[:message_id] ||= BSON::ObjectId.new.to_s m.[:headers]["queue_name"] = private_channel.name publish(m, channel, queue, &callback) end |
#reply_to(original_message, payload = "") ⇒ Object
53 54 55 |
# File 'lib/dakwak/messaging/communicator.rb', line 53 def reply_to(, payload = "") publish(.prepare_reply(payload), "") end |
#subscribe(channel, queues, exchange_opts = {}, queue_opts = {}, &callback) ⇒ Object
Subscribes this communicator to a number of queues in a channel. Messages routed through the indicated queues will be passed on to this instance.
Params:
- channel
-
The name of the channel to subscribe to
- queues
-
An array of strings containing names of the queues to consume
Warning: Chaining calls to subscribe will only work within blocks as this method is asynchronous; when it returns, it doesn’t mean the subscription is active, only when the block is called.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/dakwak/messaging/communicator.rb', line 20 def subscribe(channel, queues, exchange_opts = {}, queue_opts = {}, &callback) return if !channel or channel.empty? or !queues queues = [queues] if queues.is_a? String if queues.empty? callback.call if callback return end Proc.new { |channel, queue, &block| Station.instance.open_channel(channel, exchange_opts) { |c| c.subscribe(self, queue, queue_opts) block.call(c) if block } }.call(channel, queues.pop) { |c| subscribe(channel, queues) { callback.call if callback } } end |
#subscribe_exclusively(opts = {}, &callback) ⇒ Object
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/dakwak/messaging/communicator.rb', line 38 def subscribe_exclusively(opts = {}, &callback) return @private_channel if @private_channel @private_channel = PrivateChannel.new @private_channel.open do @private_channel.consume(self, opts, &callback) end return @private_channel end |