Module: Politics::StaticQueueWorker

Defined in:
lib/politics/static_queue_worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#bucketsObject (readonly)

Returns the value of attribute buckets.



12
13
14
# File 'lib/politics/static_queue_worker.rb', line 12

def buckets
  @buckets
end

#dictatorship_lengthObject (readonly)

Returns the value of attribute dictatorship_length.



12
13
14
# File 'lib/politics/static_queue_worker.rb', line 12

def dictatorship_length
  @dictatorship_length
end

#group_nameObject (readonly)

Returns the value of attribute group_name.



12
13
14
# File 'lib/politics/static_queue_worker.rb', line 12

def group_name
  @group_name
end

#iteration_lengthObject (readonly)

Returns the value of attribute iteration_length.



12
13
14
# File 'lib/politics/static_queue_worker.rb', line 12

def iteration_length
  @iteration_length
end

#uriObject (readonly)

Returns the value of attribute uri.



12
13
14
# File 'lib/politics/static_queue_worker.rb', line 12

def uri
  @uri
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


157
158
159
# File 'lib/politics/static_queue_worker.rb', line 157

def alive?
  true
end

#as_dictator(&block) ⇒ Object



78
79
80
81
82
83
84
85
# File 'lib/politics/static_queue_worker.rb', line 78

def as_dictator(&block)
  duration = dictatorship_length || (iteration_length * 10)
  log.debug { "become dictator for up to #{duration} seconds" }
  seize_leadership duration
  yield
  raise "lost leadership while being dictator for #{duration} seconds" unless leader?
  seize_leadership
end

#bucket_request(requestor_uri, context) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/politics/static_queue_worker.rb', line 127

def bucket_request(requestor_uri, context)
  if leader?
    log.debug "delivering bucket request"
    bucket_spec = next_bucket(requestor_uri, context)
    if !bucket_spec[0] && @followers_to_stop.include?(requestor_uri)
      # the leader stops its own process and must not be killed by its worker
      if requestor_uri != uri
        bucket_spec = [:stop, 0]
      end
      @followers_to_stop.delete(requestor_uri)
    end
    bucket_spec
  else
    log.debug "received request for bucket but am not leader - delivering :not_leader"
    [:not_leader, 0]
  end
end

#find_workersObject



171
172
173
# File 'lib/politics/static_queue_worker.rb', line 171

def find_workers
  raise "Please provide a method ”find_workers” returning a list of all other worker URIs"
end

#followers_to_stopObject



123
124
125
# File 'lib/politics/static_queue_worker.rb', line 123

def followers_to_stop
  @followers_to_stop.select {|u| DRbObject.new(nil, u).alive? rescue DRb::DRbConnError && false}
end

#leaderObject



161
162
163
164
165
166
167
168
169
# File 'lib/politics/static_queue_worker.rb', line 161

def leader
  2.times do
    break if leader_uri
    log.debug "could not determine leader - relaxing until next iteration"
    relax until_next_iteration
  end
  raise "cannot determine leader" unless leader_uri
  DRbObject.new(nil, leader_uri)
end

#next_bucket(requestor_uri, context) ⇒ Object



145
146
147
# File 'lib/politics/static_queue_worker.rb', line 145

def next_bucket(requestor_uri, context)
  [@buckets.pop, sleep_until_next_bucket_time]
end

#perform_leader_dutiesObject



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/politics/static_queue_worker.rb', line 91

def perform_leader_duties
  # The DRb thread handles the requests to the leader.
  # This method performs the bucket managing.
  log.info { "has been elected leader" }
  before_perform_leader_duties
  # keeping leader state as long as buckets are being initialized
  as_dictator { initialize_buckets }

  while !buckets.empty?
    # keeping leader state as long as buckets are available by renominating before
    # nomination times out
    as_dictator { update_buckets } unless restart_wanted?
  end

  if restart_wanted?
    log.info "restart triggered"
    as_dictator { populate_followers_to_stop }
    # keeping leader state as long as there are followers to stop
    while !followers_to_stop.empty?
      log.info "waiting fo workers to stop: #{followers_to_stop}"
      relax(until_next_iteration / 2)
      seize_leadership
    end
    log.info "leader exiting due to trigger"
    exit 0
  end
end

#populate_followers_to_stopObject



119
120
121
# File 'lib/politics/static_queue_worker.rb', line 119

def populate_followers_to_stop
  @followers_to_stop.replace(find_workers)
end

#process_bucket(&block) ⇒ Object

Fetch a bucket out of the queue and pass it to the given block to be processed.

Raises:

  • (ArgumentError)


39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/politics/static_queue_worker.rb', line 39

def process_bucket(&block)
  log.debug "start bucket processing"
  raise ArgumentError, "process_bucket requires a block!" unless block_given?
  unless memcache_client
    raise ArgumentError, "You must call register_worker before processing!"
  end

  begin
    begin
      raise "self is not alive via drb" unless DRbObject.new(nil, uri).alive?
    rescue Exception => e
      raise "cannot reach self via drb: #{e.message}"
    end
    begin
      nominate

      if leader? && !(@leader_thread && @leader_thread.alive?)
        unless (@leader_thread && @leader_thread.alive?)
          @leader_thread = Thread.new do
            perform_leader_duties
          end
        end
      end

      # Get a bucket from the leader and process it
      begin
        log.debug "getting bucket request from leader (#{leader_uri}) and processing it"
        bucket_process(*leader.bucket_request(uri, bucket_request_context), &block)
      rescue DRb::DRbError => dre
        log.error { "Error talking to leader: #{dre.message}" }
        relax until_next_iteration
      end
    rescue Dalli::DalliError => e
      log.error { "Unexpected DalliError: #{e.message}" }
      relax until_next_iteration
    end
  end while loop?
end

#register_worker(name, bucket_count, config = {}) ⇒ Object

Register this process as able to work on buckets.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/politics/static_queue_worker.rb', line 15

def register_worker(name, bucket_count, config = {})
  options = config.dup
  options[:iteration_length] ||= 10
  options[:servers] ||= ['127.0.0.1:11211']

  @group_name = name
  @iteration_length = options[:iteration_length]
  @memcache_client = client_for(Array(options[:servers]))
  @dictatorship_length = options[:dictatorship_length]

  @buckets = []
  @bucket_count = bucket_count
  @followers_to_stop = Set.new

  start_druby_service
  log.progname = uri
  log.info { "Registered in group #{group_name} at #{uri}" }
  at_exit do
    internal_cleanup
    cleanup
  end
end

#restart_wanted?Boolean

Returns:

  • (Boolean)


175
176
177
# File 'lib/politics/static_queue_worker.rb', line 175

def restart_wanted?
  memcache_client.get(restart_flag)
end

#seize_leadership(*args) ⇒ Object



87
88
89
# File 'lib/politics/static_queue_worker.rb', line 87

def seize_leadership(*args)
  start_iteration(*args) {|duration| memcache_client.set(token, uri, duration) }
end

#sleep_until_next_bucket_timeObject



149
150
151
# File 'lib/politics/static_queue_worker.rb', line 149

def sleep_until_next_bucket_time
  [[until_next_iteration / 2, 1].max, iteration_length / 2].min
end

#until_next_iterationObject



153
154
155
# File 'lib/politics/static_queue_worker.rb', line 153

def until_next_iteration
  [(iteration_end || Time.at(0)) - Time.now, 0].max
end