Class: Backburner::Worker Abstract

Inherits:
Object
Includes:
Helpers, Logger
Defined in:
lib/backburner/worker.rb

Overview

This class is abstract.

Subclass and override #process_tube_names, #prepare and #start to implement a custom Worker class.
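
The subclassing pattern described above can be sketched as follows. So that it runs without the gem or a beanstalkd server, `SketchWorker` is a hypothetical stand-in for `Backburner::Worker` that mirrors only the constructor and abstract methods documented on this page. `InMemoryWorker`, its hard-coded job list, and its normalization policy are likewise assumptions made for illustration.

```ruby
# Hypothetical stand-in for Backburner::Worker: mirrors only the documented
# constructor and the abstract #prepare / #start methods. No beanstalkd needed.
class SketchWorker
  attr_reader :tube_names

  def initialize(tube_names = nil)
    @tube_names = process_tube_names(tube_names)
  end

  # Simplified default normalization (an assumption, not the library's logic).
  def process_tube_names(tube_names)
    Array(tube_names).flatten.compact
  end

  def prepare
    raise NotImplementedError
  end

  def start
    raise NotImplementedError
  end
end

# Illustrative subclass overriding all three hooks.
class InMemoryWorker < SketchWorker
  attr_reader :processed

  # Normalize names to unique strings (an assumed policy for this sketch).
  def process_tube_names(tube_names)
    super.map(&:to_s).uniq
  end

  # Runs once before processing; a real worker would watch its tubes here.
  def prepare
    @pending   = %w[job-1 job-2]
    @processed = []
  end

  # A real worker loops indefinitely; this sketch drains a fixed list and returns.
  def start
    prepare
    @processed << @pending.shift until @pending.empty?
    self
  end
end

worker = InMemoryWorker.new([:emails, 'emails', nil, 'reports']).start
```

Calling `start` on the base class directly raises `NotImplementedError`, which is the behaviour the real abstract methods show in the source listings on this page.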

Methods included from Logger

included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger

Methods included from Helpers

#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_priority, #resolve_respond_timeout

Constructor Details

#initialize(tube_names = nil) ⇒ Worker

Constructs a new worker for processing jobs within specified tubes.

Examples:

Worker.new(['test.job'])


# File 'lib/backburner/worker.rb', line 73

def initialize(tube_names=nil)
  @connection = new_connection
  @tube_names = self.process_tube_names(tube_names)
  register_signal_handlers!
end

Class Attribute Details

.known_queue_classes ⇒ Object



# File 'lib/backburner/worker.rb', line 16

def known_queue_classes; @known_queue_classes ||= []; end

Instance Attribute Details

#connection ⇒ Object

The beanstalk connection held by this worker, created in the constructor via new_connection.



# File 'lib/backburner/worker.rb', line 67

def connection
  @connection
end

#tube_names ⇒ Object

List of tube names to be watched and processed



# File 'lib/backburner/worker.rb', line 67

def tube_names
  @tube_names
end

Class Method Details

.enqueue(job_class, args = [], opts = {}) ⇒ Object

Enqueues a job to be processed later by a worker. Options: `pri` (priority), `delay` (delay in seconds), `ttr` (time to respond), `queue` (queue name).

Examples:

Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect.



# File 'lib/backburner/worker.rb', line 26

def self.enqueue(job_class, args=[], opts={})
  pri   = resolve_priority(opts[:pri] || job_class)
  delay = [0, opts[:delay].to_i].max
  ttr   = resolve_respond_timeout(opts[:ttr] || job_class)
  res   = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)

  return nil unless res # stop if hook is false

  data = { :class => job_class.name, :args => args }
  queue = opts[:queue] && (Proc === opts[:queue] ? opts[:queue].call(job_class) : opts[:queue])

  begin
    response = nil
    connection = Backburner::Connection.new(Backburner.configuration.beanstalk_url)
    connection.retryable do
      tube = connection.tubes[expand_tube_name(queue || job_class)]
      serialized_data = Backburner.configuration.job_serializer_proc.call(data)
      response = tube.put(serialized_data, :pri => pri, :delay => delay, :ttr => ttr)
    end
    return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
  ensure
    connection.close if connection
  end

  response
end
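
The option handling at the top of `.enqueue` can be tried in isolation. The sketch below copies only the `delay` and `queue` expressions from the source above. `normalize_enqueue_opts` and `FakeJob` are hypothetical names introduced for illustration, and nothing here talks to beanstalkd.

```ruby
# Hypothetical helper isolating two expressions from .enqueue.
def normalize_enqueue_opts(job_class, opts = {})
  # Negative, nil or non-numeric delays collapse to 0 seconds.
  delay = [0, opts[:delay].to_i].max
  # :queue may be a literal name or a Proc that receives the job class.
  queue = opts[:queue] && (Proc === opts[:queue] ? opts[:queue].call(job_class) : opts[:queue])
  { delay: delay, queue: queue }
end

FakeJob = Class.new # placeholder job class for illustration

negative = normalize_enqueue_opts(FakeJob, delay: -5)
literal  = normalize_enqueue_opts(FakeJob, delay: '30', queue: 'mail')
dynamic  = normalize_enqueue_opts(FakeJob, queue: ->(klass) { "#{klass.name.downcase}-queue" })
```

When `queue` comes back as `nil`, the source above falls back to the job class itself through `expand_tube_name(queue || job_class)`.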

.start(tube_names = nil) ⇒ Object

Starts processing jobs with the specified tube_names.

Examples:

Backburner::Worker.start(["foo.tube.name"])


# File 'lib/backburner/worker.rb', line 58

def self.start(tube_names=nil)
  begin
    self.new(tube_names).start
  rescue SystemExit
    # do nothing
  end
end
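
`.start` rescues `SystemExit` because `#shutdown` ends the worker with `Kernel.exit`, which raises that exception rather than terminating the process on the spot. A small sketch of the interaction, using a hypothetical `run_until_exit` helper in place of a real worker:

```ruby
# Hypothetical helper: runs a block and swallows SystemExit, as .start does.
def run_until_exit
  yield
  :returned_normally
rescue SystemExit
  :exit_swallowed
end

after_exit   = run_until_exit { Kernel.exit } # SystemExit is rescued here
after_return = run_until_exit { :work_done }  # no exit requested
```

Because the exception is rescued, code after the call keeps running, so a caller of `.start` regains control once the worker has shut down.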

Instance Method Details

#prepare ⇒ Object

This method is abstract.

Define this in your worker subclass to be run once before processing. Recommended to watch tubes or print a message to the logs with `log_info`.

Used to prepare the job queues before job processing is initiated.

Examples:

@worker.prepare

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect.



# File 'lib/backburner/worker.rb', line 99

def prepare
  raise NotImplementedError
end

#process_tube_names(tube_names) ⇒ Object

Note:

This method can be overridden in inherited workers to add more complex tube name processing.

Processes the given tube_names array. Should return normalized tube_names as an array of strings.

Examples:

process_tube_names([['foo'], ['bar']])
=> ['foo', 'bar']


# File 'lib/backburner/worker.rb', line 120

def process_tube_names(tube_names)
  compact_tube_names(tube_names)
end

#shutdown ⇒ Object

Triggers this worker to shut down.



# File 'lib/backburner/worker.rb', line 104

def shutdown
  Thread.new do
    log_info 'Worker exiting...'
  end
  Kernel.exit
end

#start ⇒ Object

Starts processing ready jobs indefinitely. Primary way to consume and process jobs in specified tubes.

Examples:

@worker.start

Raises:

  • (NotImplementedError)


# File 'lib/backburner/worker.rb', line 85

def start
  raise NotImplementedError
end

#work_one_job(conn = connection) ⇒ Object

Reserves one job from beanstalk and processes it. Returns without doing anything if the reserve times out.

Examples:

@worker.work_one_job

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect multiple times.



# File 'lib/backburner/worker.rb', line 129

def work_one_job(conn = connection)
  begin
    job = reserve_job(conn)
  rescue Beaneater::TimedOutError => e
    return
  end

  self.log_job_begin(job.name, job.args)
  job.process
  self.log_job_end(job.name)

rescue Backburner::Job::JobFormatInvalid => e
  self.log_error self.exception_message(e)
rescue => e # Error occurred processing job
  self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)

  unless job
    self.log_error "Error occurred before we were able to assign a job. Giving up without retrying!"
    return
  end

  # NB: There's a slight chance here that the connection to beanstalkd has
  # gone down between the time we reserved / processed the job and here.
  num_retries = job.stats.releases
  retry_status = "failed: attempt #{num_retries+1} of #{queue_config.max_job_retries+1}"
  if num_retries < queue_config.max_job_retries # retry again
    delay = queue_config.retry_delay_proc.call(queue_config.retry_delay, num_retries) rescue queue_config.retry_delay
    job.retry(num_retries + 1, delay)
    self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
  else # retries failed, bury
    job.bury
    self.log_job_end(job.name, "#{retry_status}, burying") if job_started_at
  end

  handle_error(e, job.name, job.args, job)
end
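
The retry-or-bury branch above can be read as a pure decision over four values. The sketch below restates it under that assumption. `retry_decision` is a hypothetical helper, not part of the library, and the cubic `backoff` proc is only an example policy chosen for this illustration.

```ruby
# Hypothetical pure function mirroring the retry branch of #work_one_job.
# releases: how many times the job was already released (job.stats.releases).
def retry_decision(releases, max_job_retries, retry_delay, retry_delay_proc)
  status = "failed: attempt #{releases + 1} of #{max_job_retries + 1}"
  if releases < max_job_retries
    delay = begin
      retry_delay_proc.call(retry_delay, releases)
    rescue StandardError
      retry_delay # fall back to the fixed delay if the proc raises
    end
    [:retry, delay, "#{status}, retrying in #{delay}s"]
  else
    [:bury, nil, "#{status}, burying"]
  end
end

# Example policy (an assumption for this sketch): base delay plus cubic backoff.
backoff = ->(min_delay, num_retries) { min_delay + num_retries**3 }

first_failure = retry_decision(0, 2, 5, backoff)
last_failure  = retry_decision(2, 2, 5, backoff)
broken_proc   = retry_decision(1, 2, 5, ->(*) { raise 'boom' })
```

With `max_job_retries` set to 2, a job gets three attempts in total: two retries, then it is buried on the third failure.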