Module: QueueDispatcher::ActsAsTaskQueue::InstanceMethods

Defined in:
lib/queue_dispatcher/acts_as_task_queue.rb

Instance Method Summary collapse

Instance Method Details

#acts_as_task_queue_tasksObject



86
87
88
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 86

def acts_as_task_queue_tasks
  self.send(acts_as_task_queue_config.task_class_name.pluralize)
end

#all_done?Boolean

Are all tasks executed?

Returns:

  • (Boolean)


198
199
200
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 198

def all_done?
  ! pending_tasks? || empty?
end

#brand_new?Boolean

Return true, if the task_queue is in state new and is not older 30 seconds

Returns:

  • (Boolean)


176
177
178
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 176

def brand_new?
  state == 'new' && (Time.now - created_at) < 30.seconds
end

#destroy_if_all_done!Object

Destroy the queue if it has no pending jobs



223
224
225
226
227
228
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 223

def destroy_if_all_done!
  transaction do
    queue = TaskQueue.where(:id => self.id).lock(true).first
    queue.destroy if queue && queue.all_done?
  end
end

#empty?Boolean

Return true if there are no tasks in this taskqueue

Returns:

  • (Boolean)


182
183
184
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 182

def empty?
  acts_as_task_queue_tasks.empty?
end

#killObject

Kill a task_queue



217
218
219
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 217

def kill
  Process.kill('HUP', pid) if pid
end

#pending?Boolean

Return true, if the task_queue has pending jobs and is running but no job is running

Returns:

  • (Boolean)


210
211
212
213
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 210

def pending?
  ts = task_states
  (ts == :new || ts == :pending || ts == :acquire_lock) && self.running?
end

#pending_tasks?Boolean

Are there any running or pending tasks in the queue?

Returns:

  • (Boolean)


188
189
190
191
192
193
194
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 188

def pending_tasks?
  transaction do
    queue = TaskQueue.where(:id => self.id).lock(true).first
    states = determine_state_of_task_array queue.acts_as_task_queue_tasks.lock(true)
    states[:running] || states[:pending] || states[:acquire_lock] || states[:init_queue]
  end
end

#pid_running?Boolean

Return true, if the command of the process with pid ‘self.pid’ is ‘ruby’

Returns:

  • (Boolean)


158
159
160
161
162
163
164
165
166
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 158

def pid_running?
  ps = self.pid ? Sys::ProcTable.ps(self.pid) : nil
  if ps
    # Asume, that if the command of the 'ps'-output is 'ruby', the process is still running
    ps.comm == 'ruby'
  else
    false
  end
end

#pop(args = {}) ⇒ Object

Get the next ready to run task out of the queue. Consider the priority and the dependent tasks, which is defined in the association defined on top of this model.



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 99

def pop args = {}
  task                = nil

  transaction do
    # Find next pending task, where all dependent tasks are executed
    all_tasks = acts_as_task_queue_tasks.lock(true).all
    i         = 0
    while task.nil? && i < all_tasks.count do
      t = all_tasks[i]
      if t.dependent_tasks_executed?
        task = t if t.state == 'new'
      else
        log :msg => "Task #{t.id}: Waiting for dependent tasks #{t.dependent_tasks.map{|dt| dt.id}.join ','}...",
            :sev => :debug
      end
      i += 1
    end

    # Remove task from current queue
    if task
      if args[:remove_task].nil? || args[:remove_task]
        task.update_attribute :task_queue_id, nil
      else
        task.update_attribute :state, 'new_popped'
      end
    end
  end

  task
end

#push(task) ⇒ Object

Put a new task into the queue



92
93
94
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 92

def push task
  acts_as_task_queue_tasks << task
end

#remove_finished_tasks!Object

Remove finished tasks from queue



232
233
234
235
236
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 232

def remove_finished_tasks!
  trasnaction do
    tasks.each{ |t| t.update_attribute(:task_queue_id, nil) if t.executed? }
  end
end

#run!(args = {}) ⇒ Object

Execute all tasks in the queue



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 240

