Class: CfMessageBus::MessageBus

Inherits:
Object
Defined in:
lib/cf_message_bus/message_bus.rb

Instance Method Summary

Constructor Details

#initialize(config) ⇒ MessageBus

Returns a new instance of MessageBus.



9
10
11
12
13
14
15
# File 'lib/cf_message_bus/message_bus.rb', line 9

# Builds a message bus wrapper from a configuration object.
#
# Reads `config[:logger]` and `config[:uri]`; the URI is handed to
# `MessageBusFactory.message_bus` to obtain the underlying bus.
#
# @param config [#[]] configuration, looked up with the keys `:logger` and `:uri`
def initialize(config)
  @subscriptions = {}
  @logger = config[:logger]
  @internal_bus = MessageBusFactory.message_bus(config[:uri])

  # Hook reconnects of the underlying bus into this object's recovery routine.
  @internal_bus.on_reconnect do
    start_internal_bus_recovery
  end

  # Default recovery callback does nothing until #recover installs another one.
  @recovery_callback = -> {}
end

Instance Method Details

#publish(subject, message = nil, &callback) ⇒ Object



27
28
29
30
31
# File 'lib/cf_message_bus/message_bus.rb', line 27

# Publishes a message on the given subject via `EM.schedule`.
#
# The message is passed through `encode` inside the scheduled block, i.e.
# only when the scheduled work actually runs.
#
# @param subject [Object] subject forwarded to the internal bus
# @param message [Object, nil] payload, encoded before publishing
# @param callback [Proc] optional block forwarded to the internal bus
# @return [Object] whatever `EM.schedule` returns
def publish(subject, message = nil, &callback)
  dispatch = proc do
    internal_bus.publish(subject, encode(message), &callback)
  end

  EM.schedule(&dispatch)
end

#recover(&block) ⇒ Object



33
34
35
# File 'lib/cf_message_bus/message_bus.rb', line 33

# Installs the callback stored in `@recovery_callback`.
#
# The constructor seeds `@recovery_callback` with a no-op lambda, so the
# stored value is expected to be callable. Calling this method without a
# block therefore resets the callback to a no-op instead of storing `nil`.
#
# NOTE(review): presumably invoked when the bus reconnects — confirm against
# `start_internal_bus_recovery`, which is not visible here.
#
# @param block [Proc] optional callback to store
# @return [Proc] the callback that was stored
def recover(&block)
  @recovery_callback = block || lambda {}
end

#request(subject, data = nil, options = {}, &block) ⇒ Object



37
38
39
40
41
42
43
# File 'lib/cf_message_bus/message_bus.rb', line 37

# Sends a request on the internal bus and routes each reply to the block.
#
# The outgoing data is passed through `encode`. Every reply is handed to
# `process_message`; what it yields is forwarded to `run_handler` together
# with the caller's block, the subject and the label 'response'.
#
# @param subject [Object] subject forwarded to the internal bus
# @param data [Object, nil] request payload, encoded before sending
# @param options [Hash] passed through to the internal bus unchanged
# @param block [Proc] handler given to `run_handler` for each reply
# @return [Object] whatever `internal_bus.request` returns
def request(subject, data = nil, options = {}, &block)
  bus = internal_bus
  outgoing = encode(data)

  bus.request(subject, outgoing, options) do |raw_reply, reply_inbox|
    process_message(raw_reply, reply_inbox) do |parsed_reply, parsed_inbox|
      run_handler(block, parsed_reply, parsed_inbox, subject, 'response')
    end
  end
end

#subscribe(subject, opts = {}, &block) ⇒ Object



17
18
19
20
21
22
23
24
25
# File 'lib/cf_message_bus/message_bus.rb', line 17

# Subscribes to a subject and remembers the subscription.
#
# The options and handler are recorded in `@subscriptions` under the
# subject. Each message yielded by `subscribe_on_reactor` is handed to
# `run_handler` inside `EM.defer`, labelled 'subscription'.
#
# @param subject [Object] subject to subscribe to
# @param opts [Hash] options passed through to `subscribe_on_reactor`
# @param block [Proc] handler given to `run_handler` for each message
# @return [Object] whatever `subscribe_on_reactor` returns
def subscribe(subject, opts = {}, &block)
  @subscriptions.store(subject, [opts, block])

  subscribe_on_reactor(subject, opts) do |message, reply_to|
    EM.defer { run_handler(block, message, reply_to, subject, 'subscription') }
  end
end

#synchronous_request(subject, data = nil, opts = {}) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/cf_message_bus/message_bus.rb', line 45

# Issues a request and collects replies through `EM.schedule_sync`.
#
# Replies are accumulated until `opts[:result_count]` of them have arrived,
# at which point the accumulated array is delivered to the promise. When a
# non-negative `opts[:timeout]` is given, a timeout is registered on the
# internal bus that delivers whatever has been collected so far.
#
# The raw `data` is handed to `request`, which encodes it itself; encoding
# here as well would run the payload through `encode` twice.
#
# @param subject [Object] subject forwarded to `request`
# @param data [Object, nil] request payload (unencoded)
# @param opts [Hash] `:result_count` (default 1) and `:timeout` (default -1;
#   a negative value registers no timeout)
# @return [Array] `[]` right away when the result count is zero or negative;
#   otherwise whatever `EM.schedule_sync` returns — presumably the array
#   delivered to the promise (TODO confirm)
def synchronous_request(subject, data = nil, opts = {})
  result_count = opts[:result_count] || 1
  timeout = opts[:timeout] || -1

  return [] if result_count <= 0

  EM.schedule_sync do |promise|
    results = []

    sid = request(subject, data, max: result_count) do |reply|
      results << reply
      promise.deliver(results) if results.size == result_count
    end

    if timeout >= 0
      internal_bus.timeout(sid, timeout, expected: result_count) do
        promise.deliver(results)
      end
    end
  end
end

#unsubscribe(subscription_id) ⇒ Object



69
70
71
# File 'lib/cf_message_bus/message_bus.rb', line 69

# Cancels a subscription on the internal bus.
#
# @param subscription_id [Object] identifier forwarded to the internal bus
# @return [Object] whatever `internal_bus.unsubscribe` returns
def unsubscribe(subscription_id)
  bus = internal_bus
  bus.unsubscribe(subscription_id)
end