Module: QueueDispatcher::ActsAsTaskQueue::InstanceMethods

Defined in:
lib/queue_dispatcher/acts_as_task_queue.rb

Instance Method Summary collapse

Instance Method Details

#acts_as_task_queue_tasksObject



127
128
129
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 127

def acts_as_task_queue_tasks
  self.send(acts_as_task_queue_config.task_class_name.pluralize)
end

#all_done?Boolean

Are all tasks executed?

Returns:

  • (Boolean)


235
236
237
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 235

def all_done?
  ! pending_tasks? || empty?
end

#brand_new?Boolean

Return true, if the task_queue is in state new and is not older 30 seconds

Returns:

  • (Boolean)


213
214
215
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 213

def brand_new?
  state == 'new' && (Time.now - created_at) < 30.seconds
end

#destroy_if_all_done!Object

Destroy the queue if it has no pending jobs



266
267
268
269
270
271
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 266

def destroy_if_all_done!
  transaction do
    queue = TaskQueue.where(:id => self.id).lock(true).first
    queue.destroy if queue && queue.all_done?
  end
end

#empty?Boolean

Return true if there are no tasks in this taskqueue

Returns:

  • (Boolean)


219
220
221
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 219

def empty?
  acts_as_task_queue_tasks.empty?
end

#killObject

Kill a task_queue



260
261
262
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 260

def kill
  Process.kill('HUP', pid) if pid
end

#pending?Boolean

Return true, if the task_queue has pending jobs and is running but no job is running

Returns:

  • (Boolean)


247
248
249
250
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 247

def pending?
  ts = task_states
  (ts == :new || ts == :pending || ts == :acquire_lock) && self.running?
end

#pending_tasks?Boolean

Are there any running or pending tasks in the queue?

Returns:

  • (Boolean)


225
226
227
228
229
230
231
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 225

def pending_tasks?
  transaction do
    queue = TaskQueue.where(:id => self.id).lock(true).first
    states = determine_state_of_task_array queue.acts_as_task_queue_tasks.lock(true)
    states[:running] || states[:pending] || states[:acquire_lock] || states[:init_queue]
  end
end

#pid_running?Boolean

Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’

Returns:

  • (Boolean)


201
202
203
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 201

def pid_running?
  self.class.pid_running?(self.pid)
end

#pop(args = {}) ⇒ Object

Get the next ready to run task out of the queue. Consider the priority and the dependent tasks, which is defined in the association defined on top of this model.



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 140

def pop(args = {})
  task      = nil
  log_debug = acts_as_task_queue_config.debug

  transaction do
    # Find next pending task, where all dependent tasks are executed
    all_tasks = acts_as_task_queue_tasks.lock(true).all
    pos       = 0
    while task.nil? && pos < all_tasks.to_a.count do
      t = all_tasks[pos]
      if t.dependent_tasks_executed?
        task = t if t.state == 'new'
      else
        log :msg => "Task #{t.id}: Waiting for dependent tasks #{t.dependent_tasks.map{|dt| dt.id}.join ','}...", :sev => :debug if log_debug
      end
      pos += 1
    end

    # Remove task from current queue
    if task
      if args[:remove_task].nil? || args[:remove_task]
        task.update_attribute :task_queue_id, nil
      else
        task.update_attribute :state, 'new_popped'
      end
    end
  end

  task
end

#push(task) ⇒ Object

Put a new task into the queue



133
134
135
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 133

def push(task)
  acts_as_task_queue_tasks << task
end

#reloading_config?Boolean

Return true, if the task_queue is in state ‘reloading_config’

Returns:

  • (Boolean)


254
255
256
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 254

def reloading_config?
  pid_running? && state == 'reloading_config'
end

#remove_finished_tasks!Object

Remove finished tasks from queue



275
276
277
278
279
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 275

def remove_finished_tasks!
  trasnaction do
    tasks.each{ |t| t.update_attribute(:task_queue_id, nil) if t.executed? }
  end
end

#run!(args = {}) ⇒ Object

Execute all tasks in the queue



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 283

