Class: QC::Worker
- Inherits:
-
Object
- Object
- QC::Worker
- Defined in:
- lib/queue_classic/worker.rb
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #call(job) ⇒ Object
- #can_listen? ⇒ Boolean
- #fork_and_work ⇒ Object
- #fork_worker? ⇒ Boolean
-
#handle_failure(job, e) ⇒ Object
Override this method to do whatever you want.
- #handle_signals ⇒ Object
-
#initialize(*args) ⇒ Worker
constructor
A new instance of Worker.
- #lock_job ⇒ Object
- #log(data) ⇒ Object
- #running? ⇒ Boolean
-
#setup_child ⇒ Object
This method should be overridden if your worker is forking and you need to re-establish database connections.
- #start ⇒ Object
- #wait(t) ⇒ Object
- #work ⇒ Object
Constructor Details
#initialize(*args) ⇒ Worker
Returns a new instance of Worker.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/queue_classic/worker.rb', line 6
#
# Builds a worker, installs its signal handlers and logs its configuration.
#
# Accepts one of three call shapes:
# * no arguments                - every setting comes from the QC constants
# * a single options hash       - keys :q_name, :top_bound, :fork_worker,
#                                 :listening_worker, :max_attempts
# * five positional arguments   - q_name, top_bound, fork_worker,
#                                 listening_worker, max_attempts (in that order)
#
# Raises ArgumentError for any other argument count.
def initialize(*args)
  if args.length == 5
    q_name, top_bound, fork_worker, listening_worker, max_attempts = *args
  elsif args.length <= 1
    opts = args.first || {}
    q_name       = opts[:q_name]       || QC::QUEUE
    top_bound    = opts[:top_bound]    || QC::TOP_BOUND
    max_attempts = opts[:max_attempts] || QC::MAX_LOCK_ATTEMPTS

    # The two flags are booleans, so `||` cannot be used for the fallback: it
    # would throw away an explicit `false` and make it impossible to turn the
    # feature off when the default is truthy. Fall back only when the
    # option is missing (nil).
    fork_worker = opts[:fork_worker]
    fork_worker = QC::FORK_WORKER if fork_worker.nil?
    listening_worker = opts[:listening_worker]
    listening_worker = QC::LISTENING_WORKER if listening_worker.nil?
  else
    raise ArgumentError, 'wrong number of arguments (expected no args, an options hash, or 5 separate args)'
  end

  @running = true
  @queue = Queue.new(q_name, listening_worker)
  @top_bound = top_bound
  @fork_worker = fork_worker
  @listening_worker = listening_worker
  @max_attempts = max_attempts
  handle_signals

  log(
    :level => :debug,
    :action => "worker_initialized",
    :queue => q_name,
    :top_bound => top_bound,
    :fork_worker => fork_worker,
    :listening_worker => listening_worker,
    :max_attempts => max_attempts
  )
end
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
4 5 6 |
# File 'lib/queue_classic/worker.rb', line 4
#
# Read-only accessor for the queue object this worker was built with.
def queue
  @queue
end
Instance Method Details
#call(job) ⇒ Object
125 126 127 128 129 130 |
# File 'lib/queue_classic/worker.rb', line 125
#
# Invokes the method described by a job.
#
# job[:method] is a dotted string such as "Kernel.puts". The text before the
# first "." is evaluated to obtain the receiver; the text after the last "."
# is the message sent to it, with job[:args] splatted as the arguments.
#
# Returns whatever the invoked method returns.
def call(job)
  args = job[:args]
  parts = job[:method].split(".")
  # SECURITY NOTE(review): eval executes whatever Ruby expression is stored in
  # the job's :method field. Anyone able to write job rows can run arbitrary
  # code in the worker; make sure only trusted producers can enqueue.
  klass = eval(parts.first)
  message = parts.last
  klass.send(message, *args)
end
#can_listen? ⇒ Boolean
47 48 49 |
# File 'lib/queue_classic/worker.rb', line 47
#
# Reports the listening_worker setting stored by the constructor; #wait uses
# it to choose between LISTEN/NOTIFY and a plain sleep.
def can_listen?
  @listening_worker
end
#fork_and_work ⇒ Object
80 81 82 83 84 |
# File 'lib/queue_classic/worker.rb', line 80
#
# Runs one unit of work in a child process and blocks until it exits.
# The child calls #setup_child and then #work; the parent records the child's
# pid in @cpid, logs it and waits on it.
def fork_and_work
  @cpid = fork do
    setup_child
    work
  end
  log(:level => :debug, :action => :fork, :pid => @cpid)
  Process.wait(@cpid)
end
#fork_worker? ⇒ Boolean
43 44 45 |
# File 'lib/queue_classic/worker.rb', line 43
#
# Reports the fork_worker setting stored by the constructor; #start uses it to
# decide whether each job runs in a forked child.
def fork_worker?
  @fork_worker
