Class: CfMessageBus::MessageBus
- Inherits:
-
Object
- Object
- CfMessageBus::MessageBus
- Defined in:
- lib/cf_message_bus/message_bus.rb
Instance Method Summary collapse
- #connected? ⇒ Boolean
-
#initialize(config) ⇒ MessageBus
constructor
A new instance of MessageBus.
- #publish(subject, message = nil, inbox = nil, &callback) ⇒ Object
- #request(subject, data = nil, options = {}, &block) ⇒ Object
- #subscribe(subject, options = {}, &block) ⇒ Object
- #synchronous_request(subject, data = nil, options = {}) ⇒ Object
- #unsubscribe(subscription_id) ⇒ Object
Constructor Details
#initialize(config) ⇒ MessageBus
Returns a new instance of MessageBus.
9 10 11 12 13 14 15 16 |
# File 'lib/cf_message_bus/message_bus.rb', line 9 def initialize(config) @logger = config[:logger] @internal_bus = MessageBusFactory.( config[:servers] || config[:uris] || config[:uri]) @subscriptions = {} end |
Instance Method Details
#connected? ⇒ Boolean
78 79 80 |
# File 'lib/cf_message_bus/message_bus.rb', line 78 def connected? internal_bus.connected? end |
#publish(subject, message = nil, inbox = nil, &callback) ⇒ Object
28 29 30 31 32 |
# File 'lib/cf_message_bus/message_bus.rb', line 28 def publish(subject, = nil, inbox=nil, &callback) EM.schedule do internal_bus.publish(subject, encode(), inbox, &callback) end end |
#request(subject, data = nil, options = {}, &block) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/cf_message_bus/message_bus.rb', line 34 def request(subject, data = nil, = {}, &block) response_timeout = .delete(:timeout) result_count = .delete(:result_count) [:max] = result_count if result_count subscription_id = internal_bus.request(subject, encode(data), ) do |payload, inbox| (payload, inbox) do |parsed_data, inbox| run_handler(block, parsed_data, inbox, subject, 'response') end end if response_timeout internal_bus.timeout(subscription_id, response_timeout, expected: result_count || 1) do run_handler(block, {timeout: true}, nil, subject, 'timeout') end end subscription_id end |
#subscribe(subject, options = {}, &block) ⇒ Object
18 19 20 21 22 23 24 25 26 |
# File 'lib/cf_message_bus/message_bus.rb', line 18 def subscribe(subject, = {}, &block) @subscriptions[subject] = [, block] subscribe_on_reactor(subject, ) do |parsed_data, inbox| EM.defer do run_handler(block, parsed_data, inbox, subject, 'subscription') end end end |
#synchronous_request(subject, data = nil, options = {}) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/cf_message_bus/message_bus.rb', line 54 def synchronous_request(subject, data = nil, = {}) [:result_count] ||= 1 result_count = [:result_count] return [] if result_count <= 0 EM.schedule_sync do |promise| results = [] request(subject, encode(data), ) do |response| if response[:timeout] promise.deliver(results) else results << response promise.deliver(results) if results.size == result_count end end end end |
#unsubscribe(subscription_id) ⇒ Object
74 75 76 |
# File 'lib/cf_message_bus/message_bus.rb', line 74 def unsubscribe(subscription_id) internal_bus.unsubscribe(subscription_id) end |