Class: Fluffle::Server
- Inherits:
-
Object
- Object
- Fluffle::Server
- Includes:
- Connectable
- Defined in:
- lib/fluffle/server.rb
Class Attribute Summary collapse
-
.default_server ⇒ Object
Returns the value of attribute default_server.
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#handler_pool ⇒ Object
readonly
Returns the value of attribute handler_pool.
-
#handlers ⇒ Object
readonly
Returns the value of attribute handlers.
Instance Method Summary collapse
-
#call_handler(handler:, request:) ⇒ Object
handler - Instance of a
Handler that may receive #call; request - Hash representing a decoded Request. -
#decode(payload) ⇒ Object
Deserialize a JSON payload and extract its 3 members: id, method, params.
- #drain(queue: 'default', handler: nil, &block) ⇒ Object
- #handle_request(handler:, properties:, payload:) ⇒ Object
-
#initialize(url: nil, connection: nil, concurrency: 1) ⇒ Server
constructor
url: - Optional URL to pass to
Bunny.new to immediately connect; concurrency: - Number of threads to handle messages on (default: 1). - #start ⇒ Object
-
#validate_request(request) ⇒ Object
Raises if elements of the request payload do not comply with the spec.
-
#wait_for_signal ⇒ Object
NOTE: Keeping this in its own method so its functionality can be more easily overwritten by
Fluffle::Testing.
Methods included from Connectable
#connect, #connected?, included
Constructor Details
#initialize(url: nil, connection: nil, concurrency: 1) ⇒ Server
url: - Optional URL to pass to Bunny.new to immediately connect; concurrency: - Number of threads to handle messages on (default: 1)
9 10 11 12 13 14 15 16 17 |
# File 'lib/fluffle/server.rb', line 9
#
# Builds a server and, when given something to connect with, connects
# immediately.
#
# url:         - Optional URL handed to `connect`. Wins over `connection:`
#                when both are given.
# connection:  - Optional value handed to `connect` when no `url:` is given.
# concurrency: - Size passed to the handler thread pool (default: 1).
def initialize(url: nil, connection: nil, concurrency: 1)
  url_or_connection = url || connection
  connect(url_or_connection) if url_or_connection

  @handlers     = {}
  @handler_pool = Concurrent::FixedThreadPool.new(concurrency)

  # Only the first server created is remembered as the class-wide default.
  self.class.default_server ||= self
end
Class Attribute Details
.default_server ⇒ Object
Returns the value of attribute default_server.
20 21 22 |
# File 'lib/fluffle/server.rb', line 20
#
# Returns the value of attribute default_server (nil until it is assigned).
def default_server
  @default_server
end
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
5 6 7 |
# File 'lib/fluffle/server.rb', line 5
#
# Returns the value of attribute connection (read-only).
def connection
  @connection
end
#handler_pool ⇒ Object (readonly)
Returns the value of attribute handler_pool.
5 6 7 |
# File 'lib/fluffle/server.rb', line 5
#
# Returns the value of attribute handler_pool (read-only).
def handler_pool
  @handler_pool
end
#handlers ⇒ Object (readonly)
Returns the value of attribute handlers.
5 6 7 |
# File 'lib/fluffle/server.rb', line 5
#
# Returns the value of attribute handlers (read-only).
def handlers
  @handlers
end
Instance Method Details
#call_handler(handler:, request:) ⇒ Object
handler - Instance of a Handler that may receive #call request - Hash representing a decoded Request
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/fluffle/server.rb', line 118
#
# Runs one decoded request through a handler and builds the response for it.
# Any StandardError raised while validating or handling is logged (when the
# logger has error level enabled) and turned into an 'error' member.
#
# handler - Instance of a Handler that may receive `#call`
# request - `Hash` representing a decoded Request
#
# Returns a Hash with 'jsonrpc', 'id' and either 'result' or 'error'.
def call_handler(handler:, request:)
  # We don't yet know if the request is valid, so be as cautious as possible
  # about getting the ID: only a Hash can carry one. Indexing other objects
  # (e.g. the String "id") could otherwise leak a bogus ID into the response.
  id = request['id'] if request.is_a?(Hash)

  begin
    validate_request request

    result = handler.call id: id,
                          method: request['method'],
                          params: request['params'],
                          meta: {}
  rescue => err
    log_error(err) if Fluffle.logger.error?

    error = build_error_response err
  end

  response = { 'jsonrpc' => '2.0', 'id' => id }

  if error
    response['error'] = error
  else
    response['result'] = result
  end

  response
end
#decode(payload) ⇒ Object
Deserialize a JSON payload and extract its 3 members: id, method, params
payload - String of the payload from the queue
Returns a Hash from parsing the JSON payload (keys should be String)
152 153 154 |
# File 'lib/fluffle/server.rb', line 152
#
# Deserialize a JSON payload by handing it to `Oj.load`.
#
# payload - String of the payload from the queue
#
# Returns whatever `Oj.load` produces for the payload.
def decode(payload)
  Oj.load(payload)
