Class: Fluffle::Server

Inherits:
Object
  • Object
show all
Includes:
Connectable
Defined in:
lib/fluffle/server.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Connectable

#connect, #connected?, included

Constructor Details

#initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false) ⇒ Server

url: - Optional URL to pass to `Bunny.new` to immediately connect
concurrency: - Number of threads to handle messages on (default: 1)
confirms: - Whether or not to use RabbitMQ confirms



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/fluffle/server.rb', line 14

# Build a server and, when a URL or connection is supplied, connect at once.
#
# url:         - Optional value handed to `#connect`; wins over `connection:`
#                when both are given
# connection:  - Optional value handed to `#connect` when `url:` is nil
# concurrency: - Size of the `Concurrent::FixedThreadPool` that runs handlers
#                (default: 1)
# confirms:    - Stored in `@confirms` and exposed through `#confirms`
# mandatory:   - Stored in `@mandatory` and exposed through `#mandatory`
#
# The publish and shutdown timeouts start out at 5 and 15 respectively. The
# new instance is also recorded as the class-level default server unless one
# has already been set.
def initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false)
  target = url || connection
  connect(target) if target

  @confirms  = confirms
  @mandatory = mandatory

  @publish_timeout  = 5
  @shutdown_timeout = 15

  @handlers     = {}
  @handler_pool = Concurrent::FixedThreadPool.new(concurrency)
  @consumers    = []

  server_class = self.class
  server_class.default_server = self unless server_class.default_server
end

Class Attribute Details

.default_server ⇒ Object

Returns the value of attribute default_server.



31
32
33
# File 'lib/fluffle/server.rb', line 31

# Class-level reader for the default server. `#initialize` stores the newly
# built instance here (via `default_server ||= self`) when nothing is set yet.
def default_server
  @default_server
end

Instance Attribute Details

#confirms ⇒ Object (readonly)

Returns the value of attribute confirms.



8
9
10
# File 'lib/fluffle/server.rb', line 8

# Value of the `confirms:` constructor option. When truthy, `#start` creates a
# `Fluffle::Confirmer` for the channel and `#handle_request` publishes
# responses through `@confirmer.with_confirmation`.
def confirms
  @confirms
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



8
9
10
# File 'lib/fluffle/server.rb', line 8

# The connection that `#start` calls `create_channel` on.
# NOTE(review): `@connection` is not assigned in the code visible here —
# presumably `Connectable#connect` sets it; confirm.
def connection
  @connection
end

#handler_pool ⇒ Object (readonly)

Returns the value of attribute handler_pool.



8
9
10
# File 'lib/fluffle/server.rb', line 8

# The `Concurrent::FixedThreadPool` created in `#initialize`. `#start` posts
# each incoming delivery to it and `#wait_for_signal` shuts it down.
def handler_pool
  @handler_pool
end

#handlers ⇒ Object (readonly)

Returns the value of attribute handlers.



8
9
10
# File 'lib/fluffle/server.rb', line 8

# Hash of handlers keyed by queue name (`String`, from `queue.to_s`). Entries
# are added by `#drain`; `#start` freezes the hash before subscribing.
def handlers
  @handlers
end

#mandatory ⇒ Object (readonly)

Returns the value of attribute mandatory.



8
9
10
# File 'lib/fluffle/server.rb', line 8

# Value of the `mandatory:` constructor option. When truthy, `#start` calls
# `#handle_returns` to log messages returned by the exchange.
def mandatory
  @mandatory
end

#publish_timeout ⇒ Object

Returns the value of attribute publish_timeout.



9
10
11
# File 'lib/fluffle/server.rb', line 9

# Timeout passed as `timeout:` to `@confirmer.with_confirmation` when a
# response is published with confirms enabled. Initialized to 5 in
# `#initialize` (presumably seconds — confirm against `Fluffle::Confirmer`).
def publish_timeout
  @publish_timeout
end

#shutdown_timeout ⇒ Object

Returns the value of attribute shutdown_timeout.



9
10
11
# File 'lib/fluffle/server.rb', line 9

# Value passed to `@handler_pool.wait_for_termination` in `#wait_for_signal`;
# if the pool has not terminated by then it is killed. Initialized to 15 in
# `#initialize` (presumably seconds — TODO confirm).
def shutdown_timeout
  @shutdown_timeout
end

Instance Method Details

#call_handler(handler:, request:) ⇒ Object

handler - Instance of a `Handler` that may receive `#call`
request - `Hash` representing a decoded Request



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/fluffle/server.rb', line 170

