Class: Twirl::Cluster

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable
Defined in:
lib/twirl/cluster.rb

Constant Summary collapse

RetryableErrors =

Private: The default Array of errors to retry.

[
  KJess::NetworkError,
  # this inherits from protocol error, but seems like it should be retried
  KJess::ServerError,
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(clients, options = {}) ⇒ Cluster

Public: Initialize a new cluster.

clients - An array of KJess::Client instances with port (localhost:1234) options - A Hash of options.

:commands_per_client - The Number of commands to run per client
                       before rotating to the next client (default: 100)
:retries - The Number of times a command should be retried (default: 5).
:instrumenter - Where to send instrumention (defaults: noop).


50
51
52
53
54
55
56
57
58
# File 'lib/twirl/cluster.rb', line 50

def initialize(clients, options = {})
  @client_index = 0
  @command_count = 0
  @clients = clients.shuffle
  @retries = options.fetch(:retries, 5)
  @commands_per_client = options.fetch(:commands_per_client, 100)
  @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop)
  @retryable_errors = options.fetch(:retryable_errors, RetryableErrors)
end

Instance Attribute Details

#client_indexObject (readonly)

Private: What is the array index of the client being used currently.



19
20
21
# File 'lib/twirl/cluster.rb', line 19

def client_index
  @client_index
end

#command_countObject (readonly)

Private: The number of commands issued to the current client.



22
23
24
# File 'lib/twirl/cluster.rb', line 22

def command_count
  @command_count
end

#commands_per_clientObject (readonly)

Private: The number of commands to issue to a client before rotating.



28
29
30
# File 'lib/twirl/cluster.rb', line 28

def commands_per_client
  @commands_per_client
end

#instrumenterObject (readonly)

Private: What should be used to instrument all the things.



31
32
33
# File 'lib/twirl/cluster.rb', line 31

def instrumenter
  @instrumenter
end

#retriesObject (readonly)

Private: The number of times to retry retryable errors.



25
26
27
# File 'lib/twirl/cluster.rb', line 25

def retries
  @retries
end

#retryable_errorsObject (readonly)

Private: What errors should be considered retryable.



34
35
36
# File 'lib/twirl/cluster.rb', line 34

def retryable_errors
  @retryable_errors
end

Instance Method Details

#clientObject

Private: Returns the client to be used to issue a command.



208
209
210
211
212
213
# File 'lib/twirl/cluster.rb', line 208

def client
  rotate if @command_count >= @commands_per_client

  @command_count += 1
  @clients[@client_index]
end

#client_read_op(client, op, queue_name, *args) ⇒ Object

Private: Perform an operation for a given client. Rotates clients if nil item is result of op.

Returns a Twirl::Item if an item was found, otherwise nil.



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/twirl/cluster.rb', line 238

def client_read_op(client, op, queue_name, *args)
  with_retries { |tries|
    @instrumenter.instrument("op.twirl") { |payload|
      payload[:op] = op
      payload[:queue_name] = queue_name
      payload[:retry] = tries != @retries

      if value = client.send(op, queue_name, *args)
        payload[:bytes] = value.size
        Item.new queue_name, value, client, @instrumenter
      else
        rotate_for_next_op
        nil
      end
    }
  }
end

#delete(queue_name) ⇒ Object

Public : Remove a queue.

queue_name - The String name of the queue.

Returns a Hash of hosts and results.



132
133
134
# File 'lib/twirl/cluster.rb', line 132

def delete(queue_name)
  multi_client_queue_op_with_result :delete, queue_name
end

#disconnectObject

Public: Disconnect from each client’s server.

Returns nothing.



196
197
198
# File 'lib/twirl/cluster.rb', line 196

def disconnect
  multi_client_op :disconnect
end

#each(&block) ⇒ Object

Public: Iterate through the clients.



61
62
63
# File 'lib/twirl/cluster.rb', line 61

def each(&block)
  @clients.each { |client| yield client }
end

#flush(queue_name) ⇒ Object

Public: Remove all items from a queue.

queue_name - The String name of the queue.

Returns a Hash of hosts and results.



141
142
143
# File 'lib/twirl/cluster.rb', line 141

def flush(queue_name)
  multi_client_queue_op_with_result :flush, queue_name
end

#flush_allObject

Public: Remove all items from all queues.

Returns a Hash of hosts and results.



148
149
150
# File 'lib/twirl/cluster.rb', line 148

def flush_all
  multi_client_op_with_result :flush_all
end

#get(queue_name, options = {}) ⇒ Object

Public: Retrieve an item from the given queue.

It is possible to send both :open and :close in the same get operation, but I would not recommend it. You will end up in a situation where the client will rotate and the :close then goes to the wrong client.

We could do two get operations if you pass both options, send the :close to the current client and send the :open as a second operation to the rotated client, but that seems sneaky.

queue_name - The String name of the queue. options - The Hash of options for retrieving an item.

See KJess::Client#get for all options.

Returns a Twirl::Item if an item was found, otherwise nil.



100
101
102
# File 'lib/twirl/cluster.rb', line 100

def get(queue_name, options = {})
  client_read_op client, :get, queue_name, options
end

#multi_client_op(op, *args, &block) ⇒ Object

