Class: Fluffle::Server

Inherits:
Object
  • Object
show all
Includes:
Connectable
Defined in:
lib/fluffle/server.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Connectable

#connect, #connected?, included

Constructor Details

#initialize(url: nil, connection: nil, concurrency: 1) ⇒ Server

url: - Optional URL to pass to Bunny.new to immediately connect
concurrency: - Number of threads to handle messages on (default: 1)



9
10
11
12
13
14
15
16
17
# File 'lib/fluffle/server.rb', line 9

# Builds a server and, when a url or connection is supplied, connects at once.
#
# url:         - Optional value handed to #connect; wins over connection:
#                when both are given
# connection:  - Optional value handed to #connect when url: is not given
# concurrency: - Size given to Concurrent::FixedThreadPool.new for the pool
#                that runs handlers (default: 1)
def initialize(url: nil, connection: nil, concurrency: 1)
  connect_target = url || connection
  connect(connect_target) if connect_target

  @handler_pool = Concurrent::FixedThreadPool.new(concurrency)
  @handlers     = {}

  # Only claim the class-level default slot when nothing occupies it yet.
  self.class.default_server = self unless self.class.default_server
end

Class Attribute Details

.default_serverObject

Returns the value of attribute default_server.



20
21
22
# File 'lib/fluffle/server.rb', line 20

# Reader for the class-level default_server attribute. #initialize assigns a
# newly built Server to it only when it is not already set.
def default_server
  @default_server
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



5
6
7
# File 'lib/fluffle/server.rb', line 5

# Returns the value held in @connection, which #start uses to create its
# channel. NOTE(review): presumably assigned by Connectable#connect - confirm.
def connection
  @connection
end

#handler_poolObject (readonly)

Returns the value of attribute handler_pool.



5
6
7
# File 'lib/fluffle/server.rb', line 5

# Returns the Concurrent::FixedThreadPool built in #initialize; #start posts
# each received message to it so that handlers run on the pool.
def handler_pool
  @handler_pool
end

#handlersObject (readonly)

Returns the value of attribute handlers.



5
6
7
# File 'lib/fluffle/server.rb', line 5

# Returns the Hash of registered handlers, keyed by queue name as a String
# (entries are added by #drain).
def handlers
  @handlers
end

Instance Method Details

#call_handler(handler:, request:) ⇒ Object

handler - Instance of a Handler that may receive #call
request - Hash representing a decoded Request



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/fluffle/server.rb', line 118

# Runs one decoded request through a handler and wraps the outcome in a
# JSON-RPC 2.0 response Hash.
#
# handler: - Object responding to #call with id:, method:, params: and meta:
# request: - Decoded request (expected to be a Hash with String keys)
#
# Returns a Hash with 'jsonrpc' and 'id', plus 'error' when an error response
# was built, otherwise 'result' with the handler's return value.
def call_handler(handler:, request:)
  id     = nil
  result = nil
  error  = nil

  begin
    # The request has not been validated yet, so reading the ID must not be
    # allowed to blow up; fall back to nil if it cannot be read.
    id = begin
      request['id']
    rescue
      nil
    end

    validate_request(request)

    result = handler.call(
      id: id,
      method: request['method'],
      params: request['params'],
      meta: {}
    )
  rescue => err
    log_error(err) if Fluffle.logger.error?

    error = build_error_response(err)
  end

  if error
    { 'jsonrpc' => '2.0', 'id' => id, 'error' => error }
  else
    { 'jsonrpc' => '2.0', 'id' => id, 'result' => result }
  end
end

#decode(payload) ⇒ Object

Deserialize a JSON payload and extract its 3 members: id, method, params

payload - String of the payload from the queue

Returns a Hash from parsing the JSON payload (keys should be String)



152
153
154
# File 'lib/fluffle/server.rb', line 152

# Parses a raw payload by handing it to Oj.load.
#
# payload - String of the payload from the queue
#
# Returns whatever Oj.load produces for the payload.
#
# NOTE(review): Oj.load is called with Oj's default options. If the default
# mode in use permits object deserialization, payloads coming off the queue
# should be parsed in a strict mode instead - confirm the configured mode.
def decode(payload)
  Oj.load(payload)
end

#drain(queue: 'default', handler: nil, &block) ⇒ Object



23
24
25
26
27
28
29
30
31
# File 'lib/fluffle/server.rb', line 23

