Class: QC::Worker

Inherits:
Object
Defined in:
lib/queue_classic/worker.rb

Overview

A Worker object can process jobs from one or many queues.

Constructor Details

#initialize(args = {}) ⇒ Worker

Creates a new worker without starting it; see Worker#start. This method takes a single hash argument, from which the following keys are read:

fork_worker

Whether the worker forks a child process for each job execution.

wait_interval

Time to wait between failed lock attempts.

connection

PG::Connection object.

q_name

Name of a single queue to process.

q_names

Names of the queues to process. Queues are checked from left to right.

top_bound

Offset to the head of the queue. 1 == strict FIFO.



# File 'lib/queue_classic/worker.rb', line 21

def initialize(args={})
  @fork_worker = args[:fork_worker] || QC.fork_worker?
  @wait_interval = args[:wait_interval] || QC.wait_time

  if args[:connection]
    @conn_adapter = ConnAdapter.new(connection: args[:connection])
  else
    @conn_adapter = QC.default_conn_adapter
  end

  @queues = setup_queues(@conn_adapter,
    (args[:q_name] || QC.queue),
    (args[:q_names] || QC.queues),
    (args[:top_bound] || QC.top_bound))
  log(args.merge(:at => "worker_initialized"))
  @running = true
end
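
Because each option in the constructor above is resolved with `||`, an explicit `false` or `nil` falls through to the module-level default. The following is a minimal sketch of that resolution. The `resolve_options` helper and the values in `DEFAULTS` are hypothetical stand-ins for `QC.fork_worker?` and `QC.wait_time`; they are not part of queue_classic.

```ruby
# Hypothetical stand-ins for QC.fork_worker? and QC.wait_time.
DEFAULTS = { fork_worker: true, wait_interval: 5 }.freeze

# Mirrors the `args[:key] || default` pattern used in #initialize.
def resolve_options(args = {})
  {
    fork_worker: args[:fork_worker] || DEFAULTS[:fork_worker],
    wait_interval: args[:wait_interval] || DEFAULTS[:wait_interval]
  }
end

explicit = resolve_options(fork_worker: false, wait_interval: 1)
fallback = resolve_options

# `false || true` evaluates to true, so the explicit false is not honoured.
puts explicit[:fork_worker]
puts explicit[:wait_interval]
puts fallback[:wait_interval]
```

Under this pattern, passing `fork_worker: false` cannot turn forking off when the default is true; the default itself would have to be false.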

Instance Attribute Details

#queues ⇒ Object

Returns the value of attribute queues.



# File 'lib/queue_classic/worker.rb', line 11

def queues
  @queues
end

#running ⇒ Object

Returns the value of attribute running.



# File 'lib/queue_classic/worker.rb', line 11

def running
  @running
end

Instance Method Details

#call(job) ⇒ Object

Each job includes a method column of the form "Receiver.message". The part before the last dot is passed to Ruby's eval to obtain the receiver object; the part after it is then sent to that object as a message, along with the job's args.



# File 'lib/queue_classic/worker.rb', line 135

def call(job)
  args = job[:args]
  receiver_str, _, message = job[:method].rpartition('.')
  receiver = eval(receiver_str)
  receiver.send(message, *args)
end
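
The dispatch in the listing above can be tried outside a worker. This sketch uses a hypothetical `Notifier` module and a hand-built job hash; only `rpartition`, `eval` and `send` are taken from the listing.

```ruby
# Hypothetical job target; any constant reachable by eval would work.
module Notifier
  def self.greet(name, greeting)
    "#{greeting}, #{name}!"
  end
end

# A job as a hash, shaped like the one #call reads from.
job = { method: "Notifier.greet", args: ["Ada", "Hello"] }

# Split on the last dot: everything before it is the receiver expression.
receiver_str, _, message = job[:method].rpartition(".")
receiver = eval(receiver_str) # resolves the Notifier module
result = receiver.send(message, *job[:args])
puts result
```

Since the receiver string goes through `eval`, the method column should only ever be written by trusted code.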

#fork_and_work ⇒ Object

Forks the current process; the child runs #setup_child and then Worker#work. The parent process waits for the child process to exit.



# File 'lib/queue_classic/worker.rb', line 65

def fork_and_work
  cpid = fork {setup_child; work}
  log(:at => :fork, :pid => cpid)
  Process.wait(cpid)
end
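
The fork-then-wait shape used above can be sketched in isolation. `run_in_child` is a hypothetical helper, not a queue_classic method, and the example assumes a platform where `fork` is available (for example Linux or macOS).

```ruby
# Runs the given block in a forked child and blocks until the child exits.
# Returns the child's exit status.
def run_in_child
  cpid = fork do
    yield
    exit!(0) # leave the child immediately, skipping inherited at_exit hooks
  end
  Process.wait(cpid)
  $?.exitstatus
end

status = run_in_child { 1 + 1 }
puts "child exited with status #{status}"
```

Because the parent blocks in `Process.wait`, at most one child per worker exists at a time under this arrangement.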

#handle_failure(job, e) ⇒ Object

This method will be called when a StandardError, ScriptError or NoMemoryError is raised during the execution of the job.



# File 'lib/queue_classic/worker.rb', line 148

def handle_failure(job,e)
  $stderr.puts("count#qc.job-error=1 job=#{job} error=#{e.inspect} at=#{e.backtrace.first}")
end

#handle_success(queue, job) ⇒ Object

This method is called when a job finishes without raising. The default implementation deletes the job from the queue.



# File 'lib/queue_classic/worker.rb', line 142

def handle_success(queue, job)
  queue.delete(job[:id])
end

#lock_job ⇒ Object

Attempts to lock a job in the queue's table, checking each queue in order and waiting between rounds for as long as the worker is running. If a job can be locked, this method returns an array with two elements: the queue from which the job was locked and a hash representation of the job. If a job is returned, its locked_at column has been set in the job's row. It is the caller's responsibility to delete the job row from the table when the job is complete. If the worker is stopped before a job is locked, the method returns nil.



# File 'lib/queue_classic/worker.rb', line 89

def lock_job
  log(:at => "lock_job")
  job = nil
  while @running
    @queues.each do |queue|
      if job = queue.lock
        return [queue, job]
      end
    end
    @conn_adapter.wait(@wait_interval, *@queues.map {|q| q.name})
  end
end
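
The left-to-right scan means earlier queues take priority over later ones. This sketch illustrates one pass of that scan with a hypothetical in-memory `FakeQueue`; the real queue locks rows in PostgreSQL, and the waiting step between passes is omitted.

```ruby
# Hypothetical in-memory queue standing in for a database-backed queue.
FakeQueue = Struct.new(:name, :jobs) do
  # Returns the next job, or nil when the queue is empty.
  def lock
    jobs.shift
  end
end

# Scans queues left to right and returns the first [queue, job] pair found,
# or nil when every queue is empty.
def first_available(queues)
  queues.each do |queue|
    job = queue.lock
    return [queue, job] if job
  end
  nil
end

high = FakeQueue.new("high", [])
low  = FakeQueue.new("low", [{ id: 7 }])

queue, job = first_available([high, low])
puts "#{queue.name} -> #{job[:id]}"

# Both queues are now empty, so a second pass finds nothing.
second_pass = first_available([high, low])
puts second_pass.inspect
```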

#log(data) ⇒ Object

Forwards data to QC.log.



# File 'lib/queue_classic/worker.rb', line 159

def log(data)
  QC.log(data)
end

#process(queue, job) ⇒ Object

A job is processed by calling its target code. If the call raises no exception, the job is deleted from the queue. If it raises a StandardError, ScriptError or NoMemoryError, the decision about what to do with the job is delegated to Worker#handle_failure. If the job does not finish, for example because an INT signal is trapped, this method unlocks the job in the queue.



# File 'lib/queue_classic/worker.rb', line 109

def process(queue, job)
  start = Time.now
  finished = false
  begin
    call(job).tap do
      handle_success(queue, job)
      finished = true
    end
  rescue StandardError, ScriptError, NoMemoryError => e
    # We really only want to unlock the job for signal and system exit
    # exceptions. If we encounter a ScriptError or a NoMemoryError any
    # future run will likely encounter the same error.
    handle_failure(job, e)
    finished = true
  ensure
    if !finished
      queue.unlock(job[:id])
    end
    ttp = Integer((Time.now - start) * 1000)
    QC.measure("time-to-process=#{ttp} source=#{queue.name}")
  end
end
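
The three outcomes of the rescue/ensure flow above can be observed with a stand-in. `run_job` is a hypothetical helper that records which branch ran instead of touching a queue. Unlike #process, it also catches `Interrupt` at the method level, purely so the example can report the outcome.

```ruby
# Hypothetical stand-in for the control flow in #process.
# Returns :deleted, :failed, or :unlocked depending on how the block ends.
def run_job
  finished = false
  outcome = nil
  begin
    yield
    outcome = :deleted # where handle_success would delete the row
    finished = true
  rescue StandardError, ScriptError, NoMemoryError
    outcome = :failed # where handle_failure would be called
    finished = true
  ensure
    # Reached with finished == false only for exceptions not rescued above.
    outcome = :unlocked unless finished
  end
  outcome
rescue Interrupt
  outcome # caught here only so the demo can report what happened
end

ok          = run_job { :done }
failed      = run_job { raise ArgumentError, "bad input" }
interrupted = run_job { raise Interrupt, "simulated INT" }

puts ok
puts failed
puts interrupted
```

`Interrupt` is not a `StandardError`, which is why it bypasses the rescue clause and reaches the unlock branch.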

#setup_child ⇒ Object

This method should be overridden if your worker is forking and you need to re-establish database connections in the child process.



# File 'lib/queue_classic/worker.rb', line 155

def setup_child
  log(:at => "setup_child")
end

#start ⇒ Object

Commences the working of jobs. Before entering its loop, start calls QC.unlock_jobs_of_dead_workers; it then spins on @running, which is initialized to true. This method is the primary entry point for starting the worker. The canonical example of starting a worker is: QC::Worker.new.start



# File 'lib/queue_classic/worker.rb', line 44

def start
  QC.unlock_jobs_of_dead_workers

  while @running
    @fork_worker ? fork_and_work : work
  end
end

#stop ⇒ Object

Signals the worker to stop taking new work. This method has no immediate effect. However, there are two loops in the worker (one in #start and another in #lock_job) which check the @running variable to determine if further progress is desirable. In the case that @running is false, the aforementioned methods will short circuit and cause the blocking call to #start to unblock.



# File 'lib/queue_classic/worker.rb', line 59

def stop
  @running = false
end
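
The flag-based shutdown can be illustrated with a stand-in class. `LoopingWorker` is hypothetical and does no real work; it calls its own `stop` after three iterations to simulate a stop request arriving while the loop is running.

```ruby
# Hypothetical stand-in showing the @running flag pattern.
class LoopingWorker
  attr_reader :iterations

  def initialize
    @running = true
    @iterations = 0
  end

  # Spins until #stop flips the flag; each pass stands in for one job.
  def start
    while @running
      @iterations += 1
      stop if @iterations >= 3 # simulate a stop request after three jobs
    end
  end

  # Takes effect only the next time the loop condition is checked.
  def stop
    @running = false
  end
end

worker = LoopingWorker.new
worker.start
puts worker.iterations
```

The pass that is in progress when `stop` is called still runs to completion; only the next check of the loop condition ends the loop.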

#work ⇒ Object

Blocks on locking a job, and once a job is locked, it will process the job.



# File 'lib/queue_classic/worker.rb', line 73

def work
  queue, job = lock_job
  if queue && job
    QC.log_yield(:at => "work", :job => job[:id]) do
      process(queue, job)
    end
  end
end