Class: Combi::Queue
- Inherits:
-
Bus
show all
- Defined in:
- lib/combi/buses/queue.rb
Constant Summary
Constants inherited
from Bus
Bus::RPC_DEFAULT_TIMEOUT
Instance Attribute Summary collapse
Attributes inherited from Bus
#routes
Instance Method Summary
collapse
Methods inherited from Bus
#add_service, #enable, #log, #post_initialize, #restart!
Constructor Details
#initialize(options) ⇒ Queue
Returns a new instance of Queue.
9
10
11
12
13
14
15
16
|
# File 'lib/combi/buses/queue.rb', line 9
# Builds the bus, its RPC response store and its queue service.
#
# RPC responses received by the queue service are forwarded to the response
# store through the +rpc_callback+ hook.
#
# @param options [Hash] bus options; +options[:amqp_config]+ is handed to
#   Combi::QueueService (the options are also passed on to the superclass)
def initialize(options)
  super(options)
  @response_store = Combi::ResponseStore.new
  @queue_service = Combi::QueueService.new(options[:amqp_config], rpc: :enabled)
  queue_service.rpc_callback = ->(message) { @response_store.handle_rpc_response(message) }
end
|
Instance Attribute Details
#queue_service ⇒ Object
Returns the value of attribute queue_service.
7
8
9
|
# File 'lib/combi/buses/queue.rb', line 7
# Reader for the +@queue_service+ instance variable, which #initialize sets
# to a Combi::QueueService.
#
# @return [Combi::QueueService, nil] the stored queue service (nil if the
#   instance variable has not been assigned)
def queue_service
@queue_service
end
|
Instance Method Details
#add_routes_for(service_name, service_instance) ⇒ Object
28
29
30
31
|
# File 'lib/combi/buses/queue.rb', line 28
# Creates the queue for +service_name+ and then lets the superclass register
# the routes.
#
# @param service_name [#to_s] name of the service; also used as the queue name
# @param service_instance [Object] passed through to the superclass unchanged
# @return [Object] whatever the superclass implementation returns
def add_routes_for(service_name, service_instance)
  # The queue subscription is set up before the routes are registered.
  create_queue_for_service service_name
  super(service_name, service_instance)
end
|
#create_queue_for_service(service_name) ⇒ Object
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# File 'lib/combi/buses/queue.rb', line 33
# Declares the queue named after +service_name+ and subscribes to it once the
# queue service is ready.
#
# Every delivered message is handed to #process_queue_message and then
# acknowledged through the queue service.
#
# @param service_name [#to_s] service name; its string form is the queue name
# @return [Object] whatever +queue_service.ready+ returns
def create_queue_for_service(service_name)
  log "creating queue #{service_name}"
  queue_options = {}
  # The subscription is opened with ack: true (presumably manual
  # acknowledgement mode; confirm against QueueService), matching the
  # explicit acknowledge call below.
  subscription_options = { ack: true }

  queue_service.ready do
    queue_service.queue(service_name.to_s, queue_options) do |queue|
      log "subscribing to queue #{service_name}"
      queue.subscribe(subscription_options) do |delivery_info, payload|
        process_queue_message service_name, payload, delivery_info
        # NOTE(review): if process_queue_message raises, this line is never
        # reached and the message stays unacknowledged. Confirm that is the
        # intended redelivery behaviour.
        queue_service.acknowledge delivery_info
      end
    end
  end
end
|
#process_queue_message(service_name, request, delivery_info) ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/combi/buses/queue.rb', line 69
# Decodes one queue message, dispatches it to the service and, when the
# sender asked for a reply, wires the outcome back through the queue service.
#
# The request is parsed with Yajl using symbolized keys; its +:kind+ and
# +:payload+ entries are passed to +invoke_service+. Any other entries
# (for example +:options+) are not used here.
#
# @param service_name [Object] service identifier forwarded to +invoke_service+
# @param request [String] serialized message as received from the queue
# @param delivery_info [#reply_to] delivery metadata; a truthy +reply_to+
#   means a response is expected
# @return [Object, nil] nil when no reply is requested, otherwise the result
#   of registering the errback
def process_queue_message(service_name, request, delivery_info)
  message = Yajl::Parser.parse(request, symbolize_keys: true)
  # The result must respond to #callback and #errback (presumably an
  # EventMachine-style deferrable; confirm against invoke_service).
  response = invoke_service(service_name, message[:kind], message[:payload])
  return unless delivery_info.reply_to

  response.callback do |service_response|
    queue_service.respond service_response, delivery_info
  end
  response.errback do |service_response|
    # Failures are wrapped so the caller can tell them apart from results.
    queue_service.respond({ error: service_response }, delivery_info)
  end
end
|
#request(name, kind, message, timeout: RPC_DEFAULT_TIMEOUT, fast: false) ⇒ Object
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/combi/buses/queue.rb', line 49
# Publishes a request for +kind+ to the queue named after +name+.
#
# Unless +fast+ is set, a correlation id is generated, attached to the publish
# options and registered with the response store. The publish itself is
# deferred through +queue_service.next_ready_only+.
#
# @param name [#to_s] target service; its string form is the routing key
# @param kind [Object] request kind handed to +publish_request+
# @param message [Object] request payload (only its first 501 inspected
#   characters are logged)
# @param timeout [Object] passed to the response store and the publish options
# @param fast [Boolean] when truthy, no response is awaited
# @return [Object, nil] the response store's waiter, or nil for fast requests
def request(name, kind, message, timeout: RPC_DEFAULT_TIMEOUT, fast: false)
  log "Preparing request: #{name}.#{kind} #{message.inspect[0..500]}\t|| timeout: #{timeout} fast: #{fast}"
  publish_options = { timeout: timeout, routing_key: name.to_s }

  pending = nil
  unless fast
    request_id = Combi::Correlation.generate
    publish_options[:correlation_id] = request_id
    pending = @response_store.wait_for(request_id, timeout)
  end

  queue_service.next_ready_only do
    log "Making request: #{name}.#{kind} #{message.inspect[0..500]}\t|| #{publish_options.inspect[0..500]}"
    queue_service.publish_request(kind, message, publish_options)
  end

  pending
end
|
#start! ⇒ Object
18
19
20
|
# File 'lib/combi/buses/queue.rb', line 18
# Starts the bus by calling +start+ on the queue service.
#
# @return [Object] whatever +queue_service.start+ returns
def start!
queue_service.start
end
|
#stop! ⇒ Object
22
23
24
25
26
|
# File 'lib/combi/buses/queue.rb', line 22
# Disconnects the queue service.
#
# The disconnect is issued from inside +queue_service.ready+, so it only runs
# when the service yields to that block.
#
# @return [Object] whatever +queue_service.ready+ returns
def stop!
  # Use the reader for both calls, as the other methods of this class do,
  # rather than mixing it with direct @queue_service access.
  queue_service.ready { queue_service.disconnect }
end
|