Class: Combi::Http
- Inherits:
-
Bus
show all
- Defined in:
- lib/combi/buses/http.rb
Defined Under Namespace
Classes: Client, Server
Constant Summary
Constants inherited
from Bus
Bus::RPC_DEFAULT_TIMEOUT
Instance Attribute Summary
Attributes inherited from Bus
#routes
Instance Method Summary
collapse
Methods inherited from Bus
#add_service, #enable, #initialize, #log, #restart!, #start!, #stop!
Constructor Details
This class inherits a constructor from Combi::Bus
Instance Method Details
#manage_request(env) ⇒ Object
45
46
47
|
# File 'lib/combi/buses/http.rb', line 45
# Entry point for an incoming HTTP request: wraps the given environment in a
# Rack::Request and hands it to the underlying machine.
#
# @param env [Object] value passed to Rack::Request.new (presumably the Rack
#   environment hash — confirm against the caller)
# @return [Object] whatever the machine's #on_message returns
def manage_request(env)
  rack_request = Rack::Request.new(env)
  @machine.on_message(rack_request)
end
|
#on_message(service_name:, kind:, payload: {}) ⇒ Object
49
50
51
|
# File 'lib/combi/buses/http.rb', line 49
# Dispatches a received message by delegating to invoke_service.
#
# @param service_name [Object] forwarded unchanged as the first argument
# @param kind [Object] forwarded unchanged as the second argument
# @param payload [Object] forwarded unchanged as the third argument;
#   defaults to an empty hash
# @return [Object] the result of invoke_service
def on_message(service_name:, kind:, payload: {})
  invoke_service(service_name, kind, payload)
end
|
#post_initialize ⇒ Object
36
37
38
39
40
41
42
43
|
# File 'lib/combi/buses/http.rb', line 36
# Post-construction hook (presumably invoked from Combi::Bus#initialize —
# confirm against the parent class).
#
# Creates the response store, then selects the machine that backs this bus:
# a Client when the :remote_api option is set, a Server otherwise.
#
# @return [Client, Server] the machine that was just assigned
def post_initialize
  @response_store = Combi::ResponseStore.new
  remote_api = @options[:remote_api]
  @machine =
    if remote_api
      Client.new(remote_api, @options[:handler], self)
    else
      Server.new(self)
    end
end
|
#request(name, kind, message, options = {}) ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/combi/buses/http.rb', line 53
# Sends a request to the remote API as an HTTP POST.
#
# The message is JSON-encoded and posted to "<remote_api><name>/<kind>",
# where <remote_api> is read from the bus options.
#
# @param name [#to_s] service name, interpolated into the URL
# @param kind [#to_s] action name, interpolated into the URL
# @param message [Object] payload, encoded with Yajl::Encoder
# @param options [Hash] per-call options; this hash is never modified
# @option options [Numeric] :timeout connection and wait timeout; falls back
#   to RPC_DEFAULT_TIMEOUT when missing
# @option options [Boolean] :fast when truthy, the request is sent without
#   registering a waiter and nil is returned
# @return [Object, nil] the waiter obtained from the response store, or nil
#   in :fast mode. The waiter succeeds with the parsed response (symbol
#   keys), fails with Yajl::ParseError when the body is not valid JSON, and
#   fails with Timeout::Error whenever the HTTP request's errback fires.
def request(name, kind, message, options = {})
  # Keep the effective timeout in a local rather than writing the default
  # back into +options+: the caller's hash stays untouched and may be frozen.
  timeout = options[:timeout] || RPC_DEFAULT_TIMEOUT
  url = "#{@options[:remote_api]}#{name}/#{kind}"
  body = Yajl::Encoder.encode(message)
  http = EventMachine::HttpRequest.new(url, connection_timeout: timeout).post(body: body)
  return nil if options[:fast]

  waiter = @response_store.wait_for(Combi::Correlation.generate, timeout)
  http.callback do |response|
    # A malformed body must settle the waiter instead of raising inside the
    # reactor callback, where nobody waiting on the response would see it.
    begin
      parsed = Yajl::Parser.parse(response.response, symbolize_keys: true)
    rescue Yajl::ParseError => e
      waiter.fail(e)
    else
      waiter.succeed(parsed)
    end
  end
  http.errback { |_error| waiter.fail(Timeout::Error.new) }
  waiter
end
|