Module: Politics::TokenWorker
- Defined in:
- lib/politics/token_worker.rb
Overview
An algorithm to provide leader election between a set of identical processing daemons.
Each TokenWorker is an instance which needs to perform some processing. The worker instance must obtain the leader token before performing some task. We use a memcached server as a central token authority to provide a shared, network-wide view for all processors. This reliance on a single resource means if your memcached server goes down, so do the processors. Oftentimes, this is an acceptable trade-off since many high-traffic web sites would not be useable without memcached running anyhow.
Essentially each TokenWorker attempts to elect itself every :iteration_length seconds by simply setting a key in memcached to its own name. Memcached tracks which name got there first. The key expires after :iteration_length seconds.
Example usage:
class Analyzer
include Politics::TokenWorker
def initialize
register_worker 'analyzer', :iteration_length => 120, :servers => ['localhost:11211']
end
def start
process do
# do analysis here, will only be done when this process
# is actually elected leader, otherwise it will sleep for
# iteration_length seconds.
end
end
end
Notes:
-
This will not work with multiple instances in the same Ruby process. The library is only designed to elect a leader from a set of processes, not instances within a single process.
-
The algorithm makes no attempt to keep the same leader during the next iteration. This can often times be quite beneficial (e.g. leveraging a warm cache from the last iteration) for performance but is left to the reader to implement.
Class Method Summary collapse
-
.included(model) ⇒ Object
:nodoc:.
Instance Method Summary collapse
- #process(*args, &block) ⇒ Object
-
#register_worker(name, config = {}) ⇒ Object
Register this instance as a worker.
Class Method Details
.included(model) ⇒ Object
:nodoc:
50 51 52 53 54 55 56 57 |
# File 'lib/politics/token_worker.rb', line 50 def self.included(model) #:nodoc: model.class_eval do attr_accessor :memcache_client, :token, :iteration_length, :worker_name class << self attr_accessor :worker_instance #:nodoc: end end end |
Instance Method Details
#process(*args, &block) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/politics/token_worker.rb', line 81 def process(*args, &block) verify_registration begin # Try to add our name as the worker with the master token. # If another process got there first, this is a noop. # We add an expiry so that the master token will constantly # need to be refreshed (in case the current leader dies). time = 0 begin nominate if leader? Politics::log.info { "#{worker_name} elected leader at #{Time.now}" } # If we are the master worker, do the work. time = time_for do result = block.call(*args) end end rescue MemCache::MemCacheError => me Politics::log.error("Error from memcached, pausing until the next iteration...") Politics::log.error(me.) Politics::log.error(me.backtrace.join("\n")) self.memcache_client.reset end pause_until_expiry(time) reset_state end while loop? end |
#register_worker(name, config = {}) ⇒ Object
Register this instance as a worker.
Options:
:iteration_length-
The length of a processing iteration, in seconds. The leader’s ‘reign’ lasts for this length of time.
:servers-
An array of memcached server strings
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/politics/token_worker.rb', line 65 def register_worker(name, config={}) # track the latest instance of this class, there's really only supposed to be # a single TokenWorker instance per process. self.class.worker_instance = self = { :iteration_length => 60, :servers => ['localhost:11211'] } .merge!(config) self.token = "#{name}_token" self.memcache_client = client_for(Array([:servers])) self.iteration_length = [:iteration_length] self.worker_name = "#{Socket.gethostname}:#{$$}" cleanup end |