Private: Perform an op on all the clients.



257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/twirl/cluster.rb', line 257

def multi_client_op(op, *args, &block)
  @instrumenter.instrument("op.twirl") { |payload|
    payload[:op] = op

    @clients.each do |client|
      if block_given?
        yield client
      else
        client.send(op, *args)
      end
    end
  }
end

#multi_client_op_with_result(op, *args, &block) ⇒ Object

Private: Perform an op on all clients.

Returns a Hash of the servers as keys and the results as values.



291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/twirl/cluster.rb', line 291

def multi_client_op_with_result(op, *args, &block)
  @instrumenter.instrument("op.twirl") { |payload|
    payload[:op] = op

    result = {}
    @clients.each { |client|
      result["#{client.host}:#{client.port}"] = if block_given?
        yield client
      else
        client.send(op, *args)
      end
    }
    result
  }
end

#multi_client_queue_op_with_result(op, queue_name, *args, &block) ⇒ Object



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/twirl/cluster.rb', line 271

def multi_client_queue_op_with_result(op, queue_name, *args, &block)
  @instrumenter.instrument("op.twirl") { |payload|
    payload[:op] = op
    payload[:queue_name] = queue_name

    result = {}
    @clients.each { |client|
      result["#{client.host}:#{client.port}"] = if block_given?
        yield client
      else
        client.send(op, queue_name, *args)
      end
    }
    result
  }
end

#peek(queue_name) ⇒ Object

Public: Peek at the top item in the queue.

queue_name - The String name of the queue.

Returns a Twirl::Item if an item was found, otherwise nil.



123
124
125
# File 'lib/twirl/cluster.rb', line 123

def peek(queue_name)
  client_read_op client, :peek, queue_name
end

#pingObject

Public: Which clients can actually reach their server.

Returns Hash of hosts and results.



168
169
170
# File 'lib/twirl/cluster.rb', line 168

def ping
  multi_client_op_with_result :ping
end

#quitObject

Public: Disconnect from each client’s server.

Returns Hash of hosts and results.



189
190
191
# File 'lib/twirl/cluster.rb', line 189

def quit
  multi_client_op_with_result :quit
end

#reloadObject

Public: Reload the config of each client’s server.

Returns Hash of hosts and results.



175
176
177
# File 'lib/twirl/cluster.rb', line 175

def reload
  multi_client_op_with_result :reload
end

#reserve(queue_name, options = {}) ⇒ Object

Public: Reserve the next item on the queue.

This is a helper method to get an item from a queue and open it for reliable read.

queue_name - The String name of the queue. options - Additional options.

See KJess::Client#get for all options.

Returns a Twirl::Item if an item was found, otherwise nil.



114
115
116
# File 'lib/twirl/cluster.rb', line 114

def reserve(queue_name, options = {})
  client_read_op client, :reserve, queue_name, options
end

#rotateObject

Private: Ensures that clients will be rotated by changing the client index and resetting the command count.



217
218
219
220
221
222
223
224
225
226
227
# File 'lib/twirl/cluster.rb', line 217

def rotate
  @instrumenter.instrument "op.twirl", {
    op: :rotate,
    metric_type: :counter,
    command_count: @command_count,
    commands_per_client: @commands_per_client,
  }

  @command_count = 0
  @client_index = (@client_index + 1) % @clients.size
end

#rotate_for_next_opObject

Private: Makes it so the client will rotate for the next operation.



230
231
232
# File 'lib/twirl/cluster.rb', line 230

def rotate_for_next_op
  @command_count = @commands_per_client
end

#set(queue_name, item, expiration = 0) ⇒ Object

Public: Add an item to the given queue.

queue_name - The String name of the queue. item - The String item to add to the queue. expiration - The Number of seconds from now to expire the item (default: 0).

Returns true if successful, false otherwise.



72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/twirl/cluster.rb', line 72

def set(queue_name, item, expiration = 0)
  with_retries { |tries|
    @instrumenter.instrument("op.twirl") { |payload|
      payload[:op] = :set
      payload[:bytes] = item.size
      payload[:queue_name] = queue_name
      payload[:retry] = tries != @retries

      client.set(queue_name, item, expiration)
    }
  }
end

#shutdownObject

Public: Tells each client to shutdown their server.

Returns nothing.



203
204
205
# File 'lib/twirl/cluster.rb', line 203

def shutdown
  multi_client_op :shutdown
end

#statsObject

Public: Return stats for each client’s server.

Returns a Hash of stats for each host.



182
183
184
# File 'lib/twirl/cluster.rb', line 182

def stats
  multi_client_op_with_result :stats
end

#versionObject

Public: Return the version of each server.

Returns a Hash of hosts and results.



155
156
157
158
159
160
161
162
163
# File 'lib/twirl/cluster.rb', line 155

def version
  multi_client_op_with_result :version do |client|
    begin
      client.version
    rescue KJess::ProtocolError
      "unavailable"
    end
  end
end

#with_retriesObject



307
308
309
310
311
312
313
314
315
# File 'lib/twirl/cluster.rb', line 307

def with_retries
  tries = @retries
  begin
    yield tries
  rescue *@retryable_errors
    tries -= 1
    tries > 0 ? retry : raise
  end
end