Class: CfMessageBus::MessageBus

Inherits:
Object
  • Object
show all
Defined in:
lib/cf_message_bus/message_bus.rb

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ MessageBus

Returns a new instance of MessageBus.



9
10
11
12
13
14
15
16
# File 'lib/cf_message_bus/message_bus.rb', line 9

# Sets up the wrapper from a configuration object.
#
# @param config [#[]] read for :logger and for the connection target, which
#   is the first truthy value among :servers, :uris and :uri (in that order)
def initialize(config)
  @logger = config[:logger]

  # Precedence: :servers, then :uris, then :uri.
  connection_target = config[:servers] || config[:uris] || config[:uri]
  @internal_bus = MessageBusFactory.message_bus(connection_target)

  # Filled in by #subscribe, keyed by subject.
  @subscriptions = {}
end

Instance Method Details

#connected? ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/cf_message_bus/message_bus.rb', line 78

# Asks the internal bus whether it is connected.
#
# @return [Boolean] the result of +internal_bus.connected?+
def connected?
  bus = internal_bus
  bus.connected?
end

#publish(subject, message = nil, inbox = nil, &callback) ⇒ Object



28
29
30
31
32
# File 'lib/cf_message_bus/message_bus.rb', line 28

# Publishes +message+ on +subject+ through the internal bus.
#
# The publish is handed to EM.schedule; +message+ is passed through +encode+
# inside the scheduled block, i.e. at the time the block runs.
#
# @param subject subject to publish on
# @param message payload, run through +encode+ before publishing (default nil)
# @param inbox forwarded unchanged to the internal bus (default nil)
# @param callback optional block forwarded to the internal bus's publish
def publish(subject, message = nil, inbox = nil, &callback)
  EM.schedule { internal_bus.publish(subject, encode(message), inbox, &callback) }
end

#request(subject, data = nil, options = {}, &block) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/cf_message_bus/message_bus.rb', line 34

# Sends a request on +subject+ through the internal bus and routes each
# response (and an optional timeout notification) to +block+ via run_handler.
#
# @param subject subject to send the request on
# @param data payload, passed through +encode+ before sending (default nil)
# @param options [Hash] request options. :timeout and :result_count are
#   consumed here; a truthy :result_count is forwarded to the internal bus as
#   :max; all remaining keys are forwarded as given. The caller's hash is
#   never modified.
# @param block handler given to run_handler for every processed response; when
#   :timeout is set, the timeout callback runs it with +{timeout: true}+ and a
#   nil inbox
# @return the subscription id returned by the internal bus's +request+
def request(subject, data = nil, options = {}, &block)
  # Work on a copy: deleting keys from the caller's hash would strip its
  # :timeout/:result_count for any later reuse and would raise on a frozen hash.
  options = options.dup
  response_timeout = options.delete(:timeout)
  result_count = options.delete(:result_count)
  options[:max] = result_count if result_count

  subscription_id = internal_bus.request(subject, encode(data), options) do |payload, inbox|
    process_message(payload, inbox) do |parsed_data, reply_inbox|
      run_handler(block, parsed_data, reply_inbox, subject, 'response')
    end
  end

  if response_timeout
    # When no :result_count was given, a single response is expected.
    internal_bus.timeout(subscription_id, response_timeout, expected: result_count || 1) do
      run_handler(block, { timeout: true }, nil, subject, 'timeout')
    end
  end

  subscription_id
end

#subscribe(subject, options = {}, &block) ⇒ Object



18
19
20
21
22
23
24
25
26
# File 'lib/cf_message_bus/message_bus.rb', line 18

# Registers +block+ as the handler for messages on +subject+.
#
# The [options, block] pair is stored in @subscriptions under +subject+
# (replacing any earlier entry for that subject) before the subscription is
# set up through subscribe_on_reactor. Each message yielded by
# subscribe_on_reactor is passed to run_handler from inside EM.defer.
#
# @param subject subject to subscribe to
# @param options [Hash] forwarded to subscribe_on_reactor (default {})
# @param block handler given to run_handler for each message
# @return whatever subscribe_on_reactor returns
def subscribe(subject, options = {}, &block)
  @subscriptions[subject] = [options, block]

  subscribe_on_reactor(subject, options) do |message, reply_to|
    EM.defer { run_handler(block, message, reply_to, subject, 'subscription') }
  end
end

#synchronous_request(subject, data = nil, options = {}) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/cf_message_bus/message_bus.rb', line 54

# Issues a request and collects the responses via EM.schedule_sync.
#
# Responses are accumulated until :result_count of them have arrived or a
# response flagged with :timeout is received; the accumulated list is then
# delivered through the promise supplied by EM.schedule_sync.
#
# @param subject subject to send the request on
# @param data payload, passed through +encode+ before being handed to #request
# @param options [Hash] forwarded to #request; :result_count defaults to 1.
#   The caller's hash is never modified.
# @return [Array] an empty array immediately when :result_count is zero or
#   negative; otherwise the value produced by EM.schedule_sync
def synchronous_request(subject, data = nil, options = {})
  result_count = options[:result_count] || 1

  return [] if result_count <= 0

  # Pass a copy downstream: writing the default into the caller's hash (and
  # letting later processing strip keys such as :timeout from it) would make
  # a reused options hash behave differently on the next call.
  request_options = options.merge(result_count: result_count)

  EM.schedule_sync do |promise|
    results = []

    # NOTE(review): #request also runs its data through encode, so the payload
    # is encoded twice on this path — confirm encode is a no-op for input that
    # is already encoded.
    request(subject, encode(data), request_options) do |response|
      if response[:timeout]
        # Timed out: hand back whatever has been collected so far.
        promise.deliver(results)
      else
        results << response
        promise.deliver(results) if results.size == result_count
      end
    end
  end
end

#unsubscribe(subscription_id) ⇒ Object



74
75
76
# File 'lib/cf_message_bus/message_bus.rb', line 74

# Forwards an unsubscribe call to the internal bus.
#
# @param subscription_id identifier passed unchanged to
#   +internal_bus.unsubscribe+
# @return whatever +internal_bus.unsubscribe+ returns
def unsubscribe(subscription_id)
  bus = internal_bus
  bus.unsubscribe(subscription_id)
end