def run!(args = {})
  task          = nil
  @logger       = args[:logger] || Logger.new("#{File.expand_path(Rails.root)}/log/task_queue.log")
  finish_state  = 'aborted'
  task_queue    = self
  print_log     = args[:print_log]

  task_queue.update_attribute :state, 'running'

  # Set logger in engine
  @engine.logger = @logger if defined?(@engine) && @engine.methods.include?(:logger=)
  log :msg => "#{name}: Starting TaskQueue #{task_queue.id}...", :print_log => print_log

  # Init. Pop first task from queue, to show init_queue-state
  task = task_queue.pop(:remove_task => false)
  task.update_attribute :state, 'init_queue' if task
  init

  # Put task, which was used for showing the init_queue-state, back into the task_queue
  task.update_attributes :state => 'new', :task_queue_id => task_queue.id if task
  task_queue.reload

  # Ensure, that each task_queue is executed at least once, even if there are no tasks inside at the time it is started (this
  # can happen, if there are a lot of DB activities...)
  first_run = true
  # Loop as long as the task_queue exists with states 'running' and until the task_queue has pending tasks
  while task_queue && task_queue.state == 'running' && (task_queue.pending_tasks? || first_run) do
    first_run = false

    # Pop next task from queue
    task = task_queue.pop(:remove_task => (! acts_as_task_queue_config.leave_running_tasks_in_queue))

    if task
      if task.new?
        # Start
        task.update_attributes :state => 'acquire_lock', :perc_finished => 0
        get_lock_for task
        log :msg => "#{name}: Starting task #{task.id} (#{task.payload.class.name}.#{task.method_name})...", :print_log => print_log
        task.update_attributes :state => 'running'

        # Execute the method defined in task.method
        if task.payload.methods.map(&:to_sym).include?(task.method_name.to_sym)
          if task.dependent_tasks_had_errors
            error_msg = 'Dependent tasks had errors!'
            log :msg => error_msg,
                :sev => :warn, 
                :print_log => print_log
            result = QueueDispatcher::RcAndMsg.bad_rc error_msg
          else
            payload = task.payload
            payload.logger = @logger if payload.methods.include?(:logger=) || payload.methods.include?('logger=')
            result = task.execute!
          end
        else
          error_msg = "unknown method '#{task.method_name}' for #{task.payload.class.name}!"
          log :msg => error_msg,
              :sev => :warn,
              :print_log => print_log
          result = QueueDispatcher::RcAndMsg.bad_rc error_msg
        end

        # Change task state according to the return code and remove it from the queue
        task.update_state_and_exec_callbacks(result, false, logger)
        cleanup_locks_after_error_for task
        task.update_attribute :task_queue_id, nil unless acts_as_task_queue_config.leave_finished_tasks_in_queue
        log :msg => "#{name}: Task #{task.id} (#{task.payload.class.name}.#{task.method_name}) finished with state '#{task.state}'.", :print_log => print_log

        # Wait between tasks
        sleep acts_as_task_queue_config.task_finish_wait_time
      end
    else
      # We couldn't fetch a task out of the queue but there should still exists some. Maybe some are waiting for dependent tasks.
      # Sleep some time before trying it again.
      sleep acts_as_task_queue_config.poll_time
    end

    # Interrupts
    handle_interrupts print_log: print_log

    # Reload task_queue to get all updates
    task_queue = TaskQueue.find_by_id task_queue.id

    # If all tasks are finished, a config reload will be executed at the end of this method. To avoid too much config reloads,
    # wait some time before continuing. Maybe, some more tasks will added to the queue?!
    wait_time = 0
    unless task_queue.nil? || task_queue.terminate_immediately
      until task_queue.nil? || task_queue.pending_tasks? || wait_time >= acts_as_task_queue_config.idle_wait_time || task_queue.state != 'running' do
        sleep acts_as_task_queue_config.poll_time
        wait_time += acts_as_task_queue_config.poll_time
        task_queue = TaskQueue.find_by_id task_queue.id
      end
    end

    # Reset logger since this got lost by reloading the task_queue
    task_queue.logger = @logger if task_queue
  end

  # Reload config if last task was not a config reload
  config_reload_required = cleanup_before_auto_reload
  if config_reload_required
    task_queue.update_attributes :state => 'reloading_config' if task_queue
    reload_config task, print_log: print_log
  end

  # Loop has ended
  log :msg => "#{name}: TaskQueue has ended!", :print_log => print_log
  finish_state = 'stopped'
rescue => exception
  # Error handler
  backtrace = exception.backtrace.join("\n  ")
  log :msg => "Fatal error in method 'run!': #{$!}\n  #{backtrace}", :sev => :error, :print_log => print_log
  puts "Fatal error in method 'run!': #{$!}\n#{backtrace}"
  task.update_state_and_exec_callbacks(QueueDispatcher::RcAndMsg.bad_rc("Fatal error: #{$!}"), false, logger) if task
  cleanup_locks_after_error_for task if task
  task.update_attributes state: 'error' if task && task.state != 'finished'
ensure
  # Reload task and task_queue, to ensure the objects are up to date
  task_queue = TaskQueue.find_by_id task_queue.id if task_queue
  task       = Task.find_by_id task.id if task

  # Delete task_queue
  task_queue.destroy_if_all_done! if task_queue

  # Update states of task and task_queue
  task.update_attributes :state => 'aborted' if task && task.state == 'running'
  task_queue.update_attributes :state => finish_state, :pid   => nil if task_queue

  # Clean up
  deinit
end

#running?Boolean

Return true, if the task_queue is still running

Returns:

  • (Boolean)


207
208
209
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 207

def running?
  state == 'running' && pid_running?
end

#task_statesObject

Returns the state of this task list (:stopped or :running)



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 173

def task_states
  states = determine_state_of_task_array acts_as_task_queue_tasks

  if states[:empty]
    nil
  elsif states[:running]
    :running
  elsif states[:init_queue]
    :init_queue
  elsif states[:pending]
    :pending
  elsif states[:acquire_lock]
    :acquire_lock
  elsif states[:error]
    :error
  elsif states[:aborted]
    :aborted
  elsif states[:new]
    :new
  elsif states[:successful]
    :successful
  else
    :unknown
  end
end

#working?Boolean

Return true, if the task_queue is working or has pending jobs

Returns:

  • (Boolean)


241
242
243
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 241

def working?
  self.task_states == :running && self.running?
end