Class: Combi::Queue
- Inherits:
-
Bus
show all
- Defined in:
- lib/combi/buses/queue.rb
Constant Summary
Constants inherited
from Bus
Bus::RPC_DEFAULT_TIMEOUT
Instance Attribute Summary collapse
Attributes inherited from Bus
#services
Instance Method Summary
collapse
Methods inherited from Bus
#add_service, #enable, #log, #post_initialize, #restart!
Constructor Details
#initialize(options) ⇒ Queue
Returns a new instance of Queue.
9
10
11
12
13
14
15
16
|
# File 'lib/combi/buses/queue.rb', line 9
def initialize(options)
super
@response_store = Combi::ResponseStore.new
@queue_service = Combi::QueueService.new(options[:amqp_config], rpc: :enabled)
queue_service.rpc_callback = lambda do |message|
@response_store.handle_rpc_response(message)
end
end
|
Instance Attribute Details
#queue_service ⇒ Object
Returns the value of attribute queue_service.
7
8
9
|
# File 'lib/combi/buses/queue.rb', line 7
def queue_service
@queue_service
end
|
Instance Method Details
#request(name, kind, message, options = {}) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/combi/buses/queue.rb', line 48
def request(name, kind, message, options = {})
log "Preparing request: #{name}.#{kind} #{message.inspect[0..500]}\t|| #{options.inspect}"
options[:timeout] ||= RPC_DEFAULT_TIMEOUT
options[:routing_key] = name.to_s
correlation_id = Combi::Correlation.generate
options[:correlation_id] = correlation_id
waiter = EventedWaiter.wait_for(correlation_id, @response_store, options[:timeout])
queue_service.ready do
log "Making request: #{name}.#{kind} #{message.inspect[0..500]}\t|| #{options.inspect[0..500]}"
queue_service.call(kind, message, options)
end
waiter
end
|
#respond(service_instance, request, delivery_info) ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/combi/buses/queue.rb', line 62
def respond(service_instance, request, delivery_info)
message = JSON.parse request
kind = message['kind']
payload = message['payload']
options = message['options']
if service_instance.respond_to?(kind)
log "generating response for #{service_instance.class}#{service_instance.actions.inspect}.#{kind} #{payload.inspect[0..500]}"
begin
response = service_instance.send(kind, payload)
rescue Exception => e
response = {error: { message: e.message, backtrace: e.backtrace } }
end
else
log "Service instance does not respond to #{kind}: #{service_instance.inspect}"
response = {error: 'unknown action'}
end
if response.respond_to? :succeed
log "response is deferred"
response.callback do |service_response|
log "responding with deferred answer: #{service_response.inspect[0..500]}"
queue_service.respond(service_response.to_json, delivery_info)
end
response.errback do |service_response|
failure_response = { error: service_response }
log "responding with deferred failure: #{service_response.inspect[0..500]}"
queue_service.respond(failure_response.to_json, delivery_info)
end
else
log "responding with inmediate answer: #{response.inspect[0..500]}"
queue_service.respond(response.to_json, delivery_info)
end
end
|
#respond_to(service_instance, action, options = {}) ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/combi/buses/queue.rb', line 28
def respond_to(service_instance, action, options = {})
log "registering #{action}"
queue_options = {}
subscription_options = {}
if options[:fast] == true
queue_options[:auto_delete] = false
else
subscription_options[:ack] = true
end
queue_service.ready do
queue_service.queue(action.to_s, queue_options) do |queue|
log "subscribing to queue #{action.to_s} with options #{queue_options}"
queue.subscribe(subscription_options) do |delivery_info, payload|
respond service_instance, payload, delivery_info
queue_service.acknowledge delivery_info unless options[:fast] == true
end
end
end
end
|
#start! ⇒ Object
18
19
20
|
# File 'lib/combi/buses/queue.rb', line 18
def start!
queue_service.start
end
|
#stop! ⇒ Object
22
23
24
25
26
|
# File 'lib/combi/buses/queue.rb', line 22
def stop!
queue_service.ready do
@queue_service.disconnect
end
end
|