Module: Politics::StaticQueueWorker
- Defined in:
- lib/politics/static_queue_worker.rb
Overview
The StaticQueueWorker mixin allows a processing daemon to “lease” or checkout a portion of a problem space to ensure no other process is processing that same space at the same time. The processing space is cut into N “buckets”, each of which is placed in a queue. Processes then fetch entries from the queue and process them. It is up to the application to map the bucket number onto its specific problem space.
Note that memcached is used for leader election. The leader owns the queue during the iteration period and other peers fetch buckets from the current leader during the iteration.
The leader hands out buckets in order. Once all the buckets have been processed, the leader returns nil to the processors which causes them to sleep until the end of the iteration. Then everyone wakes up, a new leader is elected, and the processing starts all over again.
DRb and mDNS are used for peer discovery and communication.
Example usage:
class Analyzer
include Politics::StaticQueueWorker
TOTAL_BUCKETS = 16
def start
register_worker(self.class.name, TOTAL_BUCKETS)
process_bucket do |bucket|
puts "Analyzing bucket #{bucket} of #{TOTAL_BUCKETS}"
sleep 5
end
end
end
Note: process_bucket never returns i.e. this should be the main loop of your processing daemon.
Class Method Summary collapse
-
.included(model) ⇒ Object
:nodoc:.
Instance Method Summary collapse
- #bucket_request(requestor_uri) ⇒ Object
- #next_bucket(requestor_uri) ⇒ Object
-
#process_bucket(&block) ⇒ Object
Fetch a bucket out of the queue and pass it to the given block to be processed.
-
#register_worker(name, bucket_count, config = {}) ⇒ Object
Register this process as able to work on buckets.
- #until_next_iteration ⇒ Object
Class Method Details
.included(model) ⇒ Object
:nodoc:
61 62 63 64 65 |
# File 'lib/politics/static_queue_worker.rb', line 61 def self.included(model) #:nodoc: model.class_eval do attr_accessor :group_name, :iteration_length, :uri end end |
Instance Method Details
#bucket_request(requestor_uri) ⇒ Object
133 134 135 136 137 138 139 140 141 |
# File 'lib/politics/static_queue_worker.rb', line 133 def bucket_request(requestor_uri) if leader? log.debug "delivering bucket request" next_bucket requestor_uri else log.debug "received request for bucket but am not leader - delivering :not_leader" [:not_leader, 0] end end |
#next_bucket(requestor_uri) ⇒ Object
143 144 145 |
# File 'lib/politics/static_queue_worker.rb', line 143 def next_bucket(requestor_uri) [@buckets.pop, until_next_iteration] end |
#process_bucket(&block) ⇒ Object
Fetch a bucket out of the queue and pass it to the given block to be processed.
bucket-
The bucket number to process, within the range 0…TOTAL_BUCKETS
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/politics/static_queue_worker.rb', line 93 def process_bucket(&block) log.debug "start bucket processing" raise ArgumentError, "process_bucket requires a block!" unless block_given? raise ArgumentError, "You must call register_worker before processing!" unless @memcache_client begin begin nominate if leader? # Drb thread handles leader duties log.info { "has been elected leader" } initialize_buckets # keeping leader state as long as buckets are available by renominating before # nomination times out while !@buckets.empty? do log.debug { "relaxes half the time until next iteration" } relax(until_next_iteration / 2) log.debug { "renew nomination too keep the hat and finish the work" } @memcache_client.set(token, @uri, iteration_length) @nominated_at = Time.now log.error { "tried to staying leader but failed" } unless leader? end relax until_next_iteration else # Get a bucket from the leader and process it begin log.debug "getting bucket request from leader (#{leader_uri}) and processing it" bucket_process(*leader.bucket_request(uri), &block) rescue DRb::DRbError => dre log.error { "Error talking to leader: #{dre.message}" } relax until_next_iteration end end rescue MemCache::MemCacheError => e log.error { "Unexpected MemCacheError: #{e.message}" } relax until_next_iteration end end while loop? end |
#register_worker(name, bucket_count, config = {}) ⇒ Object
Register this process as able to work on buckets.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/politics/static_queue_worker.rb', line 68 def register_worker(name, bucket_count, config={}) = { :iteration_length => 60, :servers => ['127.0.0.1:11211'] } .merge!(config) self.group_name = name self.iteration_length = [:iteration_length] @nominated_at = Time.now - self.iteration_length @memcache_client = client_for(Array([:servers])) # FIXME: Tests @domain = [:domain] @buckets = [] @bucket_count = bucket_count register_with_bonjour log.progname = @uri log.info { "Registered in group #{group_name} at port #{@port}" } end |
#until_next_iteration ⇒ Object
147 148 149 150 |
# File 'lib/politics/static_queue_worker.rb', line 147 def until_next_iteration left = iteration_length - (Time.now - @nominated_at) left > 0 ? left : 0 end |