# Run a handler for one decoded request and build the JSON-RPC response.
#
# handler: - Object that receives `#call` with `id:`, `method:`, `params:`
#            and `meta:` keywords
# request: - Decoded Request; expected to be a `Hash` with `String` keys
#
# Any `StandardError` raised by validation or by the handler is logged (when
# the logger has error level enabled) and turned into the response's 'error'
# member via `#build_error_response` instead of being re-raised.
#
# Returns a `Hash` with 'jsonrpc', 'id', 'meta' and either 'result' or
# 'error'.
def call_handler(handler:, request:)
  # Monotonic clock: the duration must not be skewed by wall-clock changes.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  id     = nil
  result = nil
  error  = nil

  begin
    # Only a Hash can carry an id. Reading it from anything else (nil, an
    # Array, ...) would raise before `validate_request` gets the chance to
    # report the request as invalid.
    id = request['id'] if request.is_a?(Hash)

    self.validate_request request

    result = handler.call id: id,
                          method: request['method'],
                          params: request['params'],
                          meta: {}
  rescue => err
    log_error(err) if Fluffle.logger.error?

    error = self.build_error_response err
  end

  response = {
    'jsonrpc' => '2.0',
    'id'      => id,
    'meta'    => {
      'handler_duration' => Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    }
  }

  if error
    response['error'] = error
  else
    response['result'] = result
  end

  response
end

#decode(payload) ⇒ Object

Deserialize a JSON payload and extract its 3 members: id, method, params

payload - `String` of the payload from the queue

Returns a `Hash` from parsing the JSON payload (keys should be `String`)



210
211
212
# File 'lib/fluffle/server.rb', line 210

# Parse a raw payload taken off the queue.
#
# payload - `String` containing the JSON text of a Request
#
# NOTE(review): payloads come from the queue; confirm that the Oj load mode
# configured for this project is one that is safe for untrusted input.
#
# Returns whatever `Oj.load` produces for the payload.
def decode(payload)
  Oj.load(payload)
end

#drain(queue: 'default', handler: nil, &block) ⇒ Object

Raises:

  • (ArgumentError)


34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluffle/server.rb', line 34

# Register the handler that will serve a queue.
#
# queue:   - Name of the queue; stored under `queue.to_s` (default: 'default')
# handler: - Handler object to register; mutually exclusive with a block
# block    - Optional block, wrapped in a `Fluffle::Handlers::Dispatcher`
#
# Raises `ArgumentError` when both a handler and a block are given, or when
# neither yields a handler.
#
# Returns the handler that was registered.
def drain(queue: 'default', handler: nil, &block)
  raise ArgumentError, 'Cannot provide both handler: and block' if handler && block

  registered = block ? Fluffle::Handlers::Dispatcher.new(&block) : handler
  raise ArgumentError, 'Handler cannot be nil' if registered.nil?

  @handlers[queue.to_s] = registered
end

#handle_request(handler:, properties:, payload:) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/fluffle/server.rb', line 135

# Decode one delivery, run its handler and publish the response.
#
# handler:    - Handler registered for the queue the delivery arrived on
# properties: - Message properties; `:reply_to` is used as the routing key of
#               the response
# payload:    - Raw `String` payload of the delivery
#
# A `StandardError` raised while decoding or handling is converted into an
# error response (built with `#build_error_response`) rather than re-raised.
# When `confirms` is truthy the publish is wrapped in
# `@confirmer.with_confirmation`. When `mandatory` is truthy the response is
# published with `mandatory: true`; without that flag the broker never returns
# unroutable messages, so the callback installed by `#handle_returns` could
# never fire.
def handle_request(handler:, properties:, payload:)
  reply_to = properties[:reply_to]

  id       = nil
  response = nil

  begin
    request = self.decode payload
    id      = request['id']

    response = self.call_handler handler: handler, request: request
  rescue => err
    response = {
      'jsonrpc' => '2.0',
      'id'      => id,
      'error'   => self.build_error_response(err)
    }
  end

  stack = Fluffle::MiddlewareStack.new

  if confirms
    stack.push ->(publish) {
      @confirmer.with_confirmation timeout: publish_timeout, &publish
    }
  end

  publish_opts = { routing_key: reply_to, correlation_id: response['id'] }
  # Only add the key when requested so the default publish call is unchanged.
  publish_opts[:mandatory] = true if mandatory

  stack.call do
    @exchange.publish Oj.dump(response), **publish_opts
  end
end

#handle_returns ⇒ Object



91
92
93
94
95
96
# File 'lib/fluffle/server.rb', line 91

