Class: Fluffle::Server

Inherits:
Object
  • Object
show all
Includes:
Connectable
Defined in:
lib/fluffle/server.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Connectable

#connect, #connected?, included

Constructor Details

#initialize(url: nil, connection: nil, concurrency: 1) ⇒ Server

url: - Optional URL to pass to Bunny.new to immediately connect concurrency: - Number of threads to handle messages on (default: 1)



9
10
11
12
13
14
15
16
17
# File 'lib/fluffle/server.rb', line 9

def initialize(url: nil, connection: nil, concurrency: 1)
  url_or_connection = url || connection
  self.connect(url_or_connection) if url_or_connection

  @handlers     = {}
  @handler_pool = Concurrent::FixedThreadPool.new concurrency

  self.class.default_server ||= self
end

Class Attribute Details

.default_serverObject

Returns the value of attribute default_server.



20
21
22
# File 'lib/fluffle/server.rb', line 20

def default_server
  @default_server
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



5
6
7
# File 'lib/fluffle/server.rb', line 5

def connection
  @connection
end

#handler_poolObject (readonly)

Returns the value of attribute handler_pool.



5
6
7
# File 'lib/fluffle/server.rb', line 5

def handler_pool
  @handler_pool
end

#handlersObject (readonly)

Returns the value of attribute handlers.



5
6
7
# File 'lib/fluffle/server.rb', line 5

def handlers
  @handlers
end

Instance Method Details

#call_handler(handler:, request:) ⇒ Object

handler - Instance of a Handler that may receive #call request - Hash representing a decoded Request



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/fluffle/server.rb', line 111

def call_handler(handler:, request:)
  t0 = Time.now

  begin
    id = request['id']

    self.validate_request request

    result = handler.call id: id,
                          method: request['method'],
                          params: request['params'],
                          meta: {}
  rescue => err
    log_error(err) if Fluffle.logger.error?

    error = self.build_error_response err
  end

  response = {
    'jsonrpc' => '2.0',
    'id'      => id,
    'meta'    => {
      'handler_duration' => (Time.now - t0)
    }
  }

  if error
    response['error'] = error
  else
    response['result'] = result
  end

  response
end

#decode(payload) ⇒ Object

Deserialize a JSON payload and extract its 3 members: id, method, params

payload - String of the payload from the queue

Returns a Hash from parsing the JSON payload (keys should be String)



151
152
153
# File 'lib/fluffle/server.rb', line 151

def decode(payload)
  Oj.load payload
end

#drain(queue: 'default', handler: nil, &block) ⇒ Object



23
24
25
26
27
28
29
30
31
# File 'lib/fluffle/server.rb', line 23

def drain(queue: 'default', handler: nil, &block)
  if handler && block
    raise ArgumentError, 'Cannot provide both handler: and block'
  end

  handler = Fluffle::Handlers::Dispatcher.new(&block) if block

  @handlers[queue.to_s] = handler
end

#handle_request(handler:, properties:, payload:) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/fluffle/server.rb', line 86

def handle_request(handler:, properties:, payload:)
  reply_to = properties[:reply_to]

  id       = nil
  response = nil

  begin
    request = self.decode payload
    id      = request['id']

    response = self.call_handler handler: handler, request: request
  rescue => err
    response = {
      'jsonrpc' => '2.0',
      'id'      => id,
      'error'   => self.build_error_response(err)
    }
  end

  @exchange.publish Oj.dump(response), routing_key: reply_to,
                                       correlation_id: response['id']
end

#startObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluffle/server.rb', line 33

def start
  @channel  = @connection.create_channel
  @exchange = @channel.default_exchange

  raise 'No handlers defined' if @handlers.empty?

  @handlers.each do |name, handler|
    qualified_name = Fluffle.request_queue_name name
    queue          = @channel.queue qualified_name

    queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
      @handler_pool.post do
        begin
          @channel.ack delivery_info.delivery_tag

          handle_request handler: handler,
                         properties: properties,
                         payload: payload
        rescue => err
          # Ensure we don't loose any errors on the handler pool's thread
          Fluffle.logger.error "[Fluffle::Server] #{err.class}: #{err.message}\n#{err.backtrace.join("\n")}"
        end
      end
    end
  end

  self.wait_for_signal
end

#validate_request(request) ⇒ Object

Raises if elements of the request payload do not comply with the spec

payload - Decoded Hash of the payload (String keys)



158
159
160
161
# File 'lib/fluffle/server.rb', line 158

def validate_request(request)
  raise Errors::InvalidRequestError.new("Improperly formatted Request (expected `Hash', got `#{request.class}')") unless request && request.is_a?(Hash)
  raise Errors::InvalidRequestError.new("Missing `method' Request object member") unless request['method']
end

#wait_for_signalObject

NOTE: Keeping this in its own method so its functionality can be more

easily overwritten by `Fluffle::Testing`.


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/fluffle/server.rb', line 64

def wait_for_signal
  signal_read, signal_write = IO.pipe

  %w[INT TERM].each do |signal|
    Signal.trap(signal) do
      signal_write.puts signal
    end
  end

  # Adapted from Sidekiq:
  #   https://github.com/mperham/sidekiq/blob/e634177/lib/sidekiq/cli.rb#L94-L97
  while io = IO.select([signal_read])
    readables = io.first
    signal    = readables.first.gets.strip

    Fluffle.logger.info "Received #{signal}; shutting down..."
    @channel.work_pool.shutdown

    return
  end
end