Module: QueueDispatcher::ActsAsTaskQueue::InstanceMethods

Defined in:
lib/queue_dispatcher/acts_as_task_queue.rb

Instance Method Summary collapse

Instance Method Details

#acts_as_task_queue_tasks ⇒ Object



109
110
111
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 109

# Returns the task collection of this queue.
#
# The collection is obtained by calling the method whose name is the
# pluralized task class name taken from acts_as_task_queue_config.
def acts_as_task_queue_tasks
  association_name = acts_as_task_queue_config.task_class_name.pluralize
  send(association_name)
end

#all_done? ⇒ Boolean

Are all tasks executed?

Returns:

  • (Boolean)


223
224
225
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 223

# Are all tasks executed?
#
# Returns true when pending_tasks? is falsy; otherwise returns the result
# of empty?.
def all_done?
  return true unless pending_tasks?

  empty?
end

#brand_new? ⇒ Boolean

Return true if the task_queue is in state 'new' and is not older than 30 seconds.

Returns:

  • (Boolean)


201
202
203
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 201

# Returns true if the queue is in state 'new' and was created less than
# 30 seconds ago (measured against Time.now).
def brand_new?
  return false unless state == 'new'

  age = Time.now - created_at
  age < 30.seconds
end

#destroy_if_all_done! ⇒ Object

Destroy the queue if it has no pending jobs



254
255
256
257
258
259
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 254

# Destroy the queue if it has no pending jobs.
#
# Inside a transaction the queue row is fetched again with a lock; it is
# destroyed only if it was found and reports all_done?. Returns the result
# of destroy, or nil when nothing was destroyed.
def destroy_if_all_done!
  transaction do
    locked_queue = TaskQueue.where(:id => id).lock(true).first
    # The row may already be gone; in that case there is nothing to do
    next unless locked_queue

    locked_queue.destroy if locked_queue.all_done?
  end
end

#empty? ⇒ Boolean

Return true if there are no tasks in this taskqueue

Returns:

  • (Boolean)


207
208
209
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 207

# Returns true if the task collection of this queue contains no tasks.
def empty?
  queued_tasks = acts_as_task_queue_tasks
  queued_tasks.empty?
end

#kill ⇒ Object

Kill a task_queue by sending the HUP signal to its process (only if a pid is set)



248
249
250
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 248

# Sends the HUP signal to the process identified by pid.
#
# Does nothing and returns nil when no pid is set; otherwise returns the
# result of Process.kill.
def kill
  return unless pid

  Process.kill('HUP', pid)
end

#pending? ⇒ Boolean

Return true, if the task_queue has pending jobs and is running but no job is running

Returns:

  • (Boolean)


235
236
237
238
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 235

# Returns a truthy value if task_states is :new, :pending or :acquire_lock
# and the queue itself is running?.
def pending?
  waiting_states = [:new, :pending, :acquire_lock]
  waiting_states.include?(task_states) && running?
end

#pending_tasks? ⇒ Boolean

Are there any running or pending tasks in the queue?

Returns:

  • (Boolean)


213
214
215
216
217
218
219
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 213

# Are there any running or pending tasks in the queue?
#
# Inside a transaction the queue row and its tasks are fetched with a lock
# and passed to determine_state_of_task_array. The result is truthy when
# the :running, :pending, :acquire_lock or :init_queue state is set.
#
# Returns false if the queue row no longer exists (for example because it
# was destroyed in the meantime) instead of failing on nil.
def pending_tasks?
  transaction do
    queue = TaskQueue.where(:id => self.id).lock(true).first
    if queue
      states = determine_state_of_task_array queue.acts_as_task_queue_tasks.lock(true)
      states[:running] || states[:pending] || states[:acquire_lock] || states[:init_queue]
    else
      # A queue that is gone cannot have pending tasks
      false
    end
  end
end

#pid_running? ⇒ Boolean

Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’

Returns:

  • (Boolean)


183
184
185
186
187
188
189
190
191
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 183

# Returns true if a pid is set and the process table entry for that pid
# has the command name 'ruby'; false otherwise.
def pid_running?
  return false unless pid

  process_info = Sys::ProcTable.ps(pid)
  # Assume that a process whose command is 'ruby' is our still-running worker
  process_info ? process_info.comm == 'ruby' : false
end

#pop(args = {}) ⇒ Object

Get the next ready-to-run task out of the queue, taking into account the priority and the dependent tasks as defined by the association declared at the top of this model.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 122

# Get the next ready-to-run task out of the queue.
#
# The tasks are walked in the order delivered by the task collection. The
# first task whose dependent tasks are executed and whose state is 'new'
# is returned. For tasks that still wait for dependent tasks a debug
# message is logged when the debug option of the config is set.
#
# args - Hash of options:
#        :remove_task - when nil (default) or truthy the task is detached
#                       from the queue (task_queue_id set to nil); when
#                       false the task stays in the queue and its state
#                       is set to 'new_popped'.
#
# Returns the task, or nil if no task is ready.
def pop(args = {})
  task        = nil
  log_debug   = acts_as_task_queue_config.debug
  remove_task = args[:remove_task]

  transaction do
    # Find next pending task, where all dependent tasks are executed.
    # The collection is walked once instead of indexing it and asking for
    # its size on every iteration.
    acts_as_task_queue_tasks.lock(true).all.each do |t|
      if t.dependent_tasks_executed?
        if t.state == 'new'
          task = t
          break
        end
      elsif log_debug
        log :msg => "Task #{t.id}: Waiting for dependent tasks #{t.dependent_tasks.map(&:id).join(',')}...", :sev => :debug
      end
    end

    # Remove task from current queue
    if task
      if remove_task.nil? || remove_task
        task.update_attribute :task_queue_id, nil
      else
        task.update_attribute :state, 'new_popped'
      end
    end
  end

  task
end

#push(task) ⇒ Object

Put a new task into the queue



115
116
117
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 115

# Put a new task into the queue by appending it to the task collection.
#
# task - the task to append.
#
# Returns the result of the append operation on the collection.
def push(task)
  queue_tasks = acts_as_task_queue_tasks
  queue_tasks << task
end

#reloading_config? ⇒ Boolean

Return true, if the task_queue is in state ‘reloading_config’

Returns:

  • (Boolean)


242
243
244
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 242

# Returns true if the task_queue is in state 'reloading_config' and its
# process is still running.
#
# The state comparison comes first so that the process lookup done by
# pid_running? is only performed when the state matches.
def reloading_config?
  state == 'reloading_config' && pid_running?
end

#remove_finished_tasks! ⇒ Object

Remove finished tasks from queue



263
264
265
266
267
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 263

# Remove finished tasks from queue.
#
# Within one transaction every task of the queue that reports executed?
# is detached by setting its task_queue_id to nil. The tasks are read via
# acts_as_task_queue_tasks, so the configured task class name is honoured.
def remove_finished_tasks!
  transaction do
    acts_as_task_queue_tasks.each { |t| t.update_attribute(:task_queue_id, nil) if t.executed? }
  end
end

#run!(args = {}) ⇒ Object

Execute all tasks in the queue



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 271

# Execute all tasks in the queue.
#
# Sets the queue state to 'running', then pops and executes tasks until the
# queue no longer exists, leaves the state 'running' or has no pending tasks
# left. Afterwards a config reload is done if cleanup_before_auto_reload
# asks for it.
#
# args - Hash of options:
#        :logger    - logger to use; defaults to a Logger writing to
#                     log/task_queue.log below Rails.root
#        :print_log - handed over to every log call
#
# Errors (StandardError) are logged and printed, the current task is marked
# as failed and the error is not re-raised. In every case the queue is
# destroyed if all of its tasks are done, its state is set to 'stopped'
# (normal end) or 'aborted' (error), its pid is cleared and deinit is called.
def run!(args = {})
  task          = nil
  @logger       = args[:logger] || Logger.new("#{File.expand_path(Rails.root)}/log/task_queue.log")
  finish_state  = 'aborted'
  task_queue    = self
  print_log     = args[:print_log]

  task_queue.update_attribute :state, 'running'

  # Set logger in engine. The check is written with explicit parentheses: without them `defined?` is applied to the
  # whole `&&` expression, which is always truthy, so the assignment would also be attempted on a missing engine.
  @engine.logger = @logger if defined?(@engine) && @engine && @engine.respond_to?(:logger=)
  log :msg => "#{name}: Starting TaskQueue #{task_queue.id}...", :print_log => print_log

  # Init. Pop first task from queue, to show init_queue-state
  task = task_queue.pop(:remove_task => false)
  task.update_attribute :state, 'init_queue' if task
  init

  # Put task, which was used for showing the init_queue-state, back into the task_queue
  task.update_attributes :state => 'new', :task_queue_id => task_queue.id if task
  task_queue.reload

  # Ensure, that each task_queue is executed at least once, even if there are no tasks inside at the time it is started (this
  # can happen, if there are a lot of DB activities...)
  first_run = true
  # Loop as long as the task_queue exists with states 'running' and until the task_queue has pending tasks
  while task_queue && task_queue.state == 'running' && (task_queue.pending_tasks? || first_run) do
    first_run = false

    # Pop next task from queue
    task = task_queue.pop(:remove_task => (! acts_as_task_queue_config.leave_running_tasks_in_queue))

    if task
      if task.new?
        # Start
        task.update_attributes :state => 'acquire_lock', :perc_finished => 0
        get_lock_for task
        log :msg => "#{name}: Starting task #{task.id} (#{task.target.class.name}.#{task.method_name})...", :print_log => print_log
        task.update_attributes :state => 'running'

        # Execute the method defined in task.method
        if task.target.methods.include?(task.method_name) || task.target.methods.include?(task.method_name.to_sym)
          if task.dependent_tasks_had_errors
            error_msg = 'Dependent tasks had errors!'
            log :msg => error_msg,
                :sev => :warn,
                :print_log => print_log
            result = QueueDispatcher::RcAndMsg.bad_rc error_msg
          else
            target = task.target
            target.logger = @logger if target.methods.include?(:logger=) || target.methods.include?('logger=')
            result = task.execute!
          end
        else
          error_msg = "unknown method '#{task.method_name}' for #{task.target.class.name}!"
          log :msg => error_msg,
              :sev => :warn,
              :print_log => print_log
          result = QueueDispatcher::RcAndMsg.bad_rc error_msg
        end

        # Change task state according to the return code and remove it from the queue
        task.update_state result
        cleanup_locks_after_error_for task
        task.update_attribute :task_queue_id, nil unless acts_as_task_queue_config.leave_finished_tasks_in_queue
        log :msg => "#{name}: Task #{task.id} (#{task.target.class.name}.#{task.method_name}) finished with state '#{task.state}'.", :print_log => print_log

        # Wait between tasks
        sleep acts_as_task_queue_config.task_finish_wait_time
      end
    else
      # We couldn't fetch a task out of the queue but there should still exists some. Maybe some are waiting for dependent tasks.
      # Sleep some time before trying it again.
      sleep acts_as_task_queue_config.poll_time
    end

    # Interrupts
    handle_interrupts print_log: print_log

    # Reload task_queue to get all updates
    task_queue = TaskQueue.find_by_id task_queue.id

    # If all tasks are finished, a config reload will be executed at the end of this method. To avoid too much config reloads,
    # wait some time before continuing. Maybe, some more tasks will added to the queue?!
    wait_time = 0
    unless task_queue.nil? || task_queue.terminate_immediately
      until task_queue.nil? || task_queue.pending_tasks? || wait_time >= acts_as_task_queue_config.idle_wait_time || task_queue.state != 'running' do
        sleep acts_as_task_queue_config.poll_time
        wait_time += acts_as_task_queue_config.poll_time
        task_queue = TaskQueue.find_by_id task_queue.id
      end
    end

    # Reset logger since this got lost by reloading the task_queue
    task_queue.logger = @logger if task_queue
  end

  # Reload config if last task was not a config reload
  config_reload_required = cleanup_before_auto_reload
  if config_reload_required
    task_queue.update_attributes :state => 'reloading_config' if task_queue
    reload_config task, print_log: print_log
  end

  # Loop has ended
  log :msg => "#{name}: TaskQueue has ended!", :print_log => print_log
  finish_state = 'stopped'
rescue => exception
  # Error handler
  backtrace = exception.backtrace.join("\n  ")
  log :msg => "Fatal error in method 'run!': #{$!}\n  #{backtrace}", :sev => :error, :print_log => print_log
  puts "Fatal error in method 'run!': #{$!}\n#{backtrace}"
  task.update_state QueueDispatcher::RcAndMsg.bad_rc("Fatal error: #{$!}") if task
  cleanup_locks_after_error_for task if task
  task.update_attributes state: 'error' if task && task.state != 'finished'
ensure
  # Reload task and task_queue, to ensure the objects are up to date
  task_queue = TaskQueue.find_by_id task_queue.id if task_queue
  task       = Task.find_by_id task.id if task

  # Delete task_queue
  task_queue.destroy_if_all_done! if task_queue

  # Update states of task and task_queue
  task.update_attributes :state => 'aborted' if task && task.state == 'running'
  task_queue.update_attributes :state => finish_state, :pid   => nil if task_queue

  # Clean up
  deinit
end

#running? ⇒ Boolean

Return true, if the task_queue is still running

Returns:

  • (Boolean)


195
196
197
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 195

# Returns true if the task_queue is in state 'running' and its process is
# still running according to pid_running?.
def running?
  return false unless state == 'running'

  pid_running?
end

#task_states ⇒ Object

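Returns the aggregated state of this task list: nil if the list is empty, otherwise the first matching state of :running, :init_queue, :pending, :acquire_lock, :error, :aborted, :new and :successful (checked in this order), or :unknown if none of them matches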



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 155

# Returns the aggregated state of the tasks in this queue.
#
# The states are computed by determine_state_of_task_array. The result is
# nil when the :empty flag is set, otherwise the first flagged state in
# the priority order below, or :unknown when none of them is flagged.
def task_states
  states = determine_state_of_task_array acts_as_task_queue_tasks
  return nil if states[:empty]

  # Highest priority first
  priority = [:running, :init_queue, :pending, :acquire_lock, :error, :aborted, :new, :successful]
  priority.find { |candidate| states[candidate] } || :unknown
end

#working? ⇒ Boolean

Return true if a task of the task_queue is currently running and the task_queue itself is still running

Returns:

  • (Boolean)


229
230
231
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 229

# Returns true if task_states reports :running and the queue itself is
# running?.
def working?
  return false unless task_states == :running

  running?
end