Class: CfMessageBus::MessageBus
- Inherits: Object
  - Object
  - CfMessageBus::MessageBus
- Defined in: lib/cf_message_bus/message_bus.rb
Instance Method Summary
- #initialize(config) ⇒ MessageBus (constructor): A new instance of MessageBus.
- #publish(subject, message = nil, &callback) ⇒ Object
- #recover(&block) ⇒ Object
- #request(subject, data = nil, options = {}, &block) ⇒ Object
- #subscribe(subject, opts = {}, &block) ⇒ Object
- #synchronous_request(subject, data = nil, opts = {}) ⇒ Object
- #unsubscribe(subscription_id) ⇒ Object
Constructor Details
#initialize(config) ⇒ MessageBus
Returns a new instance of MessageBus.
9 10 11 12 13 14 15 |
# File 'lib/cf_message_bus/message_bus.rb', line 9
#
# Sets up the wrapper around the underlying bus connection.
#
# Stores config[:logger], builds the internal bus from config[:uri], starts
# with an empty subscription registry, registers a reconnect hook, and
# installs a no-op recovery callback.
#
# NOTE(review): the rendered listing read `MessageBusFactory.(config[:uri])`,
# which Ruby parses as `MessageBusFactory.call(...)`; the factory method name
# appears to have been dropped during extraction. `message_bus` is the
# presumed name -- confirm against the real file.
#
# @param config [Hash] indexed with :logger and :uri (presumably a Hash)
# @return [MessageBus] a new instance of MessageBus
def initialize(config)
  @logger = config[:logger]
  @internal_bus = MessageBusFactory.message_bus(config[:uri])
  @subscriptions = {}
  # On reconnect, hand over to the recovery routine (defined outside this listing).
  @internal_bus.on_reconnect { start_internal_bus_recovery }
  # Start with a callback that does nothing; #recover replaces it.
  @recovery_callback = -> {}
end
Instance Method Details
#publish(subject, message = nil, &callback) ⇒ Object
27 28 29 30 31 |
# File 'lib/cf_message_bus/message_bus.rb', line 27
#
# Publishes +message+ on +subject+ through the internal bus.
#
# The message is passed through +encode+ first, and the actual publish call is
# handed to EM.schedule rather than being invoked inline.
#
# @param subject [Object] subject to publish on (presumably a String)
# @param message [Object, nil] payload; encoded before being sent
# @param callback [Proc] optional block forwarded to the internal bus publish
# @return [Object] whatever EM.schedule returns
def publish(subject, message = nil, &callback)
  EM.schedule do
    internal_bus.publish(subject, encode(message), &callback)
  end
end
#recover(&block) ⇒ Object
33 34 35 |
# File 'lib/cf_message_bus/message_bus.rb', line 33
#
# Registers the block to use as the recovery callback, replacing whatever was
# stored in @recovery_callback before.
#
# @param block [Proc] the callback to store (nil when no block is given)
# @return [Proc, nil] the stored block
def recover(&block)
  @recovery_callback = block
end
#request(subject, data = nil, options = {}, &block) ⇒ Object
37 38 39 40 41 42 43 |
# File 'lib/cf_message_bus/message_bus.rb', line 37
#
# Sends a request on +subject+ and runs +block+ for each reply.
#
# The data is passed through +encode+ before being handed to the internal bus.
# Each raw reply is run through a parsing helper, and the parsed result is
# dispatched via +run_handler+ tagged as a 'response'.
#
# NOTE(review): the rendered listing had lost the name of the helper called
# with (payload, inbox); `process_message` is the presumed name -- confirm
# against the real file.
#
# @param subject [Object] subject to send the request on (presumably a String)
# @param data [Object, nil] request payload; encoded before being sent
# @param options [Hash] passed through unchanged to the internal bus request
# @param block [Proc] handler given to run_handler for each parsed reply
# @return [Object] whatever the internal bus request call returns
def request(subject, data = nil, options = {}, &block)
  internal_bus.request(subject, encode(data), options) do |payload, inbox|
    # Distinct name for the inner inbox so it does not shadow the outer one.
    process_message(payload, inbox) do |parsed_data, reply_inbox|
      run_handler(block, parsed_data, reply_inbox, subject, 'response')
    end
  end
end
#subscribe(subject, opts = {}, &block) ⇒ Object
17 18 19 20 21 22 23 24 25 |
# File 'lib/cf_message_bus/message_bus.rb', line 17
#
# Subscribes +block+ to +subject+.
#
# The subscription is recorded in @subscriptions (keyed by subject, holding
# the options and the block) before the subscription itself is set up through
# +subscribe_on_reactor+. Each parsed message is dispatched via +run_handler+,
# tagged as a 'subscription', from inside an EM.defer block.
#
# @param subject [Object] subject to subscribe to (presumably a String)
# @param opts [Hash] options stored and passed on to subscribe_on_reactor
# @param block [Proc] handler given to run_handler for each parsed message
# @return [Object] whatever subscribe_on_reactor returns
def subscribe(subject, opts = {}, &block)
  # A later subscribe on the same subject overwrites this registry entry.
  @subscriptions[subject] = [opts, block]

  subscribe_on_reactor(subject, opts) do |parsed_data, inbox|
    EM.defer do
      run_handler(block, parsed_data, inbox, subject, 'subscription')
    end
  end
end
#synchronous_request(subject, data = nil, opts = {}) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/cf_message_bus/message_bus.rb', line 45
#
# Sends a request and collects replies into an array via EM.schedule_sync.
#
# Replies are accumulated until opts[:result_count] of them have arrived, at
# which point the array is delivered to the promise. When opts[:timeout] is
# zero or positive, a timeout is also registered on the internal bus that
# delivers whatever has been collected so far.
#
# NOTE(review): with the default timeout (-1) no timeout is registered, so
# nothing delivers the promise if fewer replies than expected arrive.
# NOTE(review): #request passes its data through encode as well, so the
# payload is encoded twice here; presumably encode leaves already-encoded
# input unchanged -- confirm.
#
# @param subject [Object] subject to send the request on (presumably a String)
# @param data [Object, nil] request payload
# @param opts [Hash] reads :result_count (default 1) and :timeout (default -1)
# @return [Array] empty when result_count is zero or negative; otherwise
#   whatever EM.schedule_sync returns (presumably the delivered results)
def synchronous_request(subject, data = nil, opts = {})
  result_count = opts[:result_count] || 1
  timeout = opts[:timeout] || -1

  return [] if result_count <= 0

  EM.schedule_sync do |promise|
    results = []

    sid = request(subject, encode(data), max: result_count) do |reply|
      results << reply
      promise.deliver(results) if results.size == result_count
    end

    if timeout >= 0
      internal_bus.timeout(sid, timeout, expected: result_count) do
        promise.deliver(results)
      end
    end
  end
end
#unsubscribe(subscription_id) ⇒ Object
69 70 71 |
# File 'lib/cf_message_bus/message_bus.rb', line 69
#
# Cancels a subscription by delegating to the internal bus.
#
# Only the internal bus is touched here; the @subscriptions registry is not
# modified by this method.
#
# @param subscription_id [Object] identifier passed straight to the internal bus
# @return [Object] whatever the internal bus unsubscribe call returns
def unsubscribe(subscription_id)
  internal_bus.unsubscribe(subscription_id)
end