Class: Combi::Queue
- Inherits:
-
Bus
show all
- Defined in:
- lib/combi/buses/queue.rb
Constant Summary
Constants inherited
from Bus
Bus::RPC_DEFAULT_TIMEOUT
Instance Attribute Summary collapse
Attributes inherited from Bus
#services
Instance Method Summary
collapse
Methods inherited from Bus
#add_service, #enable, #log, #post_initialize, #restart!
Constructor Details
#initialize(options) ⇒ Queue
Returns a new instance of Queue.
9
10
11
12
13
14
15
16
|
# File 'lib/combi/buses/queue.rb', line 9
def initialize(options)
super
@response_store = Combi::ResponseStore.new
@queue_service = Combi::QueueService.new(options[:amqp_config], rpc: :enabled)
queue_service.rpc_callback = lambda do |message|
@response_store.handle_rpc_response(message)
end
end
|
Instance Attribute Details
#queue_service ⇒ Object
Returns the value of attribute queue_service.
7
8
9
|
# File 'lib/combi/buses/queue.rb', line 7
def queue_service
@queue_service
end
|
Instance Method Details
#request(name, kind, message, options = {}) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/combi/buses/queue.rb', line 48
def request(name, kind, message, options = {})
log "Preparing request: #{name}.#{kind} #{message.inspect[0..500]}\t|| #{options.inspect}"
options[:timeout] ||= RPC_DEFAULT_TIMEOUT
options[:routing_key] = name.to_s
correlation_id = Combi::Correlation.generate
options[:correlation_id] = correlation_id
waiter = EventedWaiter.wait_for(correlation_id, @response_store, options[:timeout])
queue_service.ready do
log "Making request: #{name}.#{kind} #{message.inspect[0..500]}\t|| #{options.inspect[0..500]}"
queue_service.call(kind, message, options)
end
waiter
end
|
#respond(service_instance, request, delivery_info) ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# File 'lib/combi/buses/queue.rb', line 62
def respond(service_instance, request, delivery_info)
message = JSON.parse request
kind = message['kind']
payload = message['payload']
options = message['options']
if service_instance.respond_to?(kind)
log "generating response for #{service_instance.class}#{service_instance.actions.inspect}.#{kind} #{payload.inspect[0..500]}"
begin
response = service_instance.send(kind, payload)
rescue Exception => e
response = {error: true, message: e.message}
end
else
log "Service instance does not respond to #{kind}: #{service_instance.inspect}"
response = {error: true, message: 'unknown action'}
end
if response.respond_to? :succeed
log "response is deferred"
response.callback do |service_response|
log "responding with deferred answer: #{service_response.inspect[0..500]}"
queue_service.respond(service_response.to_json, delivery_info)
end
else
log "responding with inmediate answer: #{response.inspect[0..500]}"
queue_service.respond(response.to_json, delivery_info)
end
end
|
#respond_to(service_instance, action, options = {}) ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/combi/buses/queue.rb', line 28
def respond_to(service_instance, action, options = {})
log "registering #{action}"
queue_options = {}
subscription_options = {}
if options[:fast] == true
queue_options[:auto_delete] = false
else
subscription_options[:ack] = true
end
queue_service.ready do
queue_service.queue(action.to_s, queue_options) do |queue|
log "subscribing to queue #{action.to_s} with options #{queue_options}"
queue.subscribe(subscription_options) do |delivery_info, payload|
respond service_instance, payload, delivery_info
queue_service.acknowledge delivery_info unless options[:fast] == true
end
end
end
end
|
#start! ⇒ Object
18
19
20
|
# File 'lib/combi/buses/queue.rb', line 18
def start!
queue_service.start
end
|
#stop! ⇒ Object
22
23
24
25
26
|
# File 'lib/combi/buses/queue.rb', line 22
def stop!
queue_service.ready do
@queue_service.disconnect
end
end
|