Module: Politics::TokenWorker

Defined in:
lib/politics/token_worker.rb

Overview

An algorithm to provide leader election between a set of identical processing daemons.

Each TokenWorker is an instance that needs to perform some processing. A worker instance must obtain the leader token before performing its task. We use a memcached server as a central token authority to provide a shared, network-wide view for all processors. This reliance on a single resource means that if your memcached server goes down, so do the processors. This is often an acceptable trade-off, since many high-traffic web sites would not be usable without memcached running anyway.

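Essentially, each TokenWorker attempts to elect itself every :iteration_length seconds by adding a key in memcached with its own name as the value. If another worker got there first, the attempt is a no-op, so memcached effectively tracks which name arrived first. The key expires after :iteration_length seconds, at which point a new election takes place.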
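
The election step can be illustrated without a running memcached server. The following is a minimal sketch, not the library's implementation: FakeTokenStore is a hypothetical in-memory stand-in whose add method assumes memcached-style "add only if absent" semantics with an expiry, and the worker names and timestamps are made up for the example.

```ruby
# Hypothetical in-memory stand-in for memcached, for illustration only.
class FakeTokenStore
  def initialize
    @entries = {}
  end

  # Like memcached "add": store the value only if the key is absent or expired.
  def add(key, value, ttl, now)
    entry = @entries[key]
    return false if entry && entry[:expires_at] > now

    @entries[key] = { value: value, expires_at: now + ttl }
    true
  end

  # Return the live value for the key, or nil once it has expired.
  def get(key, now)
    entry = @entries[key]
    entry && entry[:expires_at] > now ? entry[:value] : nil
  end
end

store = FakeTokenStore.new
token = 'analyzer_token'
iteration_length = 120

# Two workers nominate themselves one second apart; only the first add succeeds.
won_a = store.add(token, 'host-a:101', iteration_length, 0)
won_b = store.add(token, 'host-b:202', iteration_length, 1)
leader_first = store.get(token, 1)

# Once the token has expired, the next worker to nominate itself takes over.
retry_b = store.add(token, 'host-b:202', iteration_length, 121)
leader_second = store.get(token, 121)
```

Passing the current time in explicitly keeps the sketch deterministic; a real worker would rely on memcached's own expiry instead.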

Example usage:

class Analyzer
  include Politics::TokenWorker

  def initialize
    register_worker 'analyzer', :iteration_length => 120, :servers => ['localhost:11211']
  end

  def start
    process do
      # do analysis here, will only be done when this process
      # is actually elected leader, otherwise it will sleep for
      # iteration_length seconds.
    end
  end
end

Notes:

  • This will not work with multiple instances in the same Ruby process. The library is only designed to elect a leader from a set of processes, not instances within a single process.

  • The algorithm makes no attempt to keep the same leader from one iteration to the next. Keeping the same leader can often benefit performance (e.g. by leveraging a warm cache from the last iteration), but this is left to the reader to implement.
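
One possible way to keep the same leader across iterations is to let the current token holder extend its own token before it expires. The sketch below is an assumption about how that could be approached, not something the library provides: StickyTokenStore and its renew method are hypothetical names, and the store is an in-memory stand-in. Against a real memcached server, the renewal would need an atomic check-and-set to avoid races.

```ruby
# Hypothetical in-memory token store; not part of Politics.
class StickyTokenStore
  def initialize
    @value = nil
    @expires_at = 0
  end

  # Election step: claim the token only if nobody currently holds it.
  def add(name, ttl, now)
    return false if @value && @expires_at > now

    @value, @expires_at = name, now + ttl
    true
  end

  # Only the current, unexpired holder may extend its own reign.
  def renew(name, ttl, now)
    return false unless @value == name && @expires_at > now

    @expires_at = now + ttl
    true
  end

  # The current leader's name, or nil if the token has expired.
  def leader(now)
    @expires_at > now ? @value : nil
  end
end

store = StickyTokenStore.new
ttl = 120

elected = store.add('host-a:101', ttl, 0)
renewed = store.renew('host-a:101', ttl, 110)  # leader extends before expiry
blocked = store.add('host-b:202', ttl, 121)    # challenger is still locked out
still_leader = store.leader(121)

# If the leader dies and stops renewing, the token eventually expires.
takeover = store.add('host-b:202', ttl, 231)
new_leader = store.leader(231)
```

The trade-off is that a sticky leader keeps its warm cache, but a stalled leader that still manages to renew would block the other workers.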

Class Method Details

.included(model) ⇒ Object

:nodoc:



# File 'lib/politics/token_worker.rb', line 50

def self.included(model) #:nodoc:
  model.class_eval do
    attr_accessor :memcache_client, :token, :iteration_length, :worker_name
    class << self
      attr_accessor :worker_instance #:nodoc:
    end
  end
end

Instance Method Details

#process(*args, &block) ⇒ Object



# File 'lib/politics/token_worker.rb', line 81

def process(*args, &block)
  verify_registration

  begin
    # Try to add our name as the worker with the master token.
    # If another process got there first, this is a noop.
    # We add an expiry so that the master token will constantly
    # need to be refreshed (in case the current leader dies).
    time = 0
    begin
      nominate

      if leader?
        Politics::log.info { "#{worker_name} elected leader at #{Time.now}" }
        # If we are the master worker, do the work.
        time = time_for do
          result = block.call(*args)
        end
      end
    rescue MemCache::MemCacheError => me
      Politics::log.error("Error from memcached, pausing until the next iteration...")
      Politics::log.error(me.message)
      Politics::log.error(me.backtrace.join("\n"))
      self.memcache_client.reset
    end
    
    pause_until_expiry(time)
    reset_state
  end while loop?
end
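
The time_for and pause_until_expiry helpers called above are not shown on this page. As a rough sketch of one plausible reading, where the time spent working is measured and then subtracted from the iteration length before sleeping, the hypothetical helpers elapsed_seconds and remaining_pause below illustrate the arithmetic. The names and the subtraction itself are assumptions, not the library's code.

```ruby
# Hypothetical helpers; the real time_for and pause_until_expiry are not shown here.

# Run the block and return how long it took, in seconds.
def elapsed_seconds
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

# Seconds left in the iteration after the work finished (never negative).
def remaining_pause(iteration_length, time_spent)
  [iteration_length - time_spent, 0].max
end

# Time a small piece of work, then compute how long a worker would still pause.
spent = elapsed_seconds { (1..1000).reduce(:+) }
pause = remaining_pause(120, spent)
```

A worker that was not elected would pass a time of 0 and therefore pause for the whole iteration, which matches the behaviour described in the usage example.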

#register_worker(name, config = {}) ⇒ Object

Register this instance as a worker.

Options:

:iteration_length

The length of a processing iteration, in seconds. The leader’s ‘reign’ lasts for this length of time. Defaults to 60.

:servers

An array of memcached server strings in host:port form. Defaults to ['localhost:11211'].



# File 'lib/politics/token_worker.rb', line 65

def register_worker(name, config={})
  # track the latest instance of this class, there's really only supposed to be
  # a single TokenWorker instance per process.
  self.class.worker_instance = self

  options = { :iteration_length => 60, :servers => ['localhost:11211'] }
  options.merge!(config)

  self.token = "#{name}_token"
  self.memcache_client = client_for(Array(options[:servers]))
  self.iteration_length = options[:iteration_length]
  self.worker_name = "#{Socket.gethostname}:#{$$}"

  cleanup
end