end
#drain(queue: 'default', handler: nil, &block) ⇒ Object
23 24 25 26 27 28 29 30 31 |
# File 'lib/fluffle/server.rb', line 23
#
# Registers the handler for a queue. Exactly one of `handler:` or a block
# must be given; a block is wrapped in a `Fluffle::Handlers::Dispatcher`.
#
# queue:   - Name of the queue (stored as a String; default: 'default').
# handler: - Handler object to register.
# block    - Block passed to `Fluffle::Handlers::Dispatcher.new`.
#
# Returns the registered handler.
# Raises ArgumentError when both or neither of handler:/block are given.
def drain(queue: 'default', handler: nil, &block)
  raise ArgumentError, 'Cannot provide both handler: and block' if handler && block

  # Registering nil would only blow up later, when a message is handled.
  raise ArgumentError, 'Must provide either handler: or block' unless handler || block

  handler = Fluffle::Handlers::Dispatcher.new(&block) if block

  @handlers[queue.to_s] = handler
end
#handle_request(handler:, properties:, payload:) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/fluffle/server.rb', line 79
#
# Decodes a payload, runs each request in it through the handler and
# publishes every response on `@exchange`, routed to the `:reply_to`
# property and correlated by the response's 'id'.
#
# handler:    - Handler passed on to `call_handler`.
# properties: - Message properties; only `:reply_to` is read here.
# payload:    - Raw payload passed to `decode`.
#
# A payload that cannot be decoded, is neither an Object nor an Array, or is
# an empty batch produces a single error response with a nil 'id'.
def handle_request(handler:, properties:, payload:)
  reply_to  = properties[:reply_to]
  responses = []

  begin
    decoded = decode payload

    requests =
      case decoded
      when Hash  then [decoded] # Single request
      when Array then decoded   # Batch request
      else
        raise Errors::InvalidRequestError, 'Payload was neither an Array nor an Object'
      end

    # JSON-RPC 2.0 treats an empty batch as an invalid request; without this
    # nothing would be published and the caller would never hear back.
    raise Errors::InvalidRequestError, 'Batch request was empty' if requests.empty?

    requests.each do |request|
      responses << call_handler(handler: handler, request: request)
    end
  rescue => err
    responses << {
      'jsonrpc' => '2.0',
      'id'      => nil,
      'error'   => build_error_response(err)
    }
  end

  responses.each do |response|
    @exchange.publish Oj.dump(response),
                      routing_key: reply_to,
                      correlation_id: response['id']
  end
end
#start ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fluffle/server.rb', line 33
#
# Opens a channel, subscribes to the request queue of every registered
# handler and then waits in `wait_for_signal`. Each delivered message is
# posted to the handler pool, where `handle_request` processes it.
#
# Raises RuntimeError ('No handlers defined') when nothing was registered.
def start
  # Fail before opening a channel when there is nothing to serve.
  raise 'No handlers defined' if @handlers.empty?

  @channel  = @connection.create_channel
  @exchange = @channel.default_exchange

  @handlers.each do |name, handler|
    qualified_name = Fluffle.request_queue_name name
    queue          = @channel.queue qualified_name

    queue.subscribe do |_delivery_info, properties, payload|
      @handler_pool.post do
        handle_request handler: handler,
                       properties: properties,
                       payload: payload
      end
    end
  end

  wait_for_signal
end
#validate_request(request) ⇒ Object
Raises if elements of the request payload do not comply with the spec
request - Decoded Hash of the payload (String keys)
159 160 161 162 |
# File 'lib/fluffle/server.rb', line 159
#
# Raises if elements of the request payload do not comply with the spec.
#
# request - Decoded Hash of the payload (String keys)
#
# Returns nil when the request passes both checks.
# Raises Errors::InvalidRequestError when the request is not a Hash or has
# no truthy 'method' member.
def validate_request(request)
  unless request.is_a?(Hash)
    raise Errors::InvalidRequestError,
          "Improperly formatted Request (expected `Hash', got `#{request.class}')"
  end

  raise Errors::InvalidRequestError, "Missing `method' Request object member" unless request['method']
end
#wait_for_signal ⇒ Object
NOTE: Keeping this in its own method so its functionality can be more
easily overwritten by `Fluffle::Testing`.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fluffle/server.rb', line 57
#
# Blocks until INT or TERM is received, then logs the signal and shuts down
# the channel's work pool.
#
# NOTE: Keeping this in its own method so its functionality can be more
# easily overwritten by `Fluffle::Testing`.
#
# Returns nil.
def wait_for_signal
  signal_read, signal_write = IO.pipe

  # The traps only write the signal name to the pipe; the actual shutdown
  # work happens below, outside of the trap context.
  %w[INT TERM].each do |signal|
    Signal.trap(signal) { signal_write.puts signal }
  end

  # Adapted from Sidekiq:
  #   https://github.com/mperham/sidekiq/blob/e634177/lib/sidekiq/cli.rb#L94-L97
  #
  # No timeout is passed, so this waits until a trap has written to the pipe.
  readables, = IO.select([signal_read])
  signal = readables.first.gets.strip

  Fluffle.logger.info "Received #{signal}; shutting down..."

  @channel.work_pool.shutdown

  nil
end