Class: Backburner::Workers::Threading

Inherits:
Backburner::Worker show all
Defined in:
lib/backburner/workers/threading.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Attributes inherited from Backburner::Worker

#connection, #tube_names

Instance Method Summary collapse

Methods inherited from Backburner::Worker

enqueue, #handle_failure_for_job, start, #work_one_job

Methods included from Logger

included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger

Methods included from Helpers

#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_max_job_retries, #resolve_priority, #resolve_respond_timeout, #resolve_retry_delay, #resolve_retry_delay_proc

Constructor Details

#initialize(*args) ⇒ Threading

Custom initializer just to set @tubes_data



16
17
18
19
20
21
# File 'lib/backburner/workers/threading.rb', line 16

def initialize(*args)
  @tubes_data = {}
  super
  self.process_tube_options
  @exit_on_shutdown = true
end

Class Attribute Details

.shutdown_timeoutObject

Returns the value of attribute shutdown_timeout.



12
13
14
# File 'lib/backburner/workers/threading.rb', line 12

def shutdown_timeout
  @shutdown_timeout
end

.threads_numberObject

Returns the value of attribute threads_number.



11
12
13
# File 'lib/backburner/workers/threading.rb', line 11

def threads_number
  @threads_number
end

Instance Attribute Details

#exit_on_shutdownObject

Returns the value of attribute exit_on_shutdown.



6
7
8
# File 'lib/backburner/workers/threading.rb', line 6

def exit_on_shutdown
  @exit_on_shutdown
end

#self_readObject

Returns the value of attribute self_read.



6
7
8
# File 'lib/backburner/workers/threading.rb', line 6

def self_read
  @self_read
end

#self_writeObject

Returns the value of attribute self_write.



6
7
8
# File 'lib/backburner/workers/threading.rb', line 6

def self_write
  @self_write
end

Instance Method Details

#killObject



141
142
143
# File 'lib/backburner/workers/threading.rb', line 141

def kill
  @thread_pools.each { |_name, pool| pool.kill unless pool.shutdown? }
end

#prepareObject

Used to prepare job queues before processing jobs. Setup beanstalk tube_names and watch all specified tubes for jobs.

Examples:

@worker.prepare

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect.



30
31
32
33
34
35
36
37
38
# File 'lib/backburner/workers/threading.rb', line 30

def prepare
  self.tube_names.map! { |name| expand_tube_name(name)  }.uniq!
  log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]"
  @thread_pools = {}
  @tubes_data.each do |name, config|
    max_threads = (config[:threads] || self.class.threads_number || ::Concurrent.processor_count).to_i
    @thread_pools[name] = (::Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: max_threads))
  end
end

#process_tube_names(tube_names) ⇒ Object

Process the special tube_names of Threading worker:

The format is tube_name:custom_threads_limit

Examples:

process_tube_names(['foo:10', 'lol'])
=> ['foo', lol']


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/backburner/workers/threading.rb', line 88

def process_tube_names(tube_names)
  names = compact_tube_names(tube_names)
  if names.nil?
    nil
  else
    names.map do |name|
      data = name.split(":")
      tube_name = data.first
      threads_number = data[1].empty? ? nil : data[1].to_i rescue nil
      @tubes_data[expand_tube_name(tube_name)] = {
        :threads => threads_number
      }
      tube_name
    end
  end
end

#process_tube_optionsObject

Process the tube settings This overrides @tubes_data set by process_tube_names method. So a tube has name ‘super_job:5’ and the tube class has setting queue_jobs_limit 10, the result limit will be 10 If the tube is known by existing allq queue, but not by class - skip it



110
111
112
113
114
115
116
117
118
# File 'lib/backburner/workers/threading.rb', line 110

def process_tube_options
  Backburner::Worker.known_queue_classes.each do |queue|
    next if @tubes_data[expand_tube_name(queue)].nil?
    queue_settings = {
      :threads => queue.queue_jobs_limit
    }
    @tubes_data[expand_tube_name(queue)].merge!(queue_settings){|k, v1, v2| v2.nil? ? v1 : v2 }
  end
end

#register_signal_handlers!Object

Registers signal handlers TERM and INT to trigger



152
153
154
155
156
157
158
159
160
# File 'lib/backburner/workers/threading.rb', line 152

def register_signal_handlers!
  @self_read, @self_write = IO.pipe
  %w[TERM INT].each do |sig|
    trap(sig) do
      raise Interrupt if @in_shutdown
      self_write.puts(sig)
    end
  end
end

#shutdownObject



145
146
147
148
149
# File 'lib/backburner/workers/threading.rb', line 145

def shutdown
  log_info "beginning graceful worker shutdown"
  shutdown_threadpools
  super if @exit_on_shutdown
end

#shutdown_threadpoolsObject



127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/backburner/workers/threading.rb', line 127

def shutdown_threadpools
  @thread_pools.each { |_name, pool| pool.shutdown }
  shutdown_time = Time.now
  @in_shutdown = true
  all_shutdown = @thread_pools.all? do |_name, pool|
    time_to_wait = self.class.shutdown_timeout - (Time.now - shutdown_time).to_i
    pool.wait_for_termination(time_to_wait) if time_to_wait > 0
  end
rescue Interrupt
  log_info "graceful shutdown aborted, shutting down immediately"
ensure
  kill unless all_shutdown
end

#start(wait = true) ⇒ Object

Starts processing new jobs indefinitely. Primary way to consume and process jobs in specified tubes.

Examples:

@worker.start


46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/backburner/workers/threading.rb', line 46

def start(wait=true)
  prepare

  @thread_pools.each do |tube_name, pool|
    pool.max_length.times do
      # Create a new connection and set it up to listen on this tube name
      # connection = new_connection.tap{ |conn| conn.tubes.watch!(tube_name) }
      # connection.on_reconnect = lambda { |conn| conn.tubes.watch!(tube_name) }

      # Make it work jobs using its own connection per thread
      pool.post(connection) do |memo_connection|
        # TODO: use read-write lock?
        loop do
          begin
            break if @in_shutdown
            work_one_job(memo_connection, tube_name)
          rescue => e
            log_error("Exception caught in thread pool loop. Continuing. -> #{e.message}\nBacktrace: #{e.backtrace}")
          end
        end

        connection.close
      end
    end
  end

  wait_for_shutdown! if wait
end

#wait_for_shutdown!Object

Wait for the shutdown signel



121
122
123
124
125
# File 'lib/backburner/workers/threading.rb', line 121

def wait_for_shutdown!
  raise Interrupt while IO.select([self_read])
rescue Interrupt
  shutdown
end