end
#handle_failure(job, e) ⇒ Object
Override this method to do whatever you want.
147 148 149 150 151 152 153 |
# File 'lib/queue_classic/worker.rb', line 147
#
# Failure hook called by #work when a job raises. The default implementation
# prints a banner with the job and the exception to standard output; override
# it to do whatever you want.
#
# job - the job that failed
# e   - the exception that was raised
def handle_failure(job,e)
  report = [
    "!",
    "! \t FAIL",
    "! \t \t #{job.inspect}",
    "! \t \t #{e.inspect}",
    "!"
  ]
  # puts writes each array element on its own line.
  puts report
end
#handle_signals ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/queue_classic/worker.rb', line 51
#
# Installs INT and TERM handlers. The first signal clears @running and logs
# the change. A signal received when the worker is already stopping raises
# Interrupt instead.
def handle_signals
  %w[INT TERM].each do |signal_name|
    trap(signal_name) do
      raise Interrupt unless running?

      @running = false
      log(:level => :debug, :action => "handle_signal", :running => @running)
    end
  end
end
#lock_job ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/queue_classic/worker.rb', line 102
#
# Tries to lock a job on the queue, backing off between attempts.
#
# After each failed attempt it waits 2**attempts seconds (1, 2, 4, ...) via
# #wait, up to @max_attempts waits, then gives up.
#
# Returns the locked job, or nil if every attempt came back empty.
def lock_job
  log(:level => :debug, :action => "lock_job")
  attempts = 0

  while (job = @queue.lock(@top_bound)).nil?
    log(:level => :debug, :action => "failed_lock", :attempts => attempts)
    # Out of retries: hand back nil.
    return job unless attempts < @max_attempts

    wait(2**attempts)
    attempts += 1
  end

  log(:level => :debug, :action => "finished_lock", :job => job[:id])
  job
end
#log(data) ⇒ Object
155 156 157 |
# File 'lib/queue_classic/worker.rb', line 155
#
# Forwards a log payload to QC.log and returns its result.
#
# data - the payload to log (the worker passes hashes such as
#        {:level => :debug, :action => "..."})
def log(data)
  QC.log(data)
end
#running? ⇒ Boolean
39 40 41 |
# File 'lib/queue_classic/worker.rb', line 39
#
# Reports the @running flag: set to true by the constructor and cleared by the
# signal handler when the worker is asked to stop.
def running?
  @running
end
#setup_child ⇒ Object
This method should be overridden if your worker is forking and you need to re-establish database connections.
67 68 |
# File 'lib/queue_classic/worker.rb', line 67
#
# Hook run inside the forked child before it starts working. It does nothing
# by default; override it if your worker forks and you need to re-establish
# database connections.
def setup_child
end
#start ⇒ Object
70 71 72 73 74 75 76 77 78 |
# File 'lib/queue_classic/worker.rb', line 70
#
# Main loop: keeps processing until #running? turns false. Each pass either
# forks a child to do the work or does it in this process, depending on
# #fork_worker?.
def start
  while running?
    fork_worker? ? fork_and_work : work
  end
end
#wait(t) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/queue_classic/worker.rb', line 132
#
# Pauses for up to t seconds between lock attempts.
#
# A listening worker subscribes to the queue's channel, waits for a
# notification (or the timeout), unsubscribes and drains any pending
# notifications. A non-listening worker simply sleeps.
#
# t - how long to wait, in seconds
def wait(t)
  unless can_listen?
    log(:level => :debug, :action => "sleep_wait", :wait => t)
    return Kernel.sleep(t)
  end

  log(:level => :debug, :action => "listen_wait", :wait => t)
  channel = @queue.chan
  Conn.listen(channel)
  Conn.wait_for_notify(t)
  Conn.unlisten(@queue.chan)
  Conn.drain_notify
  log(:level => :debug, :action => "finished_listening")
end
#work ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/queue_classic/worker.rb', line 86
#
# Locks one job and runs it inside a QC.log_yield block. If the job raises,
# the error is logged and passed to #handle_failure. The job is deleted from
# the queue afterwards whether it succeeded or failed.
#
# Returns nil when no job could be locked.
def work
  job = lock_job
  return unless job

  job_id = job[:id]
  QC.log_yield(:level => :info, :action => "work_job", :job => job_id) do
    begin
      call(job)
    # NOTE: rescuing Object catches every exception class, including
    # Interrupt and SystemExit raised while the job is running.
    rescue Object => e
      log(:level => :debug, :action => "failed_work", :job => job[:id], :error => e.inspect)
      handle_failure(job, e)
    ensure
      @queue.delete(job[:id])
      log(:level => :debug, :action => "delete_job", :job => job[:id])
    end
  end
end