Module: QueueDispatcher::ActsAsTaskQueue::InstanceMethods
- Defined in:
- lib/queue_dispatcher/acts_as_task_queue.rb
Instance Method Summary collapse
- #acts_as_task_queue_tasks ⇒ Object
-
#all_done? ⇒ Boolean
Are all tasks executed?.
-
#brand_new? ⇒ Boolean
Return true, if the task_queue is in state new and is not older 30 seconds.
-
#destroy_if_all_done! ⇒ Object
Destroy the queue if it has no pending jobs.
-
#empty? ⇒ Boolean
Return true if there are no tasks in this taskqueue.
-
#kill ⇒ Object
Kill a task_queue.
-
#pending? ⇒ Boolean
Return true, if the task_queue has pending jobs and is running but no job is running.
-
#pending_tasks? ⇒ Boolean
Are there any running or pending tasks in the queue?.
-
#pid_running? ⇒ Boolean
Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’.
-
#pop(args = {}) ⇒ Object
Get the next ready to run task out of the queue.
-
#push(task) ⇒ Object
Put a new task into the queue.
-
#remove_finished_tasks! ⇒ Object
Remove finished tasks from queue.
-
#run!(args = {}) ⇒ Object
Execute all tasks in the queue.
-
#running? ⇒ Boolean
Return true, if the task_queue is still running.
-
#task_states ⇒ Object
Returns the state of this task list (:stopped or :running).
-
#working? ⇒ Boolean
Return true, if the task_queue is working or has pending jobs.
Instance Method Details
#acts_as_task_queue_tasks ⇒ Object
86 87 88 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 86 def acts_as_task_queue_tasks self.send(acts_as_task_queue_config.task_class_name.pluralize) end |
#all_done? ⇒ Boolean
Are all tasks executed?
198 199 200 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 198 def all_done? ! pending_tasks? || empty? end |
#brand_new? ⇒ Boolean
Return true, if the task_queue is in state new and is not older 30 seconds
176 177 178 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 176 def brand_new? state == 'new' && (Time.now - created_at) < 30.seconds end |
#destroy_if_all_done! ⇒ Object
Destroy the queue if it has no pending jobs
223 224 225 226 227 228 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 223 def destroy_if_all_done! transaction do queue = TaskQueue.where(:id => self.id).lock(true).first queue.destroy if queue && queue.all_done? end end |
#empty? ⇒ Boolean
Return true if there are no tasks in this taskqueue
182 183 184 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 182 def empty? acts_as_task_queue_tasks.empty? end |
#kill ⇒ Object
Kill a task_queue
217 218 219 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 217 def kill Process.kill('HUP', pid) if pid end |
#pending? ⇒ Boolean
Return true, if the task_queue has pending jobs and is running but no job is running
210 211 212 213 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 210 def pending? ts = task_states (ts == :new || ts == :pending || ts == :acquire_lock) && self.running? end |
#pending_tasks? ⇒ Boolean
Are there any running or pending tasks in the queue?
188 189 190 191 192 193 194 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 188 def pending_tasks? transaction do queue = TaskQueue.where(:id => self.id).lock(true).first states = determine_state_of_task_array queue.acts_as_task_queue_tasks.lock(true) states[:running] || states[:pending] || states[:acquire_lock] || states[:init_queue] end end |
#pid_running? ⇒ Boolean
Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’
158 159 160 161 162 163 164 165 166 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 158 def pid_running? ps = self.pid ? Sys::ProcTable.ps(self.pid) : nil if ps # Asume, that if the command of the 'ps'-output is 'ruby', the process is still running ps.comm == 'ruby' else false end end |
#pop(args = {}) ⇒ Object
Get the next ready to run task out of the queue. Consider the priority and the dependent tasks, which is defined in the association defined on top of this model.
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 99 def pop args = {} task = nil transaction do # Find next pending task, where all dependent tasks are executed all_tasks = acts_as_task_queue_tasks.lock(true).all i = 0 while task.nil? && i < all_tasks.count do t = all_tasks[i] if t.dependent_tasks_executed? task = t if t.state == 'new' else log :msg => "Task #{t.id}: Waiting for dependent tasks #{t.dependent_tasks.map{|dt| dt.id}.join ','}...", :sev => :debug end i += 1 end # Remove task from current queue if task if args[:remove_task].nil? || args[:remove_task] task.update_attribute :task_queue_id, nil else task.update_attribute :state, 'new_popped' end end end task end |
#push(task) ⇒ Object
Put a new task into the queue
92 93 94 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 92 def push task acts_as_task_queue_tasks << task end |
#remove_finished_tasks! ⇒ Object
Remove finished tasks from queue
232 233 234 235 236 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 232 def remove_finished_tasks! trasnaction do tasks.each{ |t| t.update_attribute(:task_queue_id, nil) if t.executed? } end end |
#run!(args = {}) ⇒ Object
Execute all tasks in the queue
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 240 def run! args = {} task = nil @logger = args[:logger] || Logger.new("#{File.(Rails.root)}/log/task_queue.log") finish_state = 'aborted' task_queue = self print_log = args[:print_log] task_queue.update_attribute :state, 'running' # Set logger in engine @engine.logger = @logger if defined? @engine log :msg => "#{name}: Starting TaskQueue #{task_queue.id}...", :print_log => print_log # Init. Pop first task from queue, to show init_queue-state task = task_queue.pop(:remove_task => false) task.update_attribute :state, 'init_queue' if task init # Put task, which was used for showing the init_queue-state, back into the task_queue task.update_attributes :state => 'new', :task_queue_id => task_queue.id if task task_queue.reload # Ensure, that each task_queue is executed at least once, even if there are no tasks inside at the time it is started (this # can happen, if there are a lot of DB activities...) first_run = true # Loop as long as the task_queue exists with states 'running' and until the task_queue has pending tasks while task_queue && task_queue.state == 'running' && (task_queue.pending_tasks? || first_run) do first_run = false # Pop next task from queue task = task_queue.pop(:remove_task => (! acts_as_task_queue_config.leave_running_tasks_in_queue)) if task if task.new? # Start task.update_attributes :state => 'acquire_lock', :perc_finished => 0 get_lock_for task log :msg => "#{name}: Starting task #{task.id} (#{task.target.class.name}.#{task.method_name})...", :print_log => print_log task.update_attributes :state => 'running' # Execute the method defined in task.method if task.target.methods.include?(task.method_name) || task.target.methods.include?(task.method_name.to_sym) if task.dependent_tasks_had_errors error_msg = "Dependent tasks had errors!" log :msg => error_msg, :sev => :warn, :print_log => print_log rc_and_msg = QueueDispatcher::RcAndMsg.bad_rc error_msg else target = task.target target.logger = @logger if target.methods.include?(:logger=) || target.methods.include?('logger=') rc_and_msg = task.execute! end else error_msg = "unknown method '#{task.method_name}' for #{task.target.class.name}!" log :msg => error_msg, :sev => :warn, :print_log => print_log rc_and_msg = QueueDispatcher::RcAndMsg.bad_rc error_msg end # Change task state according to the return code and remove it from the queue task.update_state rc_and_msg cleanup_locks_after_error_for task task.update_attribute :task_queue_id, nil unless acts_as_task_queue_config.leave_finished_tasks_in_queue log :msg => "#{name}: Task #{task.id} (#{task.target.class.name}.#{task.method_name}) finished with state '#{task.state}'.", :print_log => print_log end else # We couldn't fetch a task out of the queue but there should still exists some. Maybe some are waiting for dependent tasks. # Sleep some time before trying it again. sleep acts_as_task_queue_config.poll_time end # Reload task_queue to get all updates task_queue = TaskQueue.find_by_id task_queue.id # If all tasks are finished, a config reload will be executed at the end of this method. To avoid too much config reloads, # wait some time before continuing. Maybe, some more tasks will added to the queue?! wait_time = 0 unless task_queue.nil? || task_queue.terminate_immediately until task_queue.nil? || task_queue.pending_tasks? || wait_time >= acts_as_task_queue_config.idle_wait_time || task_queue.state != 'running' do sleep acts_as_task_queue_config.poll_time wait_time += acts_as_task_queue_config.poll_time task_queue = TaskQueue.find_by_id task_queue.id end end # Reset logger since this got lost by reloading the task_queue task_queue.logger = @logger if task_queue end # Reload config if last task was not a config reload config_reload_required = cleanup_before_auto_reload if config_reload_required task_queue.update_attributes :state => 'reloading_config' if task_queue reload_config task, print_log: print_log end # Delete task_queue task_queue.destroy_if_all_done! if task_queue # Loop has ended log :msg => "#{name}: TaskQueue has ended!", :print_log => print_log finish_state = 'stopped' rescue => exception # Error handler backtrace = exception.backtrace.join("\n ") log :msg => "Fatal error in method 'run!': #{$!}\n #{backtrace}", :sev => :error, :print_log => print_log puts "Fatal error in method 'run!': #{$!}\n#{backtrace}" task.update_state QueueDispatcher::RcAndMsg.bad_rc("Fatal error: #{$!}") if task cleanup_locks_after_error_for task if task finish_state = 'error' ensure # Reload task and task_queue, to ensure the objects are up to date task_queue = TaskQueue.find_by_id task_queue.id if task_queue task = Task.find_by_id task.id if task # Update states of task and task_queue task.update_attributes :state => 'aborted' if task && task.state == 'running' task_queue.update_attributes :state => finish_state, :pid => nil if task_queue # Clean up deinit end |
#running? ⇒ Boolean
Return true, if the task_queue is still running
170 171 172 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 170 def running? state == 'running' && pid_running? end |
#task_states ⇒ Object
Returns the state of this task list (:stopped or :running)
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 132 def task_states states = determine_state_of_task_array acts_as_task_queue_tasks if states[:empty] nil elsif states[:running] :running elsif states[:init_queue] :init_queue elsif states[:pending] :pending elsif states[:acquire_lock] :acquire_lock elsif states[:error] :error elsif states[:new] :new elsif states[:successful] :successful else :unknown end end |
#working? ⇒ Boolean
Return true, if the task_queue is working or has pending jobs
204 205 206 |
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 204 def working? self.task_states == :running && self.running? end |