Class: Delayed::Threaded::Worker

Inherits:
Worker
  • Object
show all
Includes:
SleepCalculator
Defined in:
lib/delayed/threaded/worker.rb

Overview

Threaded DJ worker implementation.

  • inspired by Delayed::Command

  • no daemons dependency + thread-safe

Defined Under Namespace

Classes: Config

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SleepCalculator

#sleep

Class Method Details

.lifecycle ⇒ Object



17
# File 'lib/delayed/threaded/worker.rb', line 17

# Delegates to Delayed::Worker.lifecycle and returns whatever it returns.
def self.lifecycle
  Delayed::Worker.lifecycle
end

.setup_lifecycle ⇒ Object



20
# File 'lib/delayed/threaded/worker.rb', line 20

# Delegates to Delayed::Worker.setup_lifecycle and returns whatever it returns.
def self.setup_lifecycle
  Delayed::Worker.setup_lifecycle
end

Instance Method Details

#exit! ⇒ Object



131
132
133
134
135
136
137
138
# File 'lib/delayed/threaded/worker.rb', line 131

# Marks this worker as exiting and releases the job locks held under its name.
#
# Does nothing when the exit flag is already set, so repeated calls log and
# clear locks only once. Locks are cleared only when Delayed::Job is defined
# and responds to +clear_locks!+.
#
# Returns the result of +Delayed::Job.clear_locks!+ when it was invoked,
# otherwise nil.
def exit!
  return if @exit # #stop?

  say "Stopping job worker"
  @exit = true # #stop
  if Delayed.const_defined?(:Job) && Delayed::Job.respond_to?(:clear_locks!)
    Delayed::Job.clear_locks!(name)
  end
end

#name ⇒ Object

e.g. :

def self.sleep_delay=(value)
  (Thread.current[:delayed_threaded_worker_config] ||= Config.new).sleep_delay = value
end

def self.sleep_delay
  if (config = Thread.current[:delayed_threaded_worker_config]) && config.key?(:sleep_delay)
    config.sleep_delay
  else
    superclass.sleep_delay # Delayed::Worker.sleep_delay
  end
end


93
94
95
96
97
98
99
100
101
102
103
# File 'lib/delayed/threaded/worker.rb', line 93

# Returns this worker's name, computing and caching it on first use.
#
# The name is the inherited name followed by " thread:<thread_id>". If building
# that fails with a StandardError, it falls back to the name prefix followed by
# "thread:<thread_id>". The cached string is frozen.
def name
  @name ||= nil # make sure the ivar exists before reading it
  return @name unless @name.nil?

  @name =
    begin
      # super - [prefix]host:hostname pid:process_pid
      "#{super} thread:#{thread_id}".freeze
    rescue
      "#{@name_prefix}thread:#{thread_id}".freeze
    end
end

#start ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/delayed/threaded/worker.rb', line 145

# Runs the worker loop until the exit flag (@exit) is set.
#
# Each pass calls +work_off+ and times it. The exit flag is checked right after
# the pass and again at the end of the iteration. When the pass processed
# nothing the worker sleeps for the class-level +sleep_delay+; otherwise it
# logs the processed count, the rate and the last element of the result.
def start
  say "Starting job worker"
  trap

  loop do
    outcome = nil
    elapsed = Benchmark.realtime { outcome = work_off }
    count = outcome.sum

    break if @exit

    if count.nonzero?
      say Kernel.format("#{count} jobs processed at %.4f j/s, %d failed ...", count / elapsed, outcome.last)
    else
      sleep(self.class.sleep_delay)
    end

    break if @exit
  end
end

#stop ⇒ Object



171
# File 'lib/delayed/threaded/worker.rb', line 171

# Sets the exit flag; returns true.
def stop
  @exit = true
end

#stop? ⇒ Boolean

Returns:

  • (Boolean)


170
# File 'lib/delayed/threaded/worker.rb', line 170

# Reports whether the exit flag is set, coerced to a strict true/false.
def stop?
  !!@exit
end

#thread_id ⇒ Object



108
109
110
111
112
113
114
115
116
# File 'lib/delayed/threaded/worker.rb', line 108

# Returns an identifier for the current thread.
#
# Uses the Ruby thread's name when it is set and non-empty. Otherwise it reads
# the current Java thread's name through +java.lang.Thread+ (so this branch
# needs Java integration, presumably JRuby). A Java name longer than 100
# characters is cut down to the text before the first ": " when the pattern
# below matches; otherwise it is returned unchanged.
def thread_id
  label = Thread.current.name || ''
  return label unless label.empty?

  label = java.lang.Thread.currentThread.getName
  if label.size > 100
    shortened = label.match(/(.*?)\:\s.*?[\/\\]+/)
    label = shortened[1] if shortened
  end
  label
end

#to_s ⇒ Object



105
# File 'lib/delayed/threaded/worker.rb', line 105

# String form of the worker: simply its #name.
def to_s
  name
end