# Registers the handler that will serve requests for a queue.
#
# queue:   - Name of the queue; stored under its String form
#            (default: 'default')
# handler: - Object to register as the handler (mutually exclusive with a
#            block)
# block    - Passed to Fluffle::Handlers::Dispatcher.new to build the
#            handler when no handler: is given
#
# Raises ArgumentError if both handler: and a block are given, or if neither
# is given.
#
# Returns the registered handler.
def drain(queue: 'default', handler: nil, &block)
  if handler && block
    raise ArgumentError, 'Cannot provide both handler: and block'
  end

  handler = Fluffle::Handlers::Dispatcher.new(&block) if block

  # Registering nil would be accepted here but then fail on every request
  # when the handler is called, so reject it at registration time.
  raise ArgumentError, 'Must provide either handler: or block' unless handler

  @handlers[queue.to_s] = handler
end

#handle_request(handler:, properties:, payload:) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluffle/server.rb', line 79

# Decodes a payload, runs every request in it through the handler and
# publishes one response message per request on @exchange.
#
# handler:    - Handler passed on to #call_handler for each request
# properties: - Message properties; :reply_to is read as the routing key for
#               the responses
# payload:    - Raw payload passed to #decode
#
# If decoding or dispatching raises, a single error response with a nil id is
# appended after any responses that were already collected.
#
# Returns the Array of response Hashes that were published.
def handle_request(handler:, properties:, payload:)
  reply_to  = properties[:reply_to]
  responses = []

  begin
    decoded = decode(payload)

    batch =
      case decoded
      when Hash  then [decoded] # Single request
      when Array then decoded   # Batch request
      else
        raise Errors::InvalidRequestError.new('Payload was neither an Array nor an Object')
      end

    # Append one at a time so responses gathered before a failure are kept.
    batch.each do |request|
      responses << call_handler(handler: handler, request: request)
    end
  rescue => err
    responses << {
      'jsonrpc' => '2.0',
      'id'      => nil,
      'error'   => build_error_response(err)
    }
  end

  responses.each do |outgoing|
    @exchange.publish(
      Oj.dump(outgoing),
      routing_key: reply_to,
      correlation_id: outgoing['id']
    )
  end
end

#startObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluffle/server.rb', line 33

# Subscribes every registered handler to its request queue, then blocks in
# #wait_for_signal.
#
# A channel is created on @connection and its default exchange is kept in
# @exchange for publishing responses. Each received message is posted to
# @handler_pool, where #handle_request processes it.
#
# Raises RuntimeError ('No handlers defined') if no handler has been
# registered.
def start
  # Check first so a server without handlers fails before a channel is
  # opened that would never be used.
  raise 'No handlers defined' if @handlers.empty?

  @channel  = @connection.create_channel
  @exchange = @channel.default_exchange

  @handlers.each do |name, handler|
    qualified_name = Fluffle.request_queue_name name
    queue          = @channel.queue qualified_name

    queue.subscribe do |_delivery_info, properties, payload|
      @handler_pool.post do
        self.handle_request handler: handler,
                            properties: properties,
                            payload: payload
      end
    end
  end

  self.wait_for_signal
end

#validate_request(request) ⇒ Object

Raises if elements of the request payload do not comply with the spec

request - Decoded Hash of the payload (String keys)

Raises:



159
160
161
162
# File 'lib/fluffle/server.rb', line 159

# Checks that a decoded request has the shape required to dispatch it.
#
# request - Decoded request; must be a Hash with a truthy 'method' member
#
# Raises Errors::InvalidRequestError if the request is not a Hash or has no
# 'method' member.
#
# Returns nil when the request is acceptable.
def validate_request(request)
  unless request.is_a?(Hash)
    raise Errors::InvalidRequestError.new("Improperly formatted Request (expected `Hash', got `#{request.class}')")
  end

  return if request['method']

  raise Errors::InvalidRequestError.new("Missing `method' Request object member")
end

#wait_for_signalObject

NOTE: Keeping this in its own method so its functionality can be more easily overwritten by `Fluffle::Testing`.


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluffle/server.rb', line 57

# Blocks the calling thread until an INT or TERM signal arrives, then shuts
# down the channel's work pool.
#
# The signal handlers only write the signal name into a pipe; the waiting
# side reads it back from the pipe outside of the trap context.
#
# Returns nil.
def wait_for_signal
  reader, writer = IO.pipe

  %w[INT TERM].each do |name|
    Signal.trap(name) { writer.puts name }
  end

  # Adapted from Sidekiq:
  #   https://github.com/mperham/sidekiq/blob/e634177/lib/sidekiq/cli.rb#L94-L97
  while (ready = IO.select([reader]))
    received = ready.first.first.gets.strip

    Fluffle.logger.info "Received #{received}; shutting down..."
    @channel.work_pool.shutdown

    # The first signal received ends the wait.
    return
  end
end