Module: QueueKit::Worker

Includes:
Instrumentable
Defined in:
lib/queue_kit/worker.rb

Instance Method Summary collapse

Methods included from Instrumentable

#debug, #default_instrumenter, #enable_debug_mode, #force_debug, #instrument, #instrumenter, #instrumenter_from

Instance Method Details

#coolObject



19
20
# File 'lib/queue_kit/worker.rb', line 19

def cool
end

#default_instrument_optionsObject



92
93
94
# File 'lib/queue_kit/worker.rb', line 92

def default_instrument_options
  {:worker => self}
end

#handle_error(err) ⇒ Object



22
23
24
# File 'lib/queue_kit/worker.rb', line 22

def handle_error(err)
  raise err
end

#initialize(queue, options = {}) ⇒ Object



5
6
7
8
9
10
11
12
13
# File 'lib/queue_kit/worker.rb', line 5

def initialize(queue, options = {})
  @queue = queue
  @processor = options.fetch(:processor) { method(:process) }
  @cooler = options.fetch(:cooler) { method(:cool) }
  @error_handler = options.fetch(:error_handler) { method(:handle_error) }
  @stopped = true

  instrumenter_from(options)
end

#nameObject



66
67
68
# File 'lib/queue_kit/worker.rb', line 66

def name
  @name ||= "#{self.class} #{Socket.gethostname}:#{Process.pid}"
end

#process(item) ⇒ Object

Raises:

  • (NotImplementedError)


15
16
17
# File 'lib/queue_kit/worker.rb', line 15

def process(item)
  raise NotImplementedError, "This worker can't do anything with #{item.inspect}"
end

#procline(string) ⇒ Object



41
42
43
44
# File 'lib/queue_kit/worker.rb', line 41

def procline(string)
  $0 = "QueueKit-#{QueueKit::VERSION}: #{string}"
  debug { ["worker.procline", {:message => string}] }
end

#runObject



30
31
32
33
34
35
36
37
38
39
# File 'lib/queue_kit/worker.rb', line 30

def run
  start
  interval_debugger = lambda { "worker.interval" }

  loop do
    work
    break unless working?
    debug(&interval_debugger)
  end
end

#set_popping_proclineObject



87
88
89
90
# File 'lib/queue_kit/worker.rb', line 87

def set_popping_procline
  @last_job_at = Time.now
  procline("Waiting since #{@last_job_at.to_i}")
end

#set_working_proclineObject



83
84
85
# File 'lib/queue_kit/worker.rb', line 83

def set_working_procline
  procline("Processing since #{Time.now.to_i}")
end

#startObject



70
71
72
73
# File 'lib/queue_kit/worker.rb', line 70

def start
  set_popping_procline
  @stopped = false
end

#stopObject



75
76
77
# File 'lib/queue_kit/worker.rb', line 75

def stop
  @stopped = true
end

#trap_signals(signal_handler) ⇒ Object



26
27
28
# File 'lib/queue_kit/worker.rb', line 26

def trap_signals(signal_handler)
  SignalChecker.trap(self, signal_handler)
end

#workObject



46
47
48
# File 'lib/queue_kit/worker.rb', line 46

def work
  wrap_error { work! }
end

#work!Object



50
51
52
53
54
55
56
57
58
# File 'lib/queue_kit/worker.rb', line 50

def work!
  if item = @queue.pop
    set_working_procline
    @processor.call(item)
    set_popping_procline
  else
    @cooler.call if working?
  end
end

#working?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/queue_kit/worker.rb', line 79

def working?
  !@stopped
end

#wrap_errorObject



60
61
62
63
64
# File 'lib/queue_kit/worker.rb', line 60

def wrap_error
  yield
rescue Exception => exception
  @error_handler.call(exception)
end