# Register an `on_return` callback on the exchange that writes one error-level
# log line (routing key, reply code and reply text) per returned message.
#
# Returns whatever `@exchange.on_return` returns.
def handle_returns
  @exchange.on_return do |return_info, _properties, _payload|
    details = Kernel.format "Received return from exchange for routing key `%s' (%d %s)",
                            return_info.routing_key,
                            return_info.reply_code,
                            return_info.reply_text

    Fluffle.logger.error "[Fluffle::Server] #{details}"
  end
end

#start ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluffle/server.rb', line 46

# Subscribe to every registered queue and block until told to shut down.
#
# Freezes `@handlers`, opens a channel on `@connection`, sets prefetch to 1,
# optionally enables confirms and return logging, then subscribes one
# consumer per handler. Each delivery is posted to `@handler_pool`, processed
# by `#handle_request` and acknowledged afterwards whether or not processing
# raised. Finally hands control to `#wait_for_signal`.
#
# Raises `RuntimeError` ('No handlers defined') when no handler has been
# registered; this is checked before the connection is touched.
def start
  @handlers.freeze

  # Fail fast: without handlers there is nothing to serve, so don't open a
  # channel (or switch it to confirm mode) only to abandon it.
  raise 'No handlers defined' if @handlers.empty?

  @channel  = @connection.create_channel
  @exchange = @channel.default_exchange

  # Ensure we only receive 1 message at a time for each consumer
  @channel.prefetch 1

  if confirms
    @confirmer = Fluffle::Confirmer.new channel: @channel
    @confirmer.confirm_select
  end

  handle_returns if mandatory

  @handlers.each do |name, handler|
    qualified_name = Fluffle.request_queue_name name
    queue          = @channel.queue qualified_name

    consumer = queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
      @handler_pool.post do
        begin
          handle_request handler: handler,
                         properties: properties,
                         payload: payload
        rescue => err
          # Ensure we don't lose any errors on the handler pool's thread.
          # `Array()` guards against an exception that carries no backtrace.
          Fluffle.logger.error "[Fluffle::Server] #{err.class}: #{err.message}\n#{Array(err.backtrace).join("\n")}"
        ensure
          @channel.ack delivery_info.delivery_tag
        end
      end
    end

    @consumers << consumer
  end

  self.wait_for_signal
end

#validate_request(request) ⇒ Object

Raises if elements of the request payload do not comply with the spec

request - Decoded `Hash` of the payload (`String` keys)



217
218
219
220
# File 'lib/fluffle/server.rb', line 217

# Check that a decoded Request is a `Hash` that names a method.
#
# request - Decoded Request (expected: `Hash` with `String` keys)
#
# Raises `Errors::InvalidRequestError` when the request is not a `Hash` or
# has no truthy 'method' member.
#
# Returns nil when the request passes both checks.
def validate_request(request)
  unless request.is_a?(Hash)
    raise Errors::InvalidRequestError.new("Improperly formatted Request (expected `Hash', got `#{request.class}')")
  end

  return if request['method']

  raise Errors::InvalidRequestError.new("Missing `method' Request object member")
end

#wait_for_signal ⇒ Object

NOTE: Keeping this in its own method so its functionality can be more easily overridden by `Fluffle::Testing`.


100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fluffle/server.rb', line 100

# Block until INT or TERM arrives, then shut the server down.
#
# The trap handlers only write the signal's name to a pipe; the shutdown work
# happens here once the read end of that pipe becomes readable. Shutdown
# cancels every consumer, shuts the handler pool down (killing it if it has
# not terminated within `@shutdown_timeout`) and closes the channel.
#
# Returns nil.
def wait_for_signal
  signal_read, signal_write = IO.pipe

  %w[INT TERM].each do |name|
    Signal.trap(name) { signal_write.puts name }
  end

  # Adapted from Sidekiq:
  #   https://github.com/mperham/sidekiq/blob/e634177/lib/sidekiq/cli.rb#L94-L97
  ready = IO.select([signal_read])
  return unless ready

  received = ready.first.first.gets.strip

  Fluffle.logger.info "Received #{received}; shutting down..."

  # Stop new deliveries before draining the work already in flight
  @consumers.each(&:cancel)

  @handler_pool.shutdown
  # `wait_for_termination` returns false when the pool did not finish in
  # time, in which case it has to be killed
  finished = @handler_pool.wait_for_termination(@shutdown_timeout)
  @handler_pool.kill unless finished

  # Last of all, close the channel
  @channel.close

  nil
end