Module: Politics::StaticQueueWorker
- Defined in:
- lib/politics/static_queue_worker.rb
Instance Attribute Summary collapse
-
#buckets ⇒ Object
readonly
Returns the value of attribute buckets.
-
#dictatorship_length ⇒ Object
readonly
Returns the value of attribute dictatorship_length.
-
#group_name ⇒ Object
readonly
Returns the value of attribute group_name.
-
#iteration_length ⇒ Object
readonly
Returns the value of attribute iteration_length.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Instance Method Summary collapse
- #alive? ⇒ Boolean
- #as_dictator(&block) ⇒ Object
- #bucket_request(requestor_uri, context) ⇒ Object
- #find_workers ⇒ Object
- #followers_to_stop ⇒ Object
- #leader ⇒ Object
- #next_bucket(requestor_uri, context) ⇒ Object
- #perform_leader_duties ⇒ Object
- #populate_followers_to_stop ⇒ Object
-
#process_bucket(&block) ⇒ Object
Fetch a bucket out of the queue and pass it to the given block to be processed.
-
#register_worker(name, bucket_count, config = {}) ⇒ Object
Register this process as able to work on buckets.
- #restart_wanted? ⇒ Boolean
- #seize_leadership(*args) ⇒ Object
- #sleep_until_next_bucket_time ⇒ Object
- #until_next_iteration ⇒ Object
Instance Attribute Details
#buckets ⇒ Object (readonly)
Returns the value of attribute buckets.
12 13 14 |
# File 'lib/politics/static_queue_worker.rb', line 12 def buckets @buckets end |
#dictatorship_length ⇒ Object (readonly)
Returns the value of attribute dictatorship_length.
12 13 14 |
# File 'lib/politics/static_queue_worker.rb', line 12 def dictatorship_length @dictatorship_length end |
#group_name ⇒ Object (readonly)
Returns the value of attribute group_name.
12 13 14 |
# File 'lib/politics/static_queue_worker.rb', line 12 def group_name @group_name end |
#iteration_length ⇒ Object (readonly)
Returns the value of attribute iteration_length.
12 13 14 |
# File 'lib/politics/static_queue_worker.rb', line 12 def iteration_length @iteration_length end |
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
12 13 14 |
# File 'lib/politics/static_queue_worker.rb', line 12 def uri @uri end |
Instance Method Details
#alive? ⇒ Boolean
157 158 159 |
# File 'lib/politics/static_queue_worker.rb', line 157 def alive? true end |
#as_dictator(&block) ⇒ Object
78 79 80 81 82 83 84 85 |
# File 'lib/politics/static_queue_worker.rb', line 78 def as_dictator(&block) duration = dictatorship_length || (iteration_length * 10) log.debug { "become dictator for up to #{duration} seconds" } seize_leadership duration yield raise "lost leadership while being dictator for #{duration} seconds" unless leader? seize_leadership end |
#bucket_request(requestor_uri, context) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/politics/static_queue_worker.rb', line 127 def bucket_request(requestor_uri, context) if leader? log.debug "delivering bucket request" bucket_spec = next_bucket(requestor_uri, context) if !bucket_spec[0] && @followers_to_stop.include?(requestor_uri) # the leader stops its own process and must not be killed by its worker if requestor_uri != uri bucket_spec = [:stop, 0] end @followers_to_stop.delete(requestor_uri) end bucket_spec else log.debug "received request for bucket but am not leader - delivering :not_leader" [:not_leader, 0] end end |
#find_workers ⇒ Object
171 172 173 |
# File 'lib/politics/static_queue_worker.rb', line 171 def find_workers raise "Please provide a method ”find_workers” returning a list of all other worker URIs" end |
#followers_to_stop ⇒ Object
123 124 125 |
# File 'lib/politics/static_queue_worker.rb', line 123 def followers_to_stop @followers_to_stop.select {|u| DRbObject.new(nil, u).alive? rescue DRb::DRbConnError && false} end |
#leader ⇒ Object
161 162 163 164 165 166 167 168 169 |
# File 'lib/politics/static_queue_worker.rb', line 161 def leader 2.times do break if leader_uri log.debug "could not determine leader - relaxing until next iteration" relax until_next_iteration end raise "cannot determine leader" unless leader_uri DRbObject.new(nil, leader_uri) end |
#next_bucket(requestor_uri, context) ⇒ Object
145 146 147 |
# File 'lib/politics/static_queue_worker.rb', line 145 def next_bucket(requestor_uri, context) [@buckets.pop, sleep_until_next_bucket_time] end |
#perform_leader_duties ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/politics/static_queue_worker.rb', line 91 def perform_leader_duties # The DRb thread handles the requests to the leader. # This method performs the bucket managing. log.info { "has been elected leader" } before_perform_leader_duties # keeping leader state as long as buckets are being initialized as_dictator { initialize_buckets } while !buckets.empty? # keeping leader state as long as buckets are available by renominating before # nomination times out as_dictator { update_buckets } unless restart_wanted? end if restart_wanted? log.info "restart triggered" as_dictator { populate_followers_to_stop } # keeping leader state as long as there are followers to stop while !followers_to_stop.empty? log.info "waiting fo workers to stop: #{followers_to_stop}" relax(until_next_iteration / 2) seize_leadership end log.info "leader exiting due to trigger" exit 0 end end |
#populate_followers_to_stop ⇒ Object
119 120 121 |
# File 'lib/politics/static_queue_worker.rb', line 119 def populate_followers_to_stop @followers_to_stop.replace(find_workers) end |
#process_bucket(&block) ⇒ Object
Fetch a bucket out of the queue and pass it to the given block to be processed.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/politics/static_queue_worker.rb', line 39 def process_bucket(&block) log.debug "start bucket processing" raise ArgumentError, "process_bucket requires a block!" unless block_given? unless memcache_client raise ArgumentError, "You must call register_worker before processing!" end begin begin raise "self is not alive via drb" unless DRbObject.new(nil, uri).alive? rescue Exception => e raise "cannot reach self via drb: #{e.message}" end begin nominate if leader? && !(@leader_thread && @leader_thread.alive?) unless (@leader_thread && @leader_thread.alive?) @leader_thread = Thread.new do perform_leader_duties end end end # Get a bucket from the leader and process it begin log.debug "getting bucket request from leader (#{leader_uri}) and processing it" bucket_process(*leader.bucket_request(uri, bucket_request_context), &block) rescue DRb::DRbError => dre log.error { "Error talking to leader: #{dre.message}" } relax until_next_iteration end rescue Dalli::DalliError => e log.error { "Unexpected DalliError: #{e.message}" } relax until_next_iteration end end while loop? end |
#register_worker(name, bucket_count, config = {}) ⇒ Object
Register this process as able to work on buckets.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/politics/static_queue_worker.rb', line 15 def register_worker(name, bucket_count, config = {}) = config.dup [:iteration_length] ||= 10 [:servers] ||= ['127.0.0.1:11211'] @group_name = name @iteration_length = [:iteration_length] @memcache_client = client_for(Array([:servers])) @dictatorship_length = [:dictatorship_length] @buckets = [] @bucket_count = bucket_count @followers_to_stop = Set.new start_druby_service log.progname = uri log.info { "Registered in group #{group_name} at #{uri}" } at_exit do internal_cleanup cleanup end end |
#restart_wanted? ⇒ Boolean
175 176 177 |
# File 'lib/politics/static_queue_worker.rb', line 175 def restart_wanted? memcache_client.get(restart_flag) end |
#seize_leadership(*args) ⇒ Object
87 88 89 |
# File 'lib/politics/static_queue_worker.rb', line 87 def seize_leadership(*args) start_iteration(*args) {|duration| memcache_client.set(token, uri, duration) } end |
#sleep_until_next_bucket_time ⇒ Object
149 150 151 |
# File 'lib/politics/static_queue_worker.rb', line 149 def sleep_until_next_bucket_time [[until_next_iteration / 2, 1].max, iteration_length / 2].min end |
#until_next_iteration ⇒ Object
153 154 155 |
# File 'lib/politics/static_queue_worker.rb', line 153 def until_next_iteration [(iteration_end || Time.at(0)) - Time.now, 0].max end |