Class: CfMessageBus::MessageBus
- Inherits: Object
- Inheritance chain: Object → CfMessageBus::MessageBus
- Defined in:
- lib/cf_message_bus/message_bus.rb
Instance Method Summary
- #connected? ⇒ Boolean
- #initialize(config) ⇒ MessageBus (constructor): returns a new instance of MessageBus.
- #publish(subject, message = nil, &callback) ⇒ Object
- #recover(&block) ⇒ Object
- #request(subject, data = nil, options = {}, &block) ⇒ Object
- #subscribe(subject, options = {}, &block) ⇒ Object
- #synchronous_request(subject, data = nil, options = {}) ⇒ Object
- #unsubscribe(subscription_id) ⇒ Object
Constructor Details
#initialize(config) ⇒ MessageBus
Returns a new instance of MessageBus.
9 10 11 12 13 14 15 |
# File 'lib/cf_message_bus/message_bus.rb', line 9
#
# Builds the message bus wrapper from a configuration hash.
#
# @param config [Hash] +:logger+ is stored as @logger; +:uri+ is handed to
#   MessageBusFactory to obtain the underlying bus.
def initialize(config)
  @logger = config[:logger]
  # NOTE(review): the factory method name was missing from the rendered
  # listing (`MessageBusFactory.(...)`); `message_bus` is restored here --
  # confirm against MessageBusFactory.
  @internal_bus = MessageBusFactory.message_bus(config[:uri])
  @subscriptions = {}
  # No-op default so the recovery callback is always callable, even if
  # #recover is never used.
  @recovery_callback = lambda {}
  # Register the reconnect hook last, once all state above is in place.
  @internal_bus.on_reconnect { start_internal_bus_recovery }
end
Instance Method Details
#connected? ⇒ Boolean
80 81 82 |
# File 'lib/cf_message_bus/message_bus.rb', line 80
#
# Reports the connection state by asking the internal bus.
#
# @return [Boolean] whatever +internal_bus.connected?+ returns.
def connected?
  bus = internal_bus
  bus.connected?
end
#publish(subject, message = nil, &callback) ⇒ Object
27 28 29 30 31 |
# File 'lib/cf_message_bus/message_bus.rb', line 27
#
# Publishes a message on +subject+ via the internal bus.
#
# The publish call is wrapped in EM.schedule, and the message is passed
# through +encode+ before being handed to the internal bus.
#
# @param subject [String] subject to publish on.
# @param message [Object, nil] payload; passed through +encode+ first.
# @param callback [Proc] optional block forwarded to the internal bus's
#   +publish+.
def publish(subject, message = nil, &callback)
  EM.schedule do
    internal_bus.publish(subject, encode(message), &callback)
  end
end
#recover(&block) ⇒ Object
33 34 35 |
# File 'lib/cf_message_bus/message_bus.rb', line 33
#
# Registers the block stored as the recovery callback.
#
# When called without a block the callback is reset to a no-op lambda rather
# than +nil+, matching the default set in the constructor so the stored
# callback is always callable.
#
# @param block [Proc] callback to store in @recovery_callback.
# @return [Proc] the callback that was stored.
def recover(&block)
  @recovery_callback = block || lambda {}
end
#request(subject, data = nil, options = {}, &block) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/cf_message_bus/message_bus.rb', line 37
#
# Sends a request on +subject+ and routes each response to +block+.
#
# +:timeout+ and +:result_count+ are consumed here; every other option is
# forwarded to the internal bus. A given +:result_count+ is forwarded as
# +:max+. The caller's hash is left untouched (a copy is modified), so the
# same options hash can be reused across calls.
#
# @param subject [String] subject to send the request on.
# @param data [Object, nil] payload; passed through +encode+ first.
# @param options [Hash] +:timeout+, +:result_count+, plus pass-through options.
# @param block [Proc] handler run via +run_handler+ for each response, and
#   with +{timeout: true}+ when the timeout fires.
# @return [Object] the subscription id returned by the internal bus.
def request(subject, data = nil, options = {}, &block)
  # Work on a copy: the deletes and the :max assignment below must not leak
  # into the hash the caller passed in.
  options = options.dup
  response_timeout = options.delete(:timeout)
  result_count = options.delete(:result_count)
  options[:max] = result_count if result_count

  subscription_id = internal_bus.request(subject, encode(data), options) do |payload, inbox|
    # NOTE(review): the helper name was missing from the rendered listing;
    # `process_message` is restored here -- confirm against the class's
    # private helpers.
    process_message(payload, inbox) do |parsed_data, reply_inbox|
      run_handler(block, parsed_data, reply_inbox, subject, 'response')
    end
  end

  if response_timeout
    internal_bus.timeout(subscription_id, response_timeout, expected: result_count || 1) do
      run_handler(block, {timeout: true}, nil, subject, 'timeout')
    end
  end

  subscription_id
end
#subscribe(subject, options = {}, &block) ⇒ Object
17 18 19 20 21 22 23 24 25 |
# File 'lib/cf_message_bus/message_bus.rb', line 17
#
# Subscribes +block+ to +subject+.
#
# The options and block are recorded in @subscriptions under +subject+, then
# the subscription is set up through +subscribe_on_reactor+. Each delivered
# message is handed to +run_handler+ inside EM.defer.
#
# @param subject [String] subject to subscribe to.
# @param options [Hash] options forwarded to +subscribe_on_reactor+.
# @param block [Proc] handler invoked (via +run_handler+) per message.
# @return [Object] whatever +subscribe_on_reactor+ returns.
def subscribe(subject, options = {}, &block)
  @subscriptions[subject] = [options, block]

  subscribe_on_reactor(subject, options) do |parsed_data, inbox|
    EM.defer do
      run_handler(block, parsed_data, inbox, subject, 'subscription')
    end
  end
end
#synchronous_request(subject, data = nil, options = {}) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/cf_message_bus/message_bus.rb', line 56
#
# Sends a request and returns the collected responses from EM.schedule_sync.
#
# Responses are accumulated until +:result_count+ of them have arrived
# (default 1) or a timeout response (+response[:timeout]+) is seen; whatever
# has been collected at that point is delivered.
#
# The caller's +options+ hash is not modified. +data+ is handed to #request
# unencoded because #request already runs it through +encode+.
#
# @param subject [String] subject to send the request on.
# @param data [Object, nil] payload, forwarded to #request.
# @param options [Hash] forwarded to #request; +:result_count+ defaults to 1.
# @return [Array] the collected responses; +[]+ when +:result_count+ <= 0.
def synchronous_request(subject, data = nil, options = {})
  result_count = options[:result_count] || 1
  return [] if result_count <= 0

  # Fresh hash so neither the default applied here nor anything #request
  # does to its options reaches the caller's hash.
  request_options = options.merge(result_count: result_count)

  EM.schedule_sync do |promise|
    results = []

    request(subject, data, request_options) do |response|
      if response[:timeout]
        promise.deliver(results)
      else
        results << response
        promise.deliver(results) if results.size == result_count
      end
    end
  end
end
#unsubscribe(subscription_id) ⇒ Object
76 77 78 |
# File 'lib/cf_message_bus/message_bus.rb', line 76
#
# Cancels a subscription by delegating to the internal bus.
#
# @param subscription_id [Object] id passed to +internal_bus.unsubscribe+.
# @return [Object] whatever +internal_bus.unsubscribe+ returns.
def unsubscribe(subscription_id)
  bus = internal_bus
  bus.unsubscribe(subscription_id)
end