Class: Backburner::Worker Abstract

Inherits:
Object
  • Object
show all
Includes:
Helpers, Logger
Defined in:
lib/backburner/worker.rb

Overview

This class is abstract.

Subclass and override #process_tube_names, #prepare and #start to implement a custom Worker class.

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger

Methods included from Helpers

#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_max_job_retries, #resolve_priority, #resolve_respond_timeout, #resolve_retry_delay, #resolve_retry_delay_proc

Constructor Details

#initialize(tube_names = nil) ⇒ Worker

Constructs a new worker for processing jobs within specified tubes.

Examples:

Worker.new(['test.job'])


86
87
88
89
90
# File 'lib/backburner/worker.rb', line 86

def initialize(tube_names = nil)
  @connection = new_connection
  @tube_names = process_tube_names(tube_names)
  register_signal_handlers!
end

Class Attribute Details

.known_queue_classesObject



17
18
19
# File 'lib/backburner/worker.rb', line 17

def known_queue_classes
  @known_queue_classes ||= []
end

Instance Attribute Details

#connectionObject

List of tube names to be watched and processed



80
81
82
# File 'lib/backburner/worker.rb', line 80

def connection
  @connection
end

#tube_namesObject

List of tube names to be watched and processed



80
81
82
# File 'lib/backburner/worker.rb', line 80

def tube_names
  @tube_names
end

Class Method Details

.enqueue(job_class, args = [], opts = {}) ⇒ Object

Enqueues a job to be processed later by a worker. Options: ‘pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name)

Examples:

Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/backburner/worker.rb', line 29

def self.enqueue(job_class, args = [], opts = {})
  # Invoke Procs if they are sent
  opts.each_key do |k|
    opts[k] = opts[k].call job_class, args if opts[k].instance_of?(Proc)
  end

  opts[:shard_key] = opts[:shard_key].nil? ? 'X' : opts[:shard_key].to_s
  pri   = resolve_priority(opts[:pri] || job_class)
  delay = [0, opts[:delay].to_i].max
  ttr   = resolve_respond_timeout(opts[:ttr] || job_class)
  res   = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)

  return nil unless res # stop if hook is false

  data = { class: job_class.name, args: args, ttr: ttr }
  queue = opts[:queue] && (opts[:queue].is_a?(Proc) ? opts[:queue].call(job_class) : opts[:queue])

  begin
    response = nil
    connection = Backburner::Connection.new(Backburner.configuration.allq_url)
    connection.retryable do
      tube_name = expand_tube_name(queue || job_class)
      serialized_data = Backburner.configuration.job_serializer_proc.call(data)
      send_data = {
        pri: pri,
        delay: delay,
        ttr: ttr
      }
      opts.merge!(send_data)
      response = connection.put(tube_name, serialized_data, opts)
    end
    return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
  ensure
    connection.close if connection
  end

  response
end

.start(tube_names = nil) ⇒ Object

Starts processing jobs with the specified tube_names.

Examples:

Backburner::Worker.start(["foo.tube.name"])


73
74
75
76
77
# File 'lib/backburner/worker.rb', line 73

def self.start(tube_names = nil)
  new(tube_names).start
rescue SystemExit
  # do nothing
end

Instance Method Details

#prepareObject

This method is abstract.

Define this in your worker subclass

Used to prepare the job queues before job processing is initiated.

to be run once before processing. Recommended to watch tubes or print a message to the logs with ‘log_info’

Examples:

@worker.prepare

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect.



112
113
114
# File 'lib/backburner/worker.rb', line 112

def prepare
  raise NotImplementedError
end

#process_tube_names(tube_names) ⇒ Object

Note:

This method can be overridden in inherited workers

Processes tube_names given tube_names array. Should return normalized tube_names as an array of strings.

to add more complex tube name processing.

Examples:

process_tube_names([['foo'], ['bar']])
=> ['foo', 'bar', 'baz']


133
134
135
# File 'lib/backburner/worker.rb', line 133

def process_tube_names(tube_names)
  compact_tube_names(tube_names)
end

#shutdownObject

Triggers this worker to shutdown



117
118
119
120
121
122
# File 'lib/backburner/worker.rb', line 117

def shutdown
  Thread.new do
    log_info 'Worker exiting...'
  end
  Kernel.exit
end

#startObject

Starts processing ready jobs indefinitely. Primary way to consume and process jobs in specified tubes.

Examples:

@worker.start

Raises:

  • (NotImplementedError)


98
99
100
# File 'lib/backburner/worker.rb', line 98

def start
  raise NotImplementedError
end

#work_one_job(conn = connection, tube_name = nil) ⇒ Object

Performs a job by reserving a job from beanstalk and processing it

Examples:

@worker.work_one_job

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect multiple times.



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/backburner/worker.rb', line 142

def work_one_job(conn = connection, tube_name = nil)
  if tube_name.nil?
    log_error 'Sampling tube, this is bad practice for Allq'
    tube_name = @tube_names.sample
  end

  begin
    job = reserve_job(conn, tube_name)
  rescue Exception => e
    log_error "Exception: #{e.full_message}"
    sleep(rand * 3)
    return
  end

  if job && job.body
    begin
      log_job_begin(job.name, job.args)
      job.process
      log_job_end(job.name)
    rescue Backburner::Job::JobFormatInvalid => e
      log_error exception_message(e)
    rescue StandardError => e # Error occurred processing job
      log_error exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)

      unless job
        log_error 'Error occurred before we were able to assign a job. Giving up without retrying!'
        return
      end

      handle_error(e, job.name, job.args, job)
    end
  else
    sleep(rand * 3)
  end
  job
end