Class: RocketJob::Worker

Inherits:
Object
  • Object
show all
Includes:
SemanticLogger::Loggable
Defined in:
lib/rocket_job/worker.rb

Overview

Worker

A worker runs on a single operating system thread Is usually started under a Rocket Job server process.

Direct Known Subclasses

RactorWorker, ThreadWorker

Defined Under Namespace

Classes: Shutdown

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id: 0, server_name: "inline:0") ⇒ Worker

Returns a new instance of Worker.



19
20
21
22
23
24
25
# File 'lib/rocket_job/worker.rb', line 19

def initialize(id: 0, server_name: "inline:0")
  @id             = id
  @server_name    = server_name
  @name           = "#{server_name}:#{id}"
  @re_check_start = Time.now
  @current_filter = Config.filter || {}
end

Instance Attribute Details

#current_filterObject

Returns the value of attribute current_filter.



9
10
11
# File 'lib/rocket_job/worker.rb', line 9

def current_filter
  @current_filter
end

#idObject

Returns the value of attribute id.



9
10
11
# File 'lib/rocket_job/worker.rb', line 9

def id
  @id
end

#nameObject (readonly)

Returns the value of attribute name.



10
11
12
# File 'lib/rocket_job/worker.rb', line 10

def name
  @name
end

#server_nameObject (readonly)

Returns the value of attribute server_name.



10
11
12
# File 'lib/rocket_job/worker.rb', line 10

def server_name
  @server_name
end

Instance Method Details

#add_to_current_filter(filter) ⇒ Object

Add the supplied filter to the current filter.



165
166
167
168
169
170
171
172
173
174
175
# File 'lib/rocket_job/worker.rb', line 165

def add_to_current_filter(filter)
  filter.each_pair do |k, v|
    current_filter[k] =
      if (previous = current_filter[k])
        v.is_a?(Array) ? previous + v : v
      else
        v
      end
  end
  current_filter
end

#alive?Boolean

Returns:

  • (Boolean)


27
28
29
# File 'lib/rocket_job/worker.rb', line 27

def alive?
  true
end

#backtraceObject



31
32
33
# File 'lib/rocket_job/worker.rb', line 31

def backtrace
  Thread.current.backtrace
end

#find_and_assign_jobObject

Finds the next job to work on in priority based order and assigns it to this worker.

Applies the current filter to exclude filtered jobs.

Returns nil if no jobs are available for processing.



153
154
155
156
157
158
159
160
161
162
# File 'lib/rocket_job/worker.rb', line 153

def find_and_assign_job
  SemanticLogger.silence(:info) do
    scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now)
    working   = RocketJob::Job.queued.or(state: "running", sub_state: "processing")
    query     = RocketJob::Job.and(working, scheduled)
    query     = query.and(current_filter) unless current_filter.blank?
    update    = {"$set" => {"worker_name" => name, "state" => "running"}}
    query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
  end
end

#join(*_args) ⇒ Object



35
36
37
# File 'lib/rocket_job/worker.rb', line 35

def join(*_args)
  true
end

#killObject



39
40
41
# File 'lib/rocket_job/worker.rb', line 39

def kill
  true
end

#next_available_jobObject

Returns [RocketJob::Job] the next job available for processing. Returns [nil] if no job is available for processing.

Notes:

  • Destroys expired jobs

  • Runs job throttles and skips the job if it is throttled.

    • Adding that filter to the current filter to exclude from subsequent polling.



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/rocket_job/worker.rb', line 106

def next_available_job
  until shutdown?
    job = find_and_assign_job
    return unless job

    if job.expired?
      job.fail_on_exception! do
        job.worker_name = name
        job.destroy
        logger.info("Destroyed expired job.")
      end
      next
    end

    # Batch Job that is already started?
    # Batch has its own throttles for slices.
    return job if job.running?

    # Should this job be throttled?
    next if job.fail_on_exception! { throttled_job?(job) }
    # Job failed during throttle execution?
    next if job.failed?

    # Start this job!
    job.fail_on_exception! { job.start!(name) }
    return job if job.running?
  end
end

#random_wait_intervalObject

Returns [Float] a randomized poll interval in seconds up to the maximum configured poll interval.



178
179
180
# File 'lib/rocket_job/worker.rb', line 178

def random_wait_interval
  rand(Config.max_poll_seconds * 1000) / 1000
end

#reset_filter_if_expiredObject

Resets the current job filter if the relevant time interval has passed



90
91
92
93
94
95
96
97
# File 'lib/rocket_job/worker.rb', line 90

def reset_filter_if_expired
  # Only clear out the current_filter after every `re_check_seconds`
  time = Time.now
  return unless (time - @re_check_start) > Config.re_check_seconds

  @re_check_start     = time
  self.current_filter = Config.filter || {}
end

#runObject

Process jobs until it shuts down

Params

worker_id [Integer]
  The number of this worker for logging purposes


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/rocket_job/worker.rb', line 61

def run
  Thread.current.name = format("rocketjob %03i", id)
  logger.info "Started"

  until shutdown?
    sleep_seconds = Config.max_poll_seconds
    reset_filter_if_expired
    job = next_available_job

    # Returns true when work was completed, but no other work is available
    if job&.rocket_job_work(self, false)
      # Return the database connections for this thread back to the connection pool
      ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)

      # Stagger workers so that they don't all poll at the same time.
      sleep_seconds = random_wait_interval
    end

    wait_for_shutdown?(sleep_seconds)
  end

  logger.info "Stopping"
rescue Exception => e
  logger.fatal("Unhandled exception in job processing thread", e)
ensure
  ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
end

#shutdown!Object



47
48
49
# File 'lib/rocket_job/worker.rb', line 47

def shutdown!
  true
end

#shutdown?Boolean

Returns:

  • (Boolean)


43
44
45
# File 'lib/rocket_job/worker.rb', line 43

def shutdown?
  false
end

#throttled_job?(job) ⇒ Boolean

Whether the supplied job has been throttled and should be ignored.

Returns:

  • (Boolean)


136
137
138
139
140
141
142
143
144
145
# File 'lib/rocket_job/worker.rb', line 136

def throttled_job?(job)
  # Evaluate job throttles, if any.
  filter = job.rocket_job_throttles.matching_filter(job)
  return false unless filter

  add_to_current_filter(filter)
  # Restore retrieved job so that other workers can process it later
  job.set(worker_name: nil, state: :queued)
  true
end

#wait_for_shutdown?(_timeout = nil) ⇒ Boolean

Returns [true|false] whether the shutdown indicator was set

Returns:

  • (Boolean)


52
53
54
# File 'lib/rocket_job/worker.rb', line 52

def wait_for_shutdown?(_timeout = nil)
  false
end