Class: Combi::Http

Inherits:
Bus
  • Object
show all
Defined in:
lib/combi/buses/http.rb

Defined Under Namespace

Classes: Client, Server

Constant Summary

Constants inherited from Bus

Bus::RPC_DEFAULT_TIMEOUT

Instance Attribute Summary

Attributes inherited from Bus

#services

Instance Method Summary collapse

Methods inherited from Bus

#add_service, #enable, #initialize, #log, #restart!, #start!, #stop!

Constructor Details

This class inherits a constructor from Combi::Bus

Instance Method Details

#handlersObject



75
76
77
# File 'lib/combi/buses/http.rb', line 75

def handlers
  @handlers ||= {}
end

#manage_request(env) ⇒ Object



45
46
47
# File 'lib/combi/buses/http.rb', line 45

def manage_request(env)
  @machine.on_message Rack::Request.new(env)
end

#on_message(message) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/combi/buses/http.rb', line 49

def on_message(message)
  service_name = message['service']
  handler = handlers[service_name.to_s]
  if handler
    service_instance = handler[:service_instance]
    kind = message['kind']
    if service_instance.respond_to? kind
      message['payload'] ||= {}
      begin
        response = service_instance.send(kind, message['payload'])
      rescue Exception => e
        response = {error: {message: e.message, backtrace: e.backtrace } }
      end
      {result: 'ok', response: response}
    else
      {result: 'error', response: {error: 'unknown action'}}
    end
  else
    {result: 'error', response: {error: 'unknown service'}}
  end
end

#post_initializeObject



36
37
38
39
40
41
42
43
# File 'lib/combi/buses/http.rb', line 36

def post_initialize
  @response_store = Combi::ResponseStore.new
  if @options[:remote_api]
    @machine = Client.new(@options[:remote_api], @options[:handler], self)
  else
    @machine = Server.new(self)
  end
end

#request(name, kind, message, options = {}) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/combi/buses/http.rb', line 79

def request(name, kind, message, options = {})
  options[:timeout] ||= RPC_DEFAULT_TIMEOUT

  correlation_id = Combi::Correlation.generate
  waiter = EventedWaiter.wait_for(correlation_id, @response_store, options[:timeout])
  url = "#{@options[:remote_api]}#{name}/#{kind}"
  request_async = EventMachine::HttpRequest.new(url, connection_timeout: options[:timeout]).post(body: message.to_json)
  request_async.callback do |r|
    parsed = JSON.parse(r.response)
    waiter.succeed(parsed['response'])
  end
  request_async.errback do |x|
    waiter.fail(Timeout::Error.new)
  end
  waiter
end

#respond_to(service_instance, action, options = {}) ⇒ Object



71
72
73
# File 'lib/combi/buses/http.rb', line 71

def respond_to(service_instance, action, options = {})
  handlers[action.to_s] = {service_instance: service_instance, options: options}
end