Class: Kestrel::Client::Transactional

Inherits:
Proxy
  • Object
Defined in:
lib/kestrel/client/transactional.rb

Defined Under Namespace

Classes: MultipleQueueException, RetryableJob

Constant Summary

DEFAULT_RETRIES = 100

Number of times to retry a job before giving up

ERROR_PROCESSING_RATE = 0.1

Fraction of the time during ‘normal’ processing that the error queue is checked first (0.1 means 10%)

MAX_PER_SERVER = 100_000

Maximum number of gets to execute before switching servers

Instance Attribute Summary

Attributes inherited from Proxy

#client

Instance Method Summary

Methods inherited from Proxy

#method_missing

Constructor Details

#initialize(client, max_retries = nil, error_rate = nil, per_server = nil) ⇒ Transactional

Parameters

client<Kestrel::Client>

The underlying client that this proxy wraps

max_retries<Integer>

Number of times to retry a job before giving up. Defaults to DEFAULT_RETRIES

error_rate<Float>

Fraction of the time during ‘normal’ processing that the error queue is checked first (for example, 0.1 means 10%). Defaults to ERROR_PROCESSING_RATE

per_server<Integer>

Number of gets to execute against a single server, before changing servers. Defaults to MAX_PER_SERVER



# File 'lib/kestrel/client/transactional.rb', line 34

def initialize(client, max_retries = nil, error_rate = nil, per_server = nil)
  @max_retries = max_retries || DEFAULT_RETRIES
  @error_rate  = error_rate || ERROR_PROCESSING_RATE
  @per_server  = per_server || MAX_PER_SERVER
  @counter     = 0 # Command counter
  super(client)
end
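
The defaulting behaviour of the constructor can be sketched in isolation. This is a minimal sketch, not the library's class: DefaultsSketch is a hypothetical stand-in that copies only the three constants and the `||` fallbacks shown above, and it omits the client argument and the call to super.

```ruby
# Hypothetical stand-in that mirrors only the defaulting logic of
# Transactional#initialize; it does not wrap a real Kestrel client.
class DefaultsSketch
  DEFAULT_RETRIES       = 100
  ERROR_PROCESSING_RATE = 0.1
  MAX_PER_SERVER        = 100_000

  attr_reader :max_retries, :error_rate, :per_server

  def initialize(max_retries = nil, error_rate = nil, per_server = nil)
    # nil (or false) falls back to the constant; any other value is kept.
    @max_retries = max_retries || DEFAULT_RETRIES
    @error_rate  = error_rate  || ERROR_PROCESSING_RATE
    @per_server  = per_server  || MAX_PER_SERVER
  end
end

defaults = DefaultsSketch.new           # every setting takes its default
custom   = DefaultsSketch.new(5, 0.0)   # per_server still takes its default
```

Because 0 and 0.0 are truthy in Ruby, an explicit error_rate of 0.0 is kept rather than replaced by the default, so the error queue would then only be reached through the fallback path.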

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Kestrel::Client::Proxy

Instance Attribute Details

#current_queue ⇒ Object (readonly)

Returns the value of attribute current_queue.



# File 'lib/kestrel/client/transactional.rb', line 42

def current_queue
  @current_queue
end

Instance Method Details

#current_try ⇒ Object



# File 'lib/kestrel/client/transactional.rb', line 69

def current_try
  @job ? @job.retries + 1 : 1
end

#get(key, opts = {}) ⇒ Object

Returns a job from the key queue roughly (1 - error_rate) of the time, where error_rate defaults to ERROR_PROCESSING_RATE. The rest of the time, checks the error queue first and returns a retryable job. If the queue tried first is empty, attempts to pull a job from the other queue before giving up. Raises MultipleQueueException if key differs from the queue of the job currently being processed.

Returns

Job, possibly retryable, or nil



# File 'lib/kestrel/client/transactional.rb', line 53

def get(key, opts = {})
  raise MultipleQueueException if current_queue && key != current_queue

  close_transaction(current_try == 1 ? key : "#{key}_errors")

  q1, q2 = (rand < @error_rate) ? [key + "_errors", key] : [key, key + "_errors"]

  if job = get_with_fallback(q1, q2, opts.merge(:close => true, :open => true))
    @current_queue = key
    @job = job.is_a?(RetryableJob) ? job : RetryableJob.new(0, job)
    @job.job
  else
    @current_queue = @job = nil
  end
end
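
The choice of which queue to try first can be sketched on its own. This is a minimal sketch: queue_order is a hypothetical helper, not part of the library, and it takes the random roll as an argument so the two outcomes can be shown deterministically.

```ruby
# Hypothetical helper mirroring the q1, q2 selection in #get.
# roll defaults to Kernel#rand, which returns a Float in [0, 1).
def queue_order(key, error_rate, roll = rand)
  if roll < error_rate
    ["#{key}_errors", key]   # error queue first, main queue as fallback
  else
    [key, "#{key}_errors"]   # main queue first, error queue as fallback
  end
end

errors_first = queue_order("jobs", 0.1, 0.05)
main_first   = queue_order("jobs", 0.1, 0.5)
```

Either way both queues are candidates; the roll only decides the order in which they are tried.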

#retry ⇒ Object

Enqueues the current job on the error queue for later retry. Once the job has been retried max_retries times (DEFAULT_RETRIES unless overridden in the constructor), gives up entirely.

Returns

Boolean

true if the job was re-enqueued for retry, false if it has used up its retries. Returns nil when there is no current job.



# File 'lib/kestrel/client/transactional.rb', line 80

def retry
  return unless @job

  @job.retries += 1

  if should_retry = @job.retries < @max_retries
    client.set(current_queue + "_errors", @job)
  end

  # close the transaction on the original queue if this is the first retry
  close_transaction(@job.retries == 1 ? current_queue : "#{current_queue}_errors")

  should_retry
end
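
The retry counting can be sketched without a Kestrel server. This is a minimal sketch under stated assumptions: the Struct below only imitates the RetryableJob interface visible in the source (retries and job), InMemoryQueues is a hypothetical replacement for the client's set, and retry_job is a hypothetical helper that leaves out transaction closing.

```ruby
# Assumed shape: the source only shows RetryableJob.new(retries, job),
# #retries, #retries= and #job, so a Struct stands in for it here.
RetryableJob = Struct.new(:retries, :job)

# Hypothetical in-memory replacement for the client's #set.
class InMemoryQueues
  attr_reader :queues

  def initialize
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  def set(name, item)
    @queues[name] << item
  end
end

# Mirrors the counting logic of #retry; closing the transaction is omitted.
def retry_job(client, queue, job, max_retries)
  job.retries += 1
  should_retry = job.retries < max_retries
  client.set("#{queue}_errors", job) if should_retry
  should_retry
end

client = InMemoryQueues.new
job    = RetryableJob.new(0, "payload")

first  = retry_job(client, "jobs", job, 2) # retries becomes 1, re-enqueued
second = retry_job(client, "jobs", job, 2) # retries becomes 2, dropped
```

Since the counter is incremented before the strict comparison, a limit of N re-enqueues a job at most N - 1 times.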