Module: Politics::StaticQueueWorker

Defined in:
lib/politics/static_queue_worker.rb

Overview

The StaticQueueWorker mixin allows a processing daemon to “lease”, or check out, a portion of a problem space so that no other process works on that same portion at the same time. The problem space is cut into N “buckets”, each of which is placed in a queue. Processes then fetch entries from the queue and process them. It is up to the application to map the bucket number onto its specific problem space.
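As an illustration of mapping a bucket number onto a problem space, the sketch below assigns records to buckets by numeric id modulo the bucket count. The helper names `bucket_for` and `ids_in_bucket` are hypothetical and not part of Politics; any deterministic mapping that covers the whole problem space would serve equally well.

```ruby
TOTAL_BUCKETS = 16

# Hypothetical mapping: a record belongs to the bucket given by its
# numeric id modulo the bucket count.
def bucket_for(record_id, total_buckets = TOTAL_BUCKETS)
  record_id % total_buckets
end

# Select the ids a worker would handle once it has been handed `bucket`.
def ids_in_bucket(ids, bucket, total_buckets = TOTAL_BUCKETS)
  ids.select { |id| bucket_for(id, total_buckets) == bucket }
end

p ids_in_bucket((1..40).to_a, 3)  # prints [3, 19, 35]
```

Because each id maps to exactly one bucket, two workers holding different buckets never touch the same record.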

Note that memcached is used for leader election. The leader owns the queue for the duration of an iteration, and the other peers fetch buckets from the current leader during that iteration.

The leader hands out buckets in order. Once all the buckets have been processed, the leader returns nil to the processors, which causes them to sleep until the end of the iteration. Then everyone wakes up, a new leader is elected, and the processing starts all over again.
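The hand-out cycle described above can be sketched without memcached or DRb. `SketchLeader` is a hypothetical stand-in and not the library's class; it assumes the queue is filled with 0 up to the bucket count and drained from the front, whereas the library's own `next_bucket` pops from the end of `@buckets`.

```ruby
# Illustrative stand-in for the leader's queue; not part of Politics.
class SketchLeader
  def initialize(bucket_count)
    @buckets = (0...bucket_count).to_a
  end

  # Returns the next bucket, or nil once the queue is drained.
  def next_bucket
    @buckets.shift
  end
end

# Drain a leader the way a single worker would within one iteration.
def drain(leader)
  handed_out = []
  loop do
    bucket = leader.next_bucket
    break if bucket.nil? # a real worker would sleep until the iteration ends
    handed_out << bucket
  end
  handed_out
end

p drain(SketchLeader.new(3))  # prints [0, 1, 2]
```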

DRb and mDNS are used for peer discovery and communication.

Example usage:

class Analyzer
  include Politics::StaticQueueWorker
  TOTAL_BUCKETS = 16

  def start
    register_worker(self.class.name, TOTAL_BUCKETS)
    process_bucket do |bucket|
      puts "Analyzing bucket #{bucket} of #{TOTAL_BUCKETS}"
      sleep 5
    end
  end
end

Note: process_bucket never returns, i.e., it should be the main loop of your processing daemon.

Class Method Details

.included(model) ⇒ Object

:nodoc:



# File 'lib/politics/static_queue_worker.rb', line 61

def self.included(model) #:nodoc:
  model.class_eval do
    attr_accessor :group_name, :iteration_length, :uri
  end
end

Instance Method Details

#bucket_request(requestor_uri) ⇒ Object



# File 'lib/politics/static_queue_worker.rb', line 133

def bucket_request(requestor_uri)
  if leader?
    log.debug "delivering bucket request"
    next_bucket requestor_uri
  else
    log.debug "received request for bucket but am not leader - delivering :not_leader"
    [:not_leader, 0]
  end
end
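Both branches of bucket_request return a two-element array: either `[:not_leader, 0]` or, via next_bucket, a bucket (or nil) paired with the seconds left in the iteration. The sketch below shows one way a caller might interpret such a reply. The method `interpret_reply` and the action symbols are hypothetical; the library's own handling is not shown in this listing.

```ruby
# Hypothetical reply handler; the action symbols are illustrative only.
def interpret_reply(bucket, sleep_seconds)
  case bucket
  when :not_leader
    [:retry, sleep_seconds]   # the peer asked is no longer the leader
  when nil
    [:sleep, sleep_seconds]   # queue drained; wait for the next iteration
  else
    [:process, bucket]        # a bucket number to hand to the block
  end
end

p interpret_reply(:not_leader, 0)  # prints [:retry, 0]
p interpret_reply(nil, 12.5)       # prints [:sleep, 12.5]
p interpret_reply(7, 30)           # prints [:process, 7]
```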

#next_bucket(requestor_uri) ⇒ Object



# File 'lib/politics/static_queue_worker.rb', line 143

def next_bucket(requestor_uri)
  [@buckets.pop, until_next_iteration]
end

#process_bucket(&block) ⇒ Object

Fetch a bucket out of the queue and pass it to the given block to be processed.

bucket

The bucket number to process, within the range 0...TOTAL_BUCKETS (a Ruby range that excludes TOTAL_BUCKETS)

Raises:

  • (ArgumentError)


# File 'lib/politics/static_queue_worker.rb', line 93

def process_bucket(&block)
  log.debug "start bucket processing"
  raise ArgumentError, "process_bucket requires a block!" unless block_given?
  raise ArgumentError, "You must call register_worker before processing!" unless @memcache_client

  begin
    begin
      nominate
      if leader?
        # Drb thread handles leader duties
        log.info { "has been elected leader" }
        initialize_buckets
        # keeping leader state as long as buckets are available by renominating before
        # nomination times out
        while !@buckets.empty? do
          log.debug { "relaxes half the time until next iteration" }
          relax(until_next_iteration / 2)
          log.debug { "renew nomination to keep the hat and finish the work" }
          @memcache_client.set(token, @uri, iteration_length)
          @nominated_at = Time.now
          log.error { "tried to stay leader but failed" } unless leader?
        end
        relax until_next_iteration
      else
        # Get a bucket from the leader and process it
        begin
          log.debug "getting bucket request from leader (#{leader_uri}) and processing it"
          bucket_process(*leader.bucket_request(uri), &block)
        rescue DRb::DRbError => dre
          log.error { "Error talking to leader: #{dre.message}" }
          relax until_next_iteration
        end
      end
    rescue MemCache::MemCacheError => e
      log.error { "Unexpected MemCacheError: #{e.message}" }
      relax until_next_iteration
    end
  end while loop?
end

#register_worker(name, bucket_count, config = {}) ⇒ Object

Register this process as able to work on buckets.



# File 'lib/politics/static_queue_worker.rb', line 68

def register_worker(name, bucket_count, config={})
  options = {
        :iteration_length => 60,
        :servers => ['127.0.0.1:11211']
      }
  options.merge!(config)

  self.group_name = name
  self.iteration_length = options[:iteration_length]
  @nominated_at = Time.now - self.iteration_length
  @memcache_client = client_for(Array(options[:servers]))
  # FIXME: Tests
  @domain = options[:domain]

  @buckets = []
  @bucket_count = bucket_count

  register_with_bonjour
  log.progname = @uri
  log.info { "Registered in group #{group_name} at port #{@port}" }
end
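The way register_worker combines its defaults with the caller's config can be reproduced in isolation. The sketch below mirrors only the merge and the `Array(...)` coercion from the listing above; `DEFAULT_OPTIONS`, `effective_options`, and the host name `cache1` are names introduced here for illustration.

```ruby
# Defaults copied from the register_worker listing.
DEFAULT_OPTIONS = {
  :iteration_length => 60,
  :servers => ['127.0.0.1:11211']
}

# Caller-supplied keys override the defaults; missing keys fall back.
def effective_options(config = {})
  DEFAULT_OPTIONS.merge(config)
end

opts = effective_options(:iteration_length => 120)
p opts[:iteration_length]  # prints 120
p Array(opts[:servers])    # prints ["127.0.0.1:11211"]

# Array() lets a single server be passed as a plain string.
p Array(effective_options(:servers => 'cache1:11211')[:servers])  # prints ["cache1:11211"]
```

Under these assumptions, a call such as `register_worker('Analyzer', 16, :iteration_length => 120)` would lengthen the iteration while still using the default memcached server.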

#until_next_iteration ⇒ Object



# File 'lib/politics/static_queue_worker.rb', line 147

def until_next_iteration
  left = iteration_length - (Time.now - @nominated_at)
  left > 0 ? left : 0
end
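A worked example of the arithmetic, as a standalone sketch: `seconds_left` is a hypothetical helper that takes the current time as a parameter so the result is reproducible, but it applies the same formula as until_next_iteration.

```ruby
# Same formula as until_next_iteration, with the clock passed in explicitly.
def seconds_left(iteration_length, nominated_at, now)
  left = iteration_length - (now - nominated_at)
  left > 0 ? left : 0
end

nominated_at = Time.at(1_000)

# 20 seconds into a 60-second iteration: 40 seconds remain.
p seconds_left(60, nominated_at, Time.at(1_020))  # prints 40.0

# 75 seconds in, the iteration is over: the result is clamped to 0.
p seconds_left(60, nominated_at, Time.at(1_075))  # prints 0
```

Since register_worker initializes `@nominated_at` to `Time.now - iteration_length`, the remaining time is effectively zero until the first nomination resets the clock.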