Class: Combi::Queue

Inherits:
Bus
  • Object
show all
Defined in:
lib/combi/buses/queue.rb

Constant Summary

Constants inherited from Bus

Bus::RPC_DEFAULT_TIMEOUT, Bus::RPC_MAX_POLLS

Instance Attribute Summary collapse

Attributes inherited from Bus

#services

Instance Method Summary collapse

Methods inherited from Bus

#add_service, #enable, #log, #post_initialize, #restart!

Constructor Details

#initialize(options) ⇒ Queue

Returns a new instance of Queue.



9
10
11
12
13
14
15
16
# File 'lib/combi/buses/queue.rb', line 9

# Builds the queue bus: a response store for RPC replies plus an RPC-enabled
# queue service whose reply callback feeds that store.
#
# @param options [Hash] bus options; +:amqp_config+ is handed to the queue
#   service, and the whole hash is forwarded to the superclass initializer
def initialize(options)
  super
  amqp_config = options[:amqp_config]
  @response_store = Combi::ResponseStore.new
  @queue_service = Combi::QueueService.new(amqp_config, rpc: :enabled)
  # Every RPC reply delivered by the queue service is routed to the store.
  queue_service.rpc_callback = ->(message) { @response_store.handle_rpc_response(message) }
end

Instance Attribute Details

#queue_service ⇒ Object (readonly)

Returns the value of attribute queue_service.



7
8
9
# File 'lib/combi/buses/queue.rb', line 7

# Reader for the queue service held in @queue_service.
#
# @return [Object] the current value of @queue_service (nil if never assigned)
def queue_service
  @queue_service
end

Instance Method Details

#request(name, kind, message, options = {}) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/combi/buses/queue.rb', line 48

# Sends an RPC request through the queue service and returns a waiter for the
# reply.
#
# The caller's +options+ hash is never modified: the per-request entries
# (+:timeout+, +:routing_key+, +:correlation_id+) are written to a merged copy,
# so a hash reused across calls (or a frozen one) is safe to pass.
#
# @param name [#to_s] service name; its string form becomes the routing key
# @param kind [Object] request kind, forwarded to queue_service.call
# @param message [Object] request payload, forwarded to queue_service.call
# @param options [Hash] extra call options; +:timeout+ defaults to
#   RPC_DEFAULT_TIMEOUT when absent or nil
# @return [Object] whatever EventedWaiter.wait_for returns for this request
def request(name, kind, message, options = {})
  correlation_id = rand(10_000_000).to_s
  # merge keeps the caller's keys in place and appends any new ones, so the
  # resulting hash has the same contents and ordering the in-place writes gave.
  request_options = options.merge(
    timeout: options[:timeout] || RPC_DEFAULT_TIMEOUT,
    routing_key: name.to_s,
    correlation_id: correlation_id
  )
  # Register the waiter before publishing so it exists when the reply arrives.
  waiter = EventedWaiter.wait_for(correlation_id, @response_store, request_options[:timeout])
  queue_service.ready do
    log "Making request: #{name}.#{kind} #{message.inspect}\t|| #{request_options.inspect}"
    queue_service.call(kind, message, request_options)
  end
  waiter
end

#respond(service_instance, request, delivery_info) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/combi/buses/queue.rb', line 61

# Handles one incoming request for a service and sends back its answer, if any.
#
# +request+ is a JSON document. Its 'kind' entry names the method to invoke on
# +service_instance+ and its 'payload' entry is passed as the only argument.
# A result that responds to +succeed+ is treated as deferred: the reply is
# sent from its +callback+. Any other non-nil result is sent immediately; a
# nil result sends nothing.
#
# @param service_instance [Object] object implementing the requested kinds;
#   must also respond to +actions+ (used in the log line)
# @param request [String] JSON-encoded message
# @param delivery_info [Object] passed through unchanged to queue_service.respond
# @return [void]
# @raise [JSON::ParserError] if +request+ is not valid JSON
def respond(service_instance, request, delivery_info)
  message = JSON.parse request
  kind = message['kind']
  payload = message['payload']
  # A missing or non-String kind cannot name a method, and respond_to?(nil)
  # would raise TypeError, so such messages take the same log-and-ignore path
  # as an unknown kind.
  unless kind.is_a?(String) && service_instance.respond_to?(kind)
    log "Service instance does not respond to #{kind}: #{service_instance.inspect}"
    return
  end
  log "generating response for #{service_instance.class}#{service_instance.actions.inspect}.#{kind} #{payload.inspect}"
  # NOTE(review): kind comes from the wire and respond_to? accepts every public
  # method, including ones inherited from Object; consider restricting it to
  # service_instance.actions. public_send at least keeps private methods out.
  response = service_instance.public_send(kind, payload)

  if response.respond_to? :succeed
    log "response is deferred"
    response.callback do |service_response|
      log "responding with deferred answer: #{service_response.inspect}"
      queue_service.respond(service_response, delivery_info)
    end
  else
    log "responding with inmediate answer: #{response.inspect}"
    queue_service.respond(response, delivery_info) unless response.nil?
  end
end

#respond_to(service_instance, handler, options = {}) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/combi/buses/queue.rb', line 28

# Subscribes +service_instance+ to the queue named after +handler+.
#
# With +options[:fast]+ set to exactly +true+ the queue is declared with
# +auto_delete: false+ and deliveries are not acknowledged; otherwise the
# subscription asks for acks and every delivery is acknowledged right after
# #respond has been called for it.
#
# @param service_instance [Object] passed to #respond for each delivery
# @param handler [#to_s] its string form is the queue name
# @param options [Hash] only +:fast+ is read
def respond_to(service_instance, handler, options = {})
  log "registering #{handler}"
  fast = options[:fast] == true
  queue_options = fast ? { auto_delete: false } : {}
  subscription_options = fast ? {} : { ack: true }
  queue_name = handler.to_s

  queue_service.ready do
    queue_service.queue(queue_name, queue_options) do |queue|
      log "subscribing to queue #{queue_name} with options #{queue_options}"
      queue.subscribe(subscription_options) do |delivery_info, payload|
        respond(service_instance, payload, delivery_info)
        # options is consulted again at delivery time, as the closure captures it.
        queue_service.acknowledge(delivery_info) unless options[:fast] == true
      end
    end
  end
end

#start! ⇒ Object



18
19
20
# File 'lib/combi/buses/queue.rb', line 18

# Starts the bus by delegating to queue_service.start.
#
# @return [Object] whatever queue_service.start returns
def start!
  queue_service.start
end

#stop! ⇒ Object



22
23
24
25
26
# File 'lib/combi/buses/queue.rb', line 22

# Stops the bus: disconnects the queue service from inside its +ready+ block.
#
# @return [Object] whatever queue_service.ready returns
def stop!
  queue_service.ready { queue_service.disconnect }
end