Class: Qu::Worker

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/qu/worker.rb

Defined Under Namespace

Classes: Abort

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

#clean_backtrace, #log_exception, #logger

Constructor Details

#initialize(*queues) ⇒ Worker

Returns a new instance of Worker.



10
11
12
13
14
# File 'lib/qu/worker.rb', line 10

def initialize(*queues)
  @queues = queues.flatten
  self.attributes = @queues.pop if @queues.last.is_a?(Hash)
  @queues << 'default' if @queues.empty?
end

Instance Attribute Details

#queuesObject

Returns the value of attribute queues.



5
6
7
# File 'lib/qu/worker.rb', line 5

def queues
  @queues
end

Instance Method Details

#attributesObject



22
23
24
# File 'lib/qu/worker.rb', line 22

def attributes
  {'hostname' => hostname, 'pid' => pid, 'queues' => queues}
end

#attributes=(attrs) ⇒ Object



16
17
18
19
20
# File 'lib/qu/worker.rb', line 16

def attributes=(attrs)
  attrs.each do |attr, value|
    self.instance_variable_set("@#{attr}", value)
  end
end

#handle_signalsObject



26
27
28
29
30
31
32
33
34
# File 'lib/qu/worker.rb', line 26

def handle_signals
  logger.debug "Worker #{id} registering traps for INT and TERM signals"
  %W(INT TERM).each do |sig|
    trap(sig) do
      logger.info "Worker #{id} received #{sig}, shutting down"
      raise Abort
    end
  end
end

#hostnameObject



73
74
75
# File 'lib/qu/worker.rb', line 73

def hostname
  @hostname ||= `hostname`.strip
end

#idObject



65
66
67
# File 'lib/qu/worker.rb', line 65

def id
  @id ||= "#{hostname}:#{pid}:#{queues.join(',')}"
end

#pidObject



69
70
71
# File 'lib/qu/worker.rb', line 69

def pid
  @pid ||= Process.pid
end

#startObject



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/qu/worker.rb', line 53

def start
  logger.warn "Worker #{id} starting"
  handle_signals
  Qu.backend.register_worker(self)
  loop { work }
rescue Abort => e
  # Ok, we'll shut down, but give us a sec
ensure
  Qu.backend.unregister_worker(self)
  logger.debug "Worker #{id} done"
end

#workObject



45
46
47
48
49
50
51
# File 'lib/qu/worker.rb', line 45

def work
  logger.debug "Worker #{id} waiting for next job"
  job = Qu.reserve(self)
  logger.debug "Worker #{id} reserved job #{job}"
  job.perform
  logger.debug "Worker #{id} completed job #{job}"
end

#work_offObject



36
37
38
39
40
41
42
43
# File 'lib/qu/worker.rb', line 36

def work_off
  logger.debug "Worker #{id} working of all jobs"
  while job = Qu.reserve(self, :block => false)
    logger.debug "Worker #{id} reserved job #{job}"
    job.perform
    logger.debug "Worker #{id} completed job #{job}"
  end
end