Class: Combi::InProcess
- Inherits:
-
Bus
- Object
- Bus
- Combi::InProcess
show all
- Defined in:
- lib/combi/buses/in_process.rb
Constant Summary
Constants inherited
from Bus
Bus::RPC_DEFAULT_TIMEOUT
Instance Attribute Summary
Attributes inherited from Bus
#services
Instance Method Summary
collapse
Methods inherited from Bus
#add_service, #enable, #initialize, #log, #post_initialize, #restart!, #start!, #stop!
Constructor Details
This class inherits a constructor from Combi::Bus
Instance Method Details
#memory_handlers ⇒ Object
50
51
52
|
# File 'lib/combi/buses/in_process.rb', line 50
# Registry of in-process handlers, created empty on first access and
# reused on every later call.
#
# @return [Hash] the handler table stored in @memory_handlers
def memory_handlers
  @memory_handlers = {} unless @memory_handlers
  @memory_handlers
end
|
#request(handler_name, kind, message, options = {}) ⇒ Object
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
# File 'lib/combi/buses/in_process.rb', line 6
# Dispatches +message+ to the service registered in memory_handlers under
# +handler_name+ and reports the outcome through a deferrable.
#
# The deferrable fails with:
# * 'error' => 'unknown service'  when no handler is registered,
# * 'error' => 'unknown action'   when the service does not respond to +kind+,
# * 'error' => 'Timeout::Error'   when the synchronous call exceeds the timeout,
# * 'error' => { 'message', 'backtrace' } when the service raises.
# It succeeds with the service's return value, or, when that value itself
# responds to :succeed, with whatever that deferred response later yields.
#
# @param handler_name [#to_s] key the handler was stored under
# @param kind [String, Symbol] name of the method invoked on the service instance
# @param message [#to_json] payload; round-tripped through JSON before dispatch
# @param options [Hash] :timeout overrides RPC_DEFAULT_TIMEOUT; the hash is
#   only read, never modified, so frozen hashes are accepted
# @return [EventMachine::DefaultDeferrable] settled (or to be settled) with the outcome
def request(handler_name, kind, message, options = {})
  # Read the timeout into a local instead of writing it back into the
  # caller's hash.
  timeout = options[:timeout] || RPC_DEFAULT_TIMEOUT
  handler = memory_handlers[handler_name.to_s]
  waiter = EventMachine::DefaultDeferrable.new

  if handler.nil?
    waiter.fail('error' => 'unknown service')
    return waiter
  end

  service_instance = handler[:service_instance]
  # JSON round-trip: the service receives the message as it would look after
  # serialisation (e.g. symbol keys become strings).
  message = JSON.parse(message.to_json)

  unless service_instance.respond_to?(kind)
    waiter.fail('error' => 'unknown action')
    return waiter
  end

  # Two timeouts on purpose: this one is handed to the deferrable (presumably
  # EventMachine::Deferrable#timeout, which fails it later — confirm against
  # the EM docs), while Timeout.timeout below only bounds the synchronous call.
  waiter.timeout(timeout, 'error' => 'Timeout::Error')
  begin
    Timeout.timeout(timeout) do
      # public_send matches the visibility that respond_to? just checked.
      response = service_instance.public_send(kind, message)
      if response.respond_to?(:succeed)
        # Deferred response: relay its eventual outcome to our waiter.
        response.callback do |service_response|
          log "responding with deferred response: #{service_response.inspect[0..500]}"
          waiter.succeed service_response
        end
        response.errback do |service_response|
          failure_response = { 'error' => service_response }
          log "responding with deferred failure: #{service_response.inspect[0..500]}"
          waiter.fail(failure_response)
        end
      else
        waiter.succeed response
      end
    end
  rescue Timeout::Error
    waiter.fail('error' => 'Timeout::Error')
  rescue StandardError, ScriptError => e
    # Service failures (including NotImplementedError, a ScriptError) become
    # error responses. Interrupt, SystemExit, NoMemoryError and friends are
    # deliberately NOT rescued so they can stop the process as intended.
    waiter.fail('error' => { 'message' => e.message, 'backtrace' => e.backtrace })
  end
  waiter
end
|
#respond_to(service_instance, action, options = {}) ⇒ Object
46
47
48
|
# File 'lib/combi/buses/in_process.rb', line 46
# Registers +service_instance+ as the in-process handler for +action+.
# The entry is stored in memory_handlers under the string form of +action+,
# replacing any entry already stored under that key.
#
# @param service_instance [Object] object stored under :service_instance
# @param action [#to_s] name the handler is registered under
# @param options [Hash] stored alongside the instance under :options
# @return [Hash] the stored entry ({service_instance:, options:})
def respond_to(service_instance, action, options = {})
  registration = { service_instance: service_instance, options: options }
  memory_handlers.store(action.to_s, registration)
end
|