Class: QC::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_classic/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Worker

Returns a new instance of Worker.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/queue_classic/worker.rb', line 6

def initialize(*args)
  if args.length == 5
    q_name, top_bound, fork_worker, listening_worker, max_attempts = *args
  elsif args.length <= 1
    opts = args.first || {}
    q_name           = opts[:q_name]           || QC::QUEUE
    top_bound        = opts[:top_bound]        || QC::TOP_BOUND
    fork_worker      = opts[:fork_worker]      || QC::FORK_WORKER
    listening_worker = opts[:listening_worker] || QC::LISTENING_WORKER
    max_attempts     = opts[:max_attempts]     || QC::MAX_LOCK_ATTEMPTS
  else
    raise ArgumentError, 'wrong number of arguments (expected no args, an options hash, or 5 separate args)'
  end

  @running = true
  @queue = Queue.new(q_name, listening_worker)
  @top_bound = top_bound
  @fork_worker = fork_worker
  @listening_worker = listening_worker
  @max_attempts = max_attempts
  handle_signals

  log(
    :level => :debug,
    :action => "worker_initialized",
    :queue => q_name,
    :top_bound => top_bound,
    :fork_worker => fork_worker,
    :listening_worker => listening_worker,
    :max_attempts => max_attempts
  )
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



4
5
6
# File 'lib/queue_classic/worker.rb', line 4

def queue
  @queue
end

Instance Method Details

#call(job) ⇒ Object



125
126
127
128
129
130
# File 'lib/queue_classic/worker.rb', line 125

def call(job)
  args = job[:args]
  klass = eval(job[:method].split(".").first)
  message = job[:method].split(".").last
  klass.send(message, *args)
end

#can_listen?Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/queue_classic/worker.rb', line 47

def can_listen?
  @listening_worker
end

#fork_and_workObject



80
81
82
83
84
# File 'lib/queue_classic/worker.rb', line 80

def fork_and_work
  @cpid = fork { setup_child; work }
  log(:level => :debug, :action => :fork, :pid => @cpid)
  Process.wait(@cpid)
end

#fork_worker?Boolean

Returns:

  • (Boolean)


43
44
45
# File 'lib/queue_classic/worker.rb', line 43

def fork_worker?
  @fork_worker
end

#handle_failure(job, e) ⇒ Object

override this method to do whatever you want



147
148
149
150
151
152
153
# File 'lib/queue_classic/worker.rb', line 147

def handle_failure(job,e)
  puts "!"
  puts "! \t FAIL"
  puts "! \t \t #{job.inspect}"
  puts "! \t \t #{e.inspect}"
  puts "!"
end

#handle_signalsObject



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/queue_classic/worker.rb', line 51

def handle_signals
  %W(INT TERM).each do |sig|
    trap(sig) do
      if running?
        @running = false
        log(:level => :debug, :action => "handle_signal", :running => @running)
      else
        raise Interrupt
      end
    end
  end
end

#lock_jobObject



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/queue_classic/worker.rb', line 102

def lock_job
  log(:level => :debug, :action => "lock_job")
  attempts = 0
  job = nil
  until job
    job = @queue.lock(@top_bound)
    if job.nil?
      log(:level => :debug, :action => "failed_lock", :attempts => attempts)
      if attempts < @max_attempts
        seconds = 2**attempts
        wait(seconds)
        attempts += 1
        next
      else
        break
      end
    else
      log(:level => :debug, :action => "finished_lock", :job => job[:id])
    end
  end
  job
end

#log(data) ⇒ Object



155
156
157
# File 'lib/queue_classic/worker.rb', line 155

def log(data)
  QC.log(data)
end

#running?Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/queue_classic/worker.rb', line 39

def running?
  @running
end

#setup_childObject

This method should be overriden if your worker is forking and you need to re-establish database connectoins



67
68
# File 'lib/queue_classic/worker.rb', line 67

def setup_child
end

#startObject



70
71
72
73
74
75
76
77
78
# File 'lib/queue_classic/worker.rb', line 70

def start
  while running?
    if fork_worker?
      fork_and_work
    else
      work
    end
  end
end

#wait(t) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/queue_classic/worker.rb', line 132

def wait(t)
  if can_listen?
    log(:level => :debug, :action => "listen_wait", :wait => t)
    Conn.listen(@queue.chan)
    Conn.wait_for_notify(t)
    Conn.unlisten(@queue.chan)
    Conn.drain_notify
    log(:level => :debug, :action => "finished_listening")
  else
    log(:level => :debug, :action => "sleep_wait", :wait => t)
    Kernel.sleep(t)
  end
end

#workObject



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/queue_classic/worker.rb', line 86

def work
  if job = lock_job
    QC.log_yield(:level => :info, :action => "work_job", :job => job[:id]) do
      begin
        call(job)
      rescue Object => e
        log(:level => :debug, :action => "failed_work", :job => job[:id], :error => e.inspect)
        handle_failure(job, e)
      ensure
        @queue.delete(job[:id])
        log(:level => :debug, :action => "delete_job", :job => job[:id])
      end
    end
  end
end