Class: Fluffle::Server
- Inherits: Object (Fluffle::Server < Object)
- Includes: Connectable
- Defined in: lib/fluffle/server.rb
Class Attribute Summary
- .default_server ⇒ Object
  Returns the value of attribute default_server.
Instance Attribute Summary
- #confirms ⇒ Object (readonly)
  Returns the value of attribute confirms.
- #connection ⇒ Object (readonly)
  Returns the value of attribute connection.
- #handler_pool ⇒ Object (readonly)
  Returns the value of attribute handler_pool.
- #handlers ⇒ Object (readonly)
  Returns the value of attribute handlers.
- #mandatory ⇒ Object (readonly)
  Returns the value of attribute mandatory.
- #publish_timeout ⇒ Object
  Returns the value of attribute publish_timeout.
- #shutdown_timeout ⇒ Object
  Returns the value of attribute shutdown_timeout.
Instance Method Summary
- #call_handler(handler:, request:) ⇒ Object
  handler - Instance of a `Handler` that may receive `#call`.
  request - `Hash` representing a decoded Request.
- #decode(payload) ⇒ Object
  Deserialize a JSON payload and extract its 3 members: id, method, params.
- #drain(queue: 'default', handler: nil, &block) ⇒ Object
- #handle_request(handler:, properties:, payload:) ⇒ Object
- #handle_returns ⇒ Object
- #initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false) ⇒ Server (constructor)
  url: - Optional URL to pass to `Bunny.new` to immediately connect.
  concurrency: - Number of threads to handle messages on (default: 1).
  confirms: - Whether or not to use RabbitMQ confirms.
- #start ⇒ Object
- #validate_request(request) ⇒ Object
  Raises if elements of the request payload do not comply with the spec.
- #wait_for_signal ⇒ Object
  NOTE: Keeping this in its own method so its functionality can be more easily overwritten by `Fluffle::Testing`.
Methods included from Connectable
#connect, #connected?, included
Constructor Details
#initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false) ⇒ Server
url: - Optional URL to pass to `Bunny.new` to immediately connect
concurrency: - Number of threads to handle messages on (default: 1)
confirms: - Whether or not to use RabbitMQ confirms
# File 'lib/fluffle/server.rb', line 14
def initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false)
  url_or_connection = url || connection
  self.connect(url_or_connection) if url_or_connection

  @confirms = confirms
  @mandatory = mandatory
  @publish_timeout = 5
  @shutdown_timeout = 15

  @handlers = {}
  @handler_pool = Concurrent::FixedThreadPool.new concurrency
  @consumers = []

  self.class.default_server ||= self
end
Class Attribute Details
.default_server ⇒ Object
Returns the value of attribute default_server.
# File 'lib/fluffle/server.rb', line 31
def default_server
  @default_server
end
Instance Attribute Details
#confirms ⇒ Object (readonly)
Returns the value of attribute confirms.
# File 'lib/fluffle/server.rb', line 8
def confirms
  @confirms
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
# File 'lib/fluffle/server.rb', line 8
def connection
  @connection
end
#handler_pool ⇒ Object (readonly)
Returns the value of attribute handler_pool.
# File 'lib/fluffle/server.rb', line 8
def handler_pool
  @handler_pool
end
#handlers ⇒ Object (readonly)
Returns the value of attribute handlers.
# File 'lib/fluffle/server.rb', line 8
def handlers
  @handlers
end
#mandatory ⇒ Object (readonly)
Returns the value of attribute mandatory.
# File 'lib/fluffle/server.rb', line 8
def mandatory
  @mandatory
end
#publish_timeout ⇒ Object
Returns the value of attribute publish_timeout.
# File 'lib/fluffle/server.rb', line 9
def publish_timeout
  @publish_timeout
end
#shutdown_timeout ⇒ Object
Returns the value of attribute shutdown_timeout.
# File 'lib/fluffle/server.rb', line 9
def shutdown_timeout
  @shutdown_timeout
end
Instance Method Details
#call_handler(handler:, request:) ⇒ Object
handler - Instance of a `Handler` that may receive `#call`
request - `Hash` representing a decoded Request
# File 'lib/fluffle/server.rb', line 170
def call_handler(handler:, request:)
  t0 = Time.now

  begin
    id = request['id']

    self.validate_request request

    result = handler.call id: id,
                          method: request['method'],
                          params: request['params'],
                          meta: {}
  rescue => err
    log_error(err) if Fluffle.logger.error?

    error = self.build_error_response err
  end

  response = {
    'jsonrpc' => '2.0',
    'id' => id,
    'meta' => {
      'handler_duration' => (Time.now - t0)
    }
  }

  if error
    response['error'] = error
  else
    response['result'] = result
  end

  response
end
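The response envelope built by this method can be sketched in isolation. The following is a minimal, self-contained approximation rather than Fluffle's own code: `build_error_response` is not shown on this page, so the version below (including the JSON-RPC 2.0 "Internal error" code `-32603`) is a hypothetical stand-in, the validation step is inlined, and a plain lambda plays the role of the handler.

```ruby
# Hypothetical stand-in: the real build_error_response is not documented here.
def build_error_response(err)
  { 'code' => -32603, 'message' => err.message }
end

# Mirrors the control flow of Server#call_handler without Fluffle itself.
def call_handler(handler:, request:)
  started_at = Time.now
  id = nil
  result = nil
  error = nil

  begin
    id = request['id']
    raise ArgumentError, "Missing `method' Request object member" unless request['method']

    result = handler.call(id: id, method: request['method'],
                          params: request['params'], meta: {})
  rescue => err
    # Any failure (validation or handler) becomes an error member.
    error = build_error_response(err)
  end

  response = {
    'jsonrpc' => '2.0',
    'id' => id,
    'meta' => { 'handler_duration' => Time.now - started_at }
  }

  if error
    response['error'] = error
  else
    response['result'] = result
  end

  response
end

# A lambda accepting the same keywords the server passes to a handler.
upcase = ->(id:, method:, params:, meta:) { params.first.upcase }

ok = call_handler(handler: upcase,
                  request: { 'id' => '1', 'method' => 'upcase', 'params' => ['hello'] })
failed = call_handler(handler: upcase, request: { 'id' => '2' })
```

In this sketch `ok` carries a `'result'` member and no `'error'`, while `failed` carries an `'error'` member and no `'result'`; both keep the `id` of the request that produced them.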
#decode(payload) ⇒ Object
Deserialize a JSON payload and extract its 3 members: id, method, params
payload - `String` of the payload from the queue
Returns a `Hash` from parsing the JSON payload (keys should be `String`)
# File 'lib/fluffle/server.rb', line 210
def decode(payload)
  Oj.load payload
end
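To illustrate the shape of the decoded request, here is a small sketch that uses Ruby's standard `JSON` library as a stand-in for `Oj` (an assumption made only so the example runs without extra gems). The payload contents are invented for illustration.

```ruby
require 'json'

# Stand-in for Server#decode: parses with stdlib JSON instead of Oj.
def decode(payload)
  JSON.parse(payload)
end

payload = '{"jsonrpc":"2.0","id":"abc123","method":"upcase","params":["hello"]}'
request = decode(payload)

# The three members the server reads, all under String keys.
id     = request['id']
method = request['method']
params = request['params']
```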
#drain(queue: 'default', handler: nil, &block) ⇒ Object
# File 'lib/fluffle/server.rb', line 34
def drain(queue: 'default', handler: nil, &block)
  if handler && block
    raise ArgumentError, 'Cannot provide both handler: and block'
  end

  handler = Fluffle::Handlers::Dispatcher.new(&block) if block

  raise ArgumentError, 'Handler cannot be nil' if handler.nil?

  @handlers[queue.to_s] = handler
end
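The registration rules in `#drain` (exactly one of `handler:` or a block, keyed by the queue name as a `String`) can be tried out with a small stand-in. `HandlerRegistry` is a hypothetical class written for this example; unlike the real method it stores the block itself instead of wrapping it in `Fluffle::Handlers::Dispatcher`.

```ruby
# Hypothetical stand-in that reproduces only the argument checks of #drain.
class HandlerRegistry
  attr_reader :handlers

  def initialize
    @handlers = {}
  end

  def drain(queue: 'default', handler: nil, &block)
    raise ArgumentError, 'Cannot provide both handler: and block' if handler && block

    # Simplification: the real server wraps the block in a Dispatcher.
    handler = block if block

    raise ArgumentError, 'Handler cannot be nil' if handler.nil?

    @handlers[queue.to_s] = handler
  end
end

registry = HandlerRegistry.new
handler = ->(**) { :handled }

# Symbols are accepted for the queue name and stored as Strings.
registry.drain(queue: :emails, handler: handler)
registry.drain(queue: 'jobs') { |job| job }
```

Passing both a handler and a block, or neither, raises `ArgumentError` in this sketch, matching the checks in the listing above.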
#handle_request(handler:, properties:, payload:) ⇒ Object
# File 'lib/fluffle/server.rb', line 135
def handle_request(handler:, properties:, payload:)
  reply_to = properties[:reply_to]

  id = nil
  response = nil

  begin
    request = self.decode payload
    id = request['id']

    response = self.call_handler handler: handler, request: request
  rescue => err
    response = {
      'jsonrpc' => '2.0',
      'id' => id,
      'error' => self.build_error_response(err)
    }
  end

  stack = Fluffle::MiddlewareStack.new

  if confirms
    stack.push ->(publish) {
      @confirmer.with_confirmation timeout: publish_timeout, &publish
    }
  end

  stack.call do
    @exchange.publish Oj.dump(response), routing_key: reply_to,
                                         correlation_id: response['id']
  end
end
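The publish step above is wrapped in a middleware stack: each pushed lambda receives the next step as a callable and decides when to invoke it. The internals of `Fluffle::MiddlewareStack` are not shown on this page, so the class below is a hypothetical sketch of that pattern, assuming the first middleware pushed becomes the outermost wrapper.

```ruby
# Hypothetical sketch of a middleware stack; not Fluffle's implementation.
class MiddlewareStack
  def initialize
    @stack = []
  end

  def push(middleware)
    @stack << middleware
    self
  end

  # Wraps the block in each middleware, innermost last, then runs the chain.
  def call(&block)
    chain = @stack.reverse.inject(block) do |inner, middleware|
      -> { middleware.call(inner) }
    end
    chain.call
  end
end

events = []
stack = MiddlewareStack.new

# Plays the role of the confirmation wrapper around the publish step.
stack.push ->(publish) {
  events << :before_publish
  outcome = publish.call
  events << :after_publish
  outcome
}

result = stack.call do
  events << :publish
  :published
end
```

With no middleware pushed, `call` simply runs the block, which corresponds to the `confirms: false` path in the listing above.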
#handle_returns ⇒ Object
# File 'lib/fluffle/server.rb', line 91
def handle_returns
  @exchange.on_return do |return_info, _properties, _payload|
    message = Kernel.sprintf "Received return from exchange for routing key `%s' (%d %s)", return_info.routing_key, return_info.reply_code, return_info.reply_text
    Fluffle.logger.error "[Fluffle::Server] #{message}"
  end
end
#start ⇒ Object
# File 'lib/fluffle/server.rb', line 46
def start
  @handlers.freeze

  @channel = @connection.create_channel
  @exchange = @channel.default_exchange

  # Ensure we only receive 1 message at a time for each consumer
  @channel.prefetch 1

  if confirms
    @confirmer = Fluffle::Confirmer.new channel: @channel
    @confirmer.confirm_select
  end

  if mandatory
    handle_returns
  end

  raise 'No handlers defined' if @handlers.empty?

  @handlers.each do |name, handler|
    qualified_name = Fluffle.request_queue_name name
    queue = @channel.queue qualified_name

    consumer = queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
      @handler_pool.post do
        begin
          handle_request handler: handler,
                         properties: properties,
                         payload: payload
        rescue => err
          # Ensure we don't lose any errors on the handler pool's thread
          Fluffle.logger.error "[Fluffle::Server] #{err.class}: #{err.message}\n#{err.backtrace.join("\n")}"
        ensure
          @channel.ack delivery_info.delivery_tag
        end
      end
    end

    @consumers << consumer
  end

  self.wait_for_signal
end
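One detail of the subscribe block worth isolating is that the acknowledgement sits in `ensure`, so it runs whether or not handling raised. The sketch below reproduces just that control flow; `process_delivery`, the `acked` array and the `log` array are hypothetical stand-ins for the channel ack and the logger.

```ruby
# Runs one delivery the way the subscribe block does: errors are logged,
# and the acknowledgement happens in `ensure` either way.
def process_delivery(tag, acked:, log:)
  yield
rescue => err
  log << "#{err.class}: #{err.message}"
ensure
  acked << tag
end

acked = []
log = []

process_delivery(1, acked: acked, log: log) { :ok }
process_delivery(2, acked: acked, log: log) { raise 'handler blew up' }
```

Both deliveries end up acknowledged here, which suggests that in the real server a message whose processing raised is still acked and would not be redelivered by the broker.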
#validate_request(request) ⇒ Object
Raises if elements of the request payload do not comply with the spec
request - Decoded `Hash` of the payload (`String` keys)
# File 'lib/fluffle/server.rb', line 217
def validate_request(request)
  raise Errors::InvalidRequestError.new("Improperly formatted Request (expected `Hash', got `#{request.class}')") unless request && request.is_a?(Hash)
  raise Errors::InvalidRequestError.new("Missing `method' Request object member") unless request['method']
end
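The two checks can be exercised without Fluffle using a local error class. In this sketch `InvalidRequestError` is defined inline as a stand-in for `Fluffle::Errors::InvalidRequestError`, and `validation_error` is a hypothetical helper that returns the error message (or `nil` when the request is valid).

```ruby
# Local stand-in for Fluffle::Errors::InvalidRequestError.
class InvalidRequestError < StandardError; end

# Same two checks as Server#validate_request.
def validate_request(request)
  unless request.is_a?(Hash)
    raise InvalidRequestError, "Improperly formatted Request (expected `Hash', got `#{request.class}')"
  end

  raise InvalidRequestError, "Missing `method' Request object member" unless request['method']
end

# Hypothetical helper: returns the validation message, or nil if valid.
def validation_error(request)
  validate_request(request)
  nil
rescue InvalidRequestError => err
  err.message
end

valid       = validation_error({ 'id' => '1', 'method' => 'upcase' })
not_a_hash  = validation_error(['upcase'])
no_method   = validation_error({ 'id' => '1' })
```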
#wait_for_signal ⇒ Object
NOTE: Keeping this in its own method so its functionality can be more
easily overwritten by `Fluffle::Testing`.
# File 'lib/fluffle/server.rb', line 100
def wait_for_signal
  signal_read, signal_write = IO.pipe

  %w[INT TERM].each do |signal|
    Signal.trap(signal) do
      signal_write.puts signal
    end
  end

  # Adapted from Sidekiq:
  #   https://github.com/mperham/sidekiq/blob/e634177/lib/sidekiq/cli.rb#L94-L97
  while io = IO.select([signal_read])
    readables = io.first
    signal = readables.first.gets.strip

    Fluffle.logger.info "Received #{signal}; shutting down..."

    # First stop the consumers from receiving messages
    @consumers.each &:cancel

    # Then wait for worker pools to finish processing their active jobs
    @handler_pool.shutdown
    unless @handler_pool.wait_for_termination(@shutdown_timeout)
      # `wait_for_termination` returns false if it didn't shut down in time,
      # so we need to kill it
      @handler_pool.kill
    end

    # Finally close the connection
    @channel.close

    return
  end
end
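The method relies on the self-pipe pattern: trap handlers only write the signal name to a pipe, and the main thread blocks in `IO.select` until something arrives. A minimal sketch of the two halves follows; `install_signal_traps` and `next_signal` are hypothetical helper names, and the example writes to the pipe directly instead of sending a real signal so that it does not alter the handlers of the process running it.

```ruby
# Installs trap handlers that only write the signal name to the pipe;
# the real work happens later, outside the trap context.
def install_signal_traps(writer, signals = %w[INT TERM])
  signals.each do |signal|
    Signal.trap(signal) { writer.puts signal }
  end
end

# Blocks until a signal name arrives on the pipe (or the timeout elapses)
# and returns it, or nil on timeout.
def next_signal(reader, timeout: nil)
  ready = IO.select([reader], nil, nil, timeout)
  return nil unless ready

  ready.first.first.gets.strip
end

reader, writer = IO.pipe

# Simulate what a trap handler would do, without touching the process's
# real signal handlers in this example.
writer.puts 'TERM'
signal = next_signal(reader, timeout: 1)
```

In a real process one would call `install_signal_traps(writer)` once at startup and then run the shutdown sequence (cancel consumers, drain the pool, close the channel) after `next_signal` returns.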