def run! args = {}
  task          = nil
  @logger       = args[:logger] || Logger.new("#{File.expand_path(Rails.root)}/log/task_queue.log")
  finish_state  = 'aborted'
  task_queue    = self
  print_log     = args[:print_log]

  task_queue.update_attribute :state, 'running'

  # Set logger in engine
  @engine.logger = @logger if defined? @engine
  log :msg => "#{name}: Starting TaskQueue #{task_queue.id}...", :print_log => print_log

  # Init. Pop first task from queue, to show init_queue-state
  task = task_queue.pop(:remove_task => false)
  task.update_attribute :state, 'init_queue' if task
  init

  # Put task, which was used for showing the init_queue-state, back into the task_queue
  task.update_attributes :state => 'new', :task_queue_id => task_queue.id if task
  task_queue.reload

  # Ensure, that each task_queue is executed at least once, even if there are no tasks inside at the time it is started (this
  # can happen, if there are a lot of DB activities...)
  first_run = true
  # Loop as long as the task_queue exists with states 'running' and until the task_queue has pending tasks
  while task_queue && task_queue.state == 'running' && (task_queue.pending_tasks? || first_run) do
    first_run = false

    # Pop next task from queue
    task = task_queue.pop(:remove_task => (! acts_as_task_queue_config.leave_running_tasks_in_queue))

    if task
      if task.new?
        # Start
        task.update_attributes :state => 'acquire_lock', :perc_finished => 0
        get_lock_for task
        log :msg => "#{name}: Starting task #{task.id} (#{task.target.class.name}.#{task.method_name})...", :print_log => print_log
        task.update_attributes :state => 'running'

        # Execute the method defined in task.method
        if task.target.methods.include?(task.method_name) || task.target.methods.include?(task.method_name.to_sym)
          if task.dependent_tasks_had_errors
            error_msg = "Dependent tasks had errors!"
            log :msg => error_msg,
                :sev => :warn, 
                :print_log => print_log
            rc_and_msg = QueueDispatcher::RcAndMsg.bad_rc error_msg
          else
            target = task.target
            target.logger = @logger if target.methods.include?(:logger=) || target.methods.include?('logger=')
            rc_and_msg = task.execute!
          end
        else
          error_msg = "unknown method '#{task.method_name}' for #{task.target.class.name}!"
          log :msg => error_msg,
              :sev => :warn,
              :print_log => print_log
          rc_and_msg = QueueDispatcher::RcAndMsg.bad_rc error_msg
        end

        # Change task state according to the return code and remove it from the queue
        task.update_state rc_and_msg
        cleanup_locks_after_error_for task
        task.update_attribute :task_queue_id, nil unless acts_as_task_queue_config.leave_finished_tasks_in_queue
        log :msg => "#{name}: Task #{task.id} (#{task.target.class.name}.#{task.method_name}) finished with state '#{task.state}'.", :print_log => print_log
      end
    else
      # We couldn't fetch a task out of the queue but there should still exists some. Maybe some are waiting for dependent tasks.
      # Sleep some time before trying it again.
      sleep acts_as_task_queue_config.poll_time
    end

    # Reload task_queue to get all updates
    task_queue = TaskQueue.find_by_id task_queue.id

    # If all tasks are finished, a config reload will be executed at the end of this method. To avoid too much config reloads,
    # wait some time before continuing. Maybe, some more tasks will added to the queue?!
    wait_time = 0
    unless task_queue.nil? || task_queue.terminate_immediately
      until task_queue.nil? || task_queue.pending_tasks? || wait_time >= acts_as_task_queue_config.idle_wait_time || task_queue.state != 'running' do
        sleep acts_as_task_queue_config.poll_time
        wait_time += acts_as_task_queue_config.poll_time
        task_queue = TaskQueue.find_by_id task_queue.id
      end
    end

    # Reset logger since this got lost by reloading the task_queue
    task_queue.logger = @logger if task_queue
  end

  # Reload config if last task was not a config reload
  config_reload_required = cleanup_before_auto_reload
  if config_reload_required
    task_queue.update_attributes :state => 'reloading_config' if task_queue
    reload_config task, print_log: print_log
  end

  # Delete task_queue
  task_queue.destroy_if_all_done! if task_queue

  # Loop has ended
  log :msg => "#{name}: TaskQueue has ended!", :print_log => print_log
  finish_state = 'stopped'
rescue => exception
  # Error handler
  backtrace = exception.backtrace.join("\n  ")
  log :msg => "Fatal error in method 'run!': #{$!}\n  #{backtrace}", :sev => :error, :print_log => print_log
  puts "Fatal error in method 'run!': #{$!}\n#{backtrace}"
  task.update_state QueueDispatcher::RcAndMsg.bad_rc("Fatal error: #{$!}") if task
  cleanup_locks_after_error_for task if task
  finish_state = 'error'
ensure
  # Reload task and task_queue, to ensure the objects are up to date
  task_queue = TaskQueue.find_by_id task_queue.id if task_queue
  task       = Task.find_by_id task.id if task

  # Update states of task and task_queue
  task.update_attributes :state => 'aborted' if task && task.state == 'running'
  task_queue.update_attributes :state => finish_state, :pid   => nil if task_queue

  # Clean up
  deinit
end

#running?Boolean

Return true, if the task_queue is still running

Returns:

  • (Boolean)


170
171
172
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 170

def running?
  state == 'running' && pid_running?
end

#task_statesObject

Returns the state of this task list (:stopped or :running)



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 132

def task_states
  states = determine_state_of_task_array acts_as_task_queue_tasks

  if states[:empty]
    nil
  elsif states[:running]
    :running
  elsif states[:init_queue]
    :init_queue
  elsif states[:pending]
    :pending
  elsif states[:acquire_lock]
    :acquire_lock
  elsif states[:error]
    :error
  elsif states[:new]
    :new
  elsif states[:successful]
    :successful
  else
    :unknown
  end
end

#working?Boolean

Return true, if the task_queue is working or has pending jobs

Returns:

  • (Boolean)


204
205
206
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 204

def working?
  self.task_states == :running && self.running?
end