Class: Combi::Http
- Inherits:
-
Bus
show all
- Defined in:
- lib/combi/buses/http.rb
Defined Under Namespace
Classes: Client, Server
Constant Summary
Constants inherited
from Bus
Bus::RPC_DEFAULT_TIMEOUT
Instance Attribute Summary
Attributes inherited from Bus
#services
Instance Method Summary
collapse
Methods inherited from Bus
#add_service, #enable, #initialize, #log, #restart!, #start!, #stop!
Constructor Details
This class inherits a constructor from Combi::Bus
Instance Method Details
#handlers ⇒ Object
75
76
77
|
# File 'lib/combi/buses/http.rb', line 75
def handlers
@handlers ||= {}
end
|
#manage_request(env) ⇒ Object
45
46
47
|
# File 'lib/combi/buses/http.rb', line 45
def manage_request(env)
@machine.on_message Rack::Request.new(env)
end
|
#on_message(message) ⇒ Object
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/combi/buses/http.rb', line 49
def on_message(message)
service_name = message['service']
handler = handlers[service_name.to_s]
if handler
service_instance = handler[:service_instance]
kind = message['kind']
if service_instance.respond_to? kind
message['payload'] ||= {}
begin
response = service_instance.send(kind, message['payload'])
rescue Exception => e
response = {error: {message: e.message, backtrace: e.backtrace } }
end
{result: 'ok', response: response}
else
{result: 'error', response: {error: 'unknown action'}}
end
else
{result: 'error', response: {error: 'unknown service'}}
end
end
|
#post_initialize ⇒ Object
36
37
38
39
40
41
42
43
|
# File 'lib/combi/buses/http.rb', line 36
def post_initialize
@response_store = Combi::ResponseStore.new
if @options[:remote_api]
@machine = Client.new(@options[:remote_api], @options[:handler], self)
else
@machine = Server.new(self)
end
end
|
#request(name, kind, message, options = {}) ⇒ Object
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/combi/buses/http.rb', line 79
def request(name, kind, message, options = {})
options[:timeout] ||= RPC_DEFAULT_TIMEOUT
correlation_id = Combi::Correlation.generate
waiter = EventedWaiter.wait_for(correlation_id, @response_store, options[:timeout])
url = "#{@options[:remote_api]}#{name}/#{kind}"
request_async = EventMachine::HttpRequest.new(url, connection_timeout: options[:timeout]).post(body: message.to_json)
request_async.callback do |r|
parsed = JSON.parse(r.response)
waiter.succeed(parsed['response'])
end
request_async.errback do |x|
waiter.fail(Timeout::Error.new)
end
waiter
end
|
#respond_to(service_instance, action, options = {}) ⇒ Object
71
72
73
|
# File 'lib/combi/buses/http.rb', line 71
def respond_to(service_instance, action, options = {})
handlers[action.to_s] = {service_instance: service_instance, options: options}
end
|