Class: Combi::Queue

Inherits:
Bus
  • Object
show all
Defined in:
lib/combi/buses/queue.rb

Constant Summary

Constants inherited from Bus

Bus::RPC_DEFAULT_TIMEOUT

Instance Attribute Summary collapse

Attributes inherited from Bus

#services

Instance Method Summary collapse

Methods inherited from Bus

#add_service, #enable, #log, #post_initialize, #restart!

Constructor Details

#initialize(options) ⇒ Queue

Returns a new instance of Queue.



9
10
11
12
13
14
15
16
# File 'lib/combi/buses/queue.rb', line 9

def initialize(options)
  super
  @response_store = Combi::ResponseStore.new
  @queue_service = Combi::QueueService.new(options[:amqp_config], rpc: :enabled)
  queue_service.rpc_callback = lambda do |message|
    @response_store.handle_rpc_response(message)
  end
end

Instance Attribute Details

#queue_serviceObject (readonly)

Returns the value of attribute queue_service.



7
8
9
# File 'lib/combi/buses/queue.rb', line 7

def queue_service
  @queue_service
end

Instance Method Details

#request(name, kind, message, options = {}) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/combi/buses/queue.rb', line 48

def request(name, kind, message, options = {})
  log "Preparing request: #{name}.#{kind} #{message.inspect[0..500]}\t|| #{options.inspect}"
  options[:timeout] ||= RPC_DEFAULT_TIMEOUT
  options[:routing_key] = name.to_s
  correlation_id = Combi::Correlation.generate
  options[:correlation_id] = correlation_id
  waiter = EventedWaiter.wait_for(correlation_id, @response_store, options[:timeout])
  queue_service.ready do
    log "Making request: #{name}.#{kind} #{message.inspect[0..500]}\t|| #{options.inspect[0..500]}"
    queue_service.call(kind, message, options)
  end
  waiter
end

#respond(service_instance, request, delivery_info) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/combi/buses/queue.rb', line 62

def respond(service_instance, request, delivery_info)
  message = JSON.parse request
  kind = message['kind']
  payload = message['payload']
  options = message['options']
  if service_instance.respond_to?(kind)
    log "generating response for #{service_instance.class}#{service_instance.actions.inspect}.#{kind} #{payload.inspect[0..500]}"
    begin
      response = service_instance.send(kind, payload)
    rescue Exception => e
      response = {error: true, message: e.message}
    end
  else
    log "Service instance does not respond to #{kind}: #{service_instance.inspect}"
    response = {error: true, message: 'unknown action'}
  end
  if response.respond_to? :succeed
    log "response is deferred"
    response.callback do |service_response|
      log "responding with deferred answer: #{service_response.inspect[0..500]}"
      queue_service.respond(service_response.to_json, delivery_info)
    end
  else
    log "responding with inmediate answer: #{response.inspect[0..500]}"
    queue_service.respond(response.to_json, delivery_info)
  end
end

#respond_to(service_instance, action, options = {}) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/combi/buses/queue.rb', line 28

def respond_to(service_instance, action, options = {})
  log "registering #{action}"
  queue_options = {}
  subscription_options = {}
  if options[:fast] == true
    queue_options[:auto_delete] = false
  else
    subscription_options[:ack] = true
  end
  queue_service.ready do
    queue_service.queue(action.to_s, queue_options) do |queue|
      log "subscribing to queue #{action.to_s} with options #{queue_options}"
      queue.subscribe(subscription_options) do |delivery_info, payload|
        respond service_instance, payload, delivery_info
        queue_service.acknowledge delivery_info unless options[:fast] == true
      end
    end
  end
end

#start!Object



18
19
20
# File 'lib/combi/buses/queue.rb', line 18

def start!
  queue_service.start
end

#stop!Object



22
23
24
25
26
# File 'lib/combi/buses/queue.rb', line 22

def stop!
  queue_service.ready do
    @queue_service.disconnect
  end
end