Class: RocketJob::Worker

Inherits:
Object
  • Object
show all
Includes:
ActiveSupport::Callbacks, SemanticLogger::Loggable
Defined in:
lib/rocket_job/worker.rb

Overview

Worker

A worker runs on a single operating system thread. It is usually started under a Rocket Job server process.

Defined Under Namespace

Classes: Shutdown

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id: 0, server_name: "inline:0", inline: false) ⇒ Worker

Returns a new instance of Worker.



36
37
38
39
40
41
42
43
44
45
# File 'lib/rocket_job/worker.rb', line 36

# Creates a worker and, unless it is inline, starts its processing thread.
#
# @param id [Integer] number of this worker; it becomes part of the worker name.
# @param server_name [String] name of the owning server; it prefixes the worker name.
# @param inline [Boolean] when true no thread is started and `@thread` is left unset.
def initialize(id: 0, server_name: "inline:0", inline: false)
  @id             = id
  @server_name    = server_name
  @inline         = inline
  @shutdown       = Concurrent::Event.new
  @name           = "#{server_name}:#{id}"
  @re_check_start = Time.now
  @current_filter = Config.filter || {}
  # Spawn the thread last: it starts executing `run` immediately, so every
  # instance variable must already be assigned before the thread can see `self`.
  @thread         = Thread.new { run } unless inline
end

Instance Attribute Details

#current_filter ⇒ Object

Returns the value of attribute current_filter.



14
15
16
# File 'lib/rocket_job/worker.rb', line 14

# Filter currently applied when this worker looks for jobs.
#
# @return [Object] the value held in `@current_filter`
def current_filter = @current_filter

#id ⇒ Object

Returns the value of attribute id.



14
15
16
# File 'lib/rocket_job/worker.rb', line 14

# Number identifying this worker.
#
# @return [Object] the value held in `@id`
def id = @id

#inline ⇒ Object (readonly)

Returns the value of attribute inline.



15
16
17
# File 'lib/rocket_job/worker.rb', line 15

# Whether this worker was created without a thread of its own.
#
# @return [Object] the value held in `@inline`
def inline = @inline

#name ⇒ Object (readonly)

Returns the value of attribute name.



15
16
17
# File 'lib/rocket_job/worker.rb', line 15

# Name of this worker.
#
# @return [Object] the value held in `@name`
def name = @name

#server_name ⇒ Object (readonly)

Returns the value of attribute server_name.



15
16
17
# File 'lib/rocket_job/worker.rb', line 15

# Name of the server this worker belongs to.
#
# @return [Object] the value held in `@server_name`
def server_name = @server_name

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



15
16
17
# File 'lib/rocket_job/worker.rb', line 15

# Thread on which this worker processes jobs.
#
# @return [Object] the value held in `@thread`
def thread = @thread

Class Method Details

.after_running(*filters, &blk) ⇒ Object



28
29
30
# File 'lib/rocket_job/worker.rb', line 28

# Registers an :after callback on the :running event.
#
# @param filters [Array] arguments forwarded unchanged to `set_callback`.
# @param blk [Proc] optional block forwarded to `set_callback`.
# @return [Object] whatever `set_callback` returns
def self.after_running(*filters, &blk)
  callback_args = [:running, :after, *filters]
  set_callback(*callback_args, &blk)
end

.around_running(*filters, &blk) ⇒ Object



32
33
34
# File 'lib/rocket_job/worker.rb', line 32

# Registers an :around callback on the :running event.
#
# @param filters [Array] arguments forwarded unchanged to `set_callback`.
# @param blk [Proc] optional block forwarded to `set_callback`.
# @return [Object] whatever `set_callback` returns
def self.around_running(*filters, &blk)
  callback_args = [:running, :around, *filters]
  set_callback(*callback_args, &blk)
end

.before_running(*filters, &blk) ⇒ Object



24
25
26
# File 'lib/rocket_job/worker.rb', line 24

# Registers a :before callback on the :running event.
#
# @param filters [Array] arguments forwarded unchanged to `set_callback`.
# @param blk [Proc] optional block forwarded to `set_callback`.
# @return [Object] whatever `set_callback` returns
def self.before_running(*filters, &blk)
  callback_args = [:running, :before, *filters]
  set_callback(*callback_args, &blk)
end

Instance Method Details

#add_to_current_filter(filter) ⇒ Object

Add the supplied filter to the current filter.



199
200
201
202
203
204
205
206
207
208
209
# File 'lib/rocket_job/worker.rb', line 199

# Merges the supplied filter into the current filter, in place.
#
# For each key: when the current filter already holds a truthy value and the
# new value is an Array, the new values are appended to the existing ones;
# in every other case the new value replaces whatever was there.
#
# @param filter [#each_pair] key/value pairs to merge in.
# @return [Object] the updated current filter
def add_to_current_filter(filter)
  filter.each_pair do |key, value|
    existing = current_filter[key]
    merged   =
      if existing && value.is_a?(Array)
        existing + value
      else
        value
      end
    current_filter[key] = merged
  end
  current_filter
end

#alive? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/rocket_job/worker.rb', line 47

# Whether this worker is still alive.
#
# @return [Boolean] always true for an inline worker, otherwise the result
#   of `alive?` on the worker thread
def alive?
  return true if inline

  @thread.alive?
end

#backtrace ⇒ Object



51
52
53
# File 'lib/rocket_job/worker.rb', line 51

# Backtrace of the thread this worker runs on.
#
# @return [Object] the backtrace of the calling thread for an inline worker,
#   otherwise the backtrace of the worker thread
def backtrace
  target = inline ? Thread.current : @thread
  target.backtrace
end

#find_and_assign_job ⇒ Object



175
176
177
178
179
180
181
182
183
184
# File 'lib/rocket_job/worker.rb', line 175

# Finds the next eligible job and marks it as running under this worker's
# name, using a single `find_one_and_update` call.
#
# A job is eligible when it is due (no `run_at`, or `run_at` not in the
# future) and is either queued or running with sub_state :processing.
# The current filter, when not blank, narrows the candidates further.
# Candidates are ordered by ascending priority, then by _id.
#
# @return [Object] the result of `find_one_and_update`; callers treat a
#   falsy result as "no job available"
def find_and_assign_job
  SemanticLogger.silence(:info) do
    due       = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now)
    available = RocketJob::Job.queued.or(state: :running, sub_state: :processing)
    criteria  = RocketJob::Job.and(available, due)
    unless current_filter.blank?
      criteria = criteria.and(current_filter)
    end
    claim = {"$set" => {"worker_name" => name, "state" => "running"}}
    criteria
      .sort(priority: 1, _id: 1)
      .find_one_and_update(claim, bypass_document_validation: true)
  end
end

#join(*args) ⇒ Object



55
56
57
# File 'lib/rocket_job/worker.rb', line 55

# Joins the worker thread, forwarding any arguments to `Thread#join`.
#
# @param args [Array] arguments passed straight through to the thread's `join`.
# @return [Object, nil] nil for an inline worker, otherwise the result of
#   joining the worker thread
def join(*args)
  return if inline

  @thread.join(*args)
end

#kill ⇒ Object

Raises the Shutdown exception in this worker's thread, if that thread is still alive, so that it stops processing immediately. Returns true without doing anything for an inline worker.



60
61
62
63
64
# File 'lib/rocket_job/worker.rb', line 60

# Interrupts the worker thread by raising Shutdown inside it.
#
# @return [true, Object, nil] true for an inline worker, nil when the thread
#   is no longer alive, otherwise the result of `Thread#raise`
def kill
  return true if inline
  return unless @thread.alive?

  @thread.raise(Shutdown, "Shutdown due to kill request for worker: #{name}")
end

#next_available_job ⇒ Object

Returns [RocketJob::Job] the next job available for processing. Returns [nil] if no job is available for processing.

Notes:

  • Destroys expired jobs

  • Runs job throttles and skips the job if it is throttled.

    • When a job is throttled, the matching throttle filter is added to the current filter so that the job is excluded from subsequent polling.



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/rocket_job/worker.rb', line 129

# Keeps claiming jobs until one is ready to be worked on, none is left, or
# the worker is shutting down.
#
# For each claimed job:
# - an expired job is destroyed and the search continues;
# - a job that is already running is returned as is;
# - a throttled job is skipped and the search continues;
# - any other job is started and returned if it is then running.
#
# @return [Object, nil] the job to process, or nil when no job was found or
#   shutdown was requested
def next_available_job
  until shutdown?
    candidate = find_and_assign_job
    return unless candidate

    if candidate.expired?
      candidate.fail_on_exception! do
        candidate.worker_name = name
        candidate.destroy
        logger.info("Destroyed expired job.")
      end
    elsif candidate.running?
      # A batch job that has already been started; batch applies its own
      # throttles to slices.
      return candidate
    else
      # Skip this job when one of its throttles applies.
      throttled = candidate.fail_on_exception! { throttled_job?(candidate) }
      unless throttled
        # Start this job!
        candidate.fail_on_exception! { candidate.start!(name) }
        return candidate if candidate.running?
      end
    end
  end
end

#random_wait_interval ⇒ Object

Returns [Float] a randomized poll interval in seconds up to the maximum configured poll interval.



212
213
214
# File 'lib/rocket_job/worker.rb', line 212

# Picks a random poll interval so that workers do not all poll at once.
#
# The interval is drawn at millisecond resolution. `Kernel#rand` returns an
# Integer count of milliseconds here, so the division must use a Float
# divisor; dividing by the Integer 1000 would truncate to whole seconds.
#
# @return [Float] seconds to wait, below `Config.max_poll_seconds`
def random_wait_interval
  rand(Config.max_poll_seconds * 1000) / 1000.0
end

#reset_filter_if_expired ⇒ Object

Resets the current job filter if the relevant time interval has passed.



113
114
115
116
117
118
119
120
# File 'lib/rocket_job/worker.rb', line 113

# Restores the current filter to the configured filter once more than
# `Config.re_check_seconds` have passed since the last reset, and restarts
# the interval. Does nothing before then.
#
# @return [Object, nil] the new filter when it was reset, otherwise nil
def reset_filter_if_expired
  now     = Time.now
  elapsed = now - @re_check_start

  if elapsed > Config.re_check_seconds
    @re_check_start     = now
    self.current_filter = Config.filter || {}
  end
end

#run ⇒ Object

Process jobs until it shuts down

Params

None. The worker's id attribute is used to name its thread for logging purposes.


84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/rocket_job/worker.rb', line 84

# Main loop of the worker: polls for jobs and works on them until shutdown
# is requested.
#
# Each pass resets the job filter if it has expired, fetches the next
# available job, works on it, and then waits before polling again. The wait
# ends early if shutdown is signalled.
#
# Any exception that escapes the loop is logged as fatal and not re-raised.
# ActiveRecord connections held by this thread are released on the way out
# when ActiveRecord is loaded.
def run
  # Name the thread after the worker id.
  Thread.current.name = format("rocketjob %03i", id)
  logger.info "Started"

  until shutdown?
    # Default wait between polls; replaced by a random interval below when
    # the job reports that its work completed.
    sleep_seconds = Config.max_poll_seconds
    reset_filter_if_expired
    job = next_available_job

    # Returns true when work was completed, but no other work is available
    # (`job` is nil when nothing was found, in which case this branch is skipped).
    if job&.rocket_job_work(self, false)
      # Return the database connections for this thread back to the connection pool
      ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)

      # Stagger workers so that they don't all poll at the same time.
      sleep_seconds = random_wait_interval
    end

    # Wait for the poll interval, or less if shutdown is signalled meanwhile.
    wait_for_shutdown?(sleep_seconds)
  end

  logger.info "Stopping"
# NOTE(review): rescuing Exception also catches non-StandardError exceptions and
# nothing is re-raised; presumably deliberate so that anything terminating the
# worker thread (including Shutdown raised by #kill) is logged — confirm.
rescue Exception => e
  logger.fatal("Unhandled exception in job processing thread", e)
ensure
  # Always hand this thread's connections back, however the loop ended.
  ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
end

#shutdown! ⇒ Object



70
71
72
# File 'lib/rocket_job/worker.rb', line 70

# Signals this worker to stop by setting its shutdown event.
#
# @return [Object] whatever the event's `set` returns
def shutdown! = @shutdown.set

#shutdown? ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/rocket_job/worker.rb', line 66

# Whether shutdown has been requested for this worker.
#
# @return [Boolean] the state of the shutdown event, as reported by `set?`
def shutdown? = @shutdown.set?

#throttled_job?(job) ⇒ Boolean

Whether the supplied job has been throttled and should be ignored.

Returns:

  • (Boolean)


157
158
159
160
161
162
163
164
165
166
# File 'lib/rocket_job/worker.rb', line 157

# Checks the job against its throttles and, when one applies, hands the job
# back so that it can be picked up later.
#
# When a throttle matches, its filter is added to the current filter and the
# job is set back to queued with no worker name.
#
# @param job [Object] the job to evaluate; must respond to
#   `rocket_job_throttles` and `set`.
# @return [Boolean] true when the job was throttled, false otherwise
def throttled_job?(job)
  throttle_filter = job.rocket_job_throttles.matching_filter(job)

  if throttle_filter
    add_to_current_filter(throttle_filter)
    # Release the job so that other workers can process it later.
    job.set(worker_name: nil, state: :queued)
    true
  else
    false
  end
end

#wait_for_shutdown?(timeout = nil) ⇒ Boolean

Returns [true|false] whether the shutdown indicator was set

Returns:

  • (Boolean)


75
76
77
# File 'lib/rocket_job/worker.rb', line 75

# Waits on the shutdown event.
#
# @param timeout [Numeric, nil] passed straight to the event's `wait`;
#   presumably seconds, with nil meaning no limit — confirm against
#   Concurrent::Event.
# @return [Boolean] the result of the event's `wait`
def wait_for_shutdown?(timeout = nil) = @shutdown.wait(timeout)