Class: Fluffle::Server
- Inherits:
-
Object
- Object
- Fluffle::Server
- Includes:
- Connectable
- Defined in:
- lib/fluffle/server.rb
Class Attribute Summary collapse
-
.default_server ⇒ Object
Returns the value of attribute default_server.
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#handler_pool ⇒ Object
readonly
Returns the value of attribute handler_pool.
-
#handlers ⇒ Object
readonly
Returns the value of attribute handlers.
Instance Method Summary collapse
-
#call_handler(handler:, request:) ⇒ Object
handler - Instance of a
Handlerthat may receive#callrequest -Hashrepresenting a decoded Request. -
#decode(payload) ⇒ Object
Deserialize a JSON payload and extract its 3 members: id, method, params.
- #drain(queue: 'default', handler: nil, &block) ⇒ Object
- #handle_request(handler:, properties:, payload:) ⇒ Object
-
#initialize(url: nil, connection: nil, concurrency: 1) ⇒ Server
constructor
url: - Optional URL to pass to
Bunny.newto immediately connect concurrency: - Number of threads to handle messages on (default: 1). - #start ⇒ Object
-
#validate_request(request) ⇒ Object
Raises if elements of the request payload do not comply with the spec.
-
#wait_for_signal ⇒ Object
NOTE: Keeping this in its own method so its functionality can be more easily overwritten by
Fluffle::Testing.
Methods included from Connectable
#connect, #connected?, included
Constructor Details
#initialize(url: nil, connection: nil, concurrency: 1) ⇒ Server
url: - Optional URL to pass to Bunny.new to immediately connect concurrency: - Number of threads to handle messages on (default: 1)
9 10 11 12 13 14 15 16 17 |
# File 'lib/fluffle/server.rb', line 9 def initialize(url: nil, connection: nil, concurrency: 1) url_or_connection = url || connection self.connect(url_or_connection) if url_or_connection @handlers = {} @handler_pool = Concurrent::FixedThreadPool.new concurrency self.class.default_server ||= self end |
Class Attribute Details
.default_server ⇒ Object
Returns the value of attribute default_server.
20 21 22 |
# File 'lib/fluffle/server.rb', line 20 def default_server @default_server end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
5 6 7 |
# File 'lib/fluffle/server.rb', line 5 def connection @connection end |
#handler_pool ⇒ Object (readonly)
Returns the value of attribute handler_pool.
5 6 7 |
# File 'lib/fluffle/server.rb', line 5 def handler_pool @handler_pool end |
#handlers ⇒ Object (readonly)
Returns the value of attribute handlers.
5 6 7 |
# File 'lib/fluffle/server.rb', line 5 def handlers @handlers end |
Instance Method Details
#call_handler(handler:, request:) ⇒ Object
handler - Instance of a Handler that may receive #call request - Hash representing a decoded Request
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/fluffle/server.rb', line 111 def call_handler(handler:, request:) t0 = Time.now begin id = request['id'] self.validate_request request result = handler.call id: id, method: request['method'], params: request['params'], meta: {} rescue => err log_error(err) if Fluffle.logger.error? error = self.build_error_response err end response = { 'jsonrpc' => '2.0', 'id' => id, 'meta' => { 'handler_duration' => (Time.now - t0) } } if error response['error'] = error else response['result'] = result end response end |
#decode(payload) ⇒ Object
Deserialize a JSON payload and extract its 3 members: id, method, params
payload - String of the payload from the queue
Returns a Hash from parsing the JSON payload (keys should be String)
151 152 153 |
# File 'lib/fluffle/server.rb', line 151 def decode(payload) Oj.load payload end |
#drain(queue: 'default', handler: nil, &block) ⇒ Object
23 24 25 26 27 28 29 30 31 |
# File 'lib/fluffle/server.rb', line 23 def drain(queue: 'default', handler: nil, &block) if handler && block raise ArgumentError, 'Cannot provide both handler: and block' end handler = Fluffle::Handlers::Dispatcher.new(&block) if block @handlers[queue.to_s] = handler end |
#handle_request(handler:, properties:, payload:) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/fluffle/server.rb', line 86 def handle_request(handler:, properties:, payload:) reply_to = properties[:reply_to] id = nil response = nil begin request = self.decode payload id = request['id'] response = self.call_handler handler: handler, request: request rescue => err response = { 'jsonrpc' => '2.0', 'id' => id, 'error' => self.build_error_response(err) } end @exchange.publish Oj.dump(response), routing_key: reply_to, correlation_id: response['id'] end |
#start ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/fluffle/server.rb', line 33 def start @channel = @connection.create_channel @exchange = @channel.default_exchange raise 'No handlers defined' if @handlers.empty? @handlers.each do |name, handler| qualified_name = Fluffle.request_queue_name name queue = @channel.queue qualified_name queue.subscribe(manual_ack: true) do |delivery_info, properties, payload| @handler_pool.post do begin @channel.ack delivery_info.delivery_tag handle_request handler: handler, properties: properties, payload: payload rescue => err # Ensure we don't loose any errors on the handler pool's thread Fluffle.logger.error "[Fluffle::Server] #{err.class}: #{err.message}\n#{err.backtrace.join("\n")}" end end end end self.wait_for_signal end |
#validate_request(request) ⇒ Object
Raises if elements of the request payload do not comply with the spec
payload - Decoded Hash of the payload (String keys)
158 159 160 161 |
# File 'lib/fluffle/server.rb', line 158 def validate_request(request) raise Errors::InvalidRequestError.new("Improperly formatted Request (expected `Hash', got `#{request.class}')") unless request && request.is_a?(Hash) raise Errors::InvalidRequestError.new("Missing `method' Request object member") unless request['method'] end |
#wait_for_signal ⇒ Object
NOTE: Keeping this in its own method so its functionality can be more
easily overwritten by `Fluffle::Testing`.
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fluffle/server.rb', line 64 def wait_for_signal signal_read, signal_write = IO.pipe %w[INT TERM].each do |signal| Signal.trap(signal) do signal_write.puts signal end end # Adapted from Sidekiq: # https://github.com/mperham/sidekiq/blob/e634177/lib/sidekiq/cli.rb#L94-L97 while io = IO.select([signal_read]) readables = io.first signal = readables.first.gets.strip Fluffle.logger.info "Received #{signal}; shutting down..." @channel.work_pool.shutdown return end end |