Class: CfMessageBus::MessageBus

Inherits:
Object
  • Object
show all
Defined in:
lib/cf_message_bus/message_bus.rb

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ MessageBus

Returns a new instance of MessageBus.



9
10
11
12
13
14
15
# File 'lib/cf_message_bus/message_bus.rb', line 9

# Wraps the bus obtained from MessageBusFactory for the configured URI and
# prepares the bookkeeping used by the other methods.
#
# @param config [#[]] read with the keys :logger and :uri
def initialize(config)
  @subscriptions = {}
  @logger = config[:logger]
  @internal_bus = MessageBusFactory.message_bus(config[:uri])
  # Hook registered before the default callback is assigned, as in the
  # original ordering.
  @internal_bus.on_reconnect { start_internal_bus_recovery }
  # No-op default; #recover replaces it.
  @recovery_callback = -> {}
end

Instance Method Details

#connected? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/cf_message_bus/message_bus.rb', line 80

# Reports the connection state by delegating to the internal bus.
#
# @return [Boolean] whatever internal_bus.connected? returns
def connected?
  internal_bus.connected?
end

#publish(subject, message = nil, &callback) ⇒ Object



27
28
29
30
31
# File 'lib/cf_message_bus/message_bus.rb', line 27

# Publishes +message+ on +subject+ through the internal bus.
#
# The publish call (including the encoding of +message+) is handed to
# EM.schedule rather than executed inline.
#
# @param subject [Object] subject forwarded to the internal bus
# @param message [Object, nil] payload; passed through encode before publishing
# @param callback [Proc] optional block forwarded to internal_bus.publish
# @return [Object] whatever EM.schedule returns
def publish(subject, message = nil, &callback)
  EM.schedule { internal_bus.publish(subject, encode(message), &callback) }
end

#recover(&block) ⇒ Object



33
34
35
# File 'lib/cf_message_bus/message_bus.rb', line 33

# Registers the callback stored in @recovery_callback.
#
# NOTE(review): the callback is presumably invoked by the reconnect recovery
# path; that code is not shown here — confirm.
#
# Calling this without a block used to overwrite the no-op default with nil;
# a no-op lambda is stored instead so the instance variable always responds
# to #call.
#
# @param block [Proc] callback to store
# @return [Proc] the stored callback
def recover(&block)
  @recovery_callback = block || lambda {}
end

#request(subject, data = nil, options = {}, &block) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/cf_message_bus/message_bus.rb', line 37

# Sends a request on +subject+ and routes each response to +block+.
#
# Two option keys are consumed here and not forwarded as-is:
# * :timeout      - when set, a timeout is registered on the internal bus and
#                   +block+ is run with {timeout: true} and a nil inbox.
# * :result_count - forwarded to the internal bus as :max, and used as the
#                   expected count for the timeout (1 when absent).
# All other keys are passed to internal_bus.request unchanged.
#
# @param subject [Object] subject forwarded to the internal bus
# @param data [Object, nil] payload; passed through encode before sending
# @param options [Hash] see above; the caller's hash is left untouched
# @param block [Proc] handler run through run_handler for responses/timeouts
# @return [Object] the subscription id returned by internal_bus.request
def request(subject, data = nil, options = {}, &block)
  # Work on a copy: deleting keys from the caller's hash would silently drop
  # :timeout / :result_count if the same hash is reused for another call.
  options = options.dup
  response_timeout = options.delete(:timeout)
  result_count = options.delete(:result_count)
  options[:max] = result_count if result_count

  subscription_id = internal_bus.request(subject, encode(data), options) do |payload, inbox|
    process_message(payload, inbox) do |parsed_data, reply_to|
      run_handler(block, parsed_data, reply_to, subject, 'response')
    end
  end

  if response_timeout
    internal_bus.timeout(subscription_id, response_timeout, expected: result_count || 1) do
      run_handler(block, {timeout: true}, nil, subject, 'timeout')
    end
  end

  subscription_id
end

#subscribe(subject, options = {}, &block) ⇒ Object



17
18
19
20
21
22
23
24
25
# File 'lib/cf_message_bus/message_bus.rb', line 17

# Subscribes +block+ to +subject+.
#
# The options/handler pair is recorded in @subscriptions under +subject+
# (a later call for the same subject replaces the entry). Each message handed
# over by subscribe_on_reactor is passed to run_handler inside EM.defer.
#
# @param subject [Object] subject to subscribe to
# @param options [Hash] forwarded to subscribe_on_reactor
# @param block [Proc] handler run for each message
# @return [Object] whatever subscribe_on_reactor returns
def subscribe(subject, options = {}, &block)
  @subscriptions[subject] = [options, block]

  subscribe_on_reactor(subject, options) do |message, reply_to|
    EM.defer { run_handler(block, message, reply_to, subject, 'subscription') }
  end
end

#synchronous_request(subject, data = nil, options = {}) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/cf_message_bus/message_bus.rb', line 56

# Issues a request and returns the collected responses as an array.
#
# Collection ends when :result_count responses have arrived (default 1) or
# when a response carrying a truthy :timeout entry is received, whichever
# comes first. A :result_count of zero or less returns [] without sending.
#
# @param subject [Object] subject forwarded to #request
# @param data [Object, nil] payload forwarded to #request
# @param options [Hash] forwarded to #request; :result_count defaults to 1.
#   The caller's hash is left untouched.
# @return [Array] responses gathered so far when collection ended
def synchronous_request(subject, data = nil, options = {})
  # Copy first: the default below and the key stripping done by #request
  # would otherwise leak into the caller's hash (dropping its :timeout).
  options = options.dup
  options[:result_count] ||= 1
  result_count = options[:result_count]

  return [] if result_count <= 0

  EM.schedule_sync do |promise|
    results = []

    # #request encodes the payload itself, so it is passed through raw here
    # instead of being encoded a second time.
    request(subject, data, options) do |response|
      if response[:timeout]
        promise.deliver(results)
      else
        results << response
        promise.deliver(results) if results.size == result_count
      end
    end
  end
end

#unsubscribe(subscription_id) ⇒ Object



76
77
78
# File 'lib/cf_message_bus/message_bus.rb', line 76

# Cancels a subscription by forwarding its id to the internal bus.
#
# Only the internal bus is told; @subscriptions is not modified here.
#
# @param subscription_id [Object] identifier to cancel — presumably a value
#   returned by #request; TODO confirm against the internal bus API
# @return [Object] whatever internal_bus.unsubscribe returns
def unsubscribe(subscription_id)
  internal_bus.unsubscribe(subscription_id)
end