Module: QueueDispatcher::ActsAsTaskQueue::SingletonMethods

Defined in:
lib/queue_dispatcher/acts_as_task_queue.rb

Instance Method Summary collapse

Instance Method Details

#acts_as_task_queue_configObject



50
51
52
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 50

def acts_as_task_queue_config
  @acts_as_task_queue_config
end

#any_running?Boolean

Are there any running task_queues?

Returns:

  • (Boolean)


76
77
78
79
80
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 76

def any_running?
  running = false
  all.each{ |tq| running = true if tq.running? || tq.brand_new? }
  running
end

#find_or_create_by_name(name, options = {}) ⇒ Object

Find or create a task_queue by its name which is not in state ‘error’. Create one, if there does not exists one



100
101
102
103
104
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 100

def find_or_create_by_name(name, options = {})
  transaction do
    self.where(:name => name).where("state != 'error'").first || self.create(:name => name, :state => 'new', terminate_immediately: options[:terminate_immediately])
  end
end

#get_next_pendingObject

Get next pending task_queue



84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 84

def get_next_pending
  task_queue = nil

  transaction do
    # Find next task_queue which is not running and not in state error
    order(:id).lock(true).all.each { |tq| task_queue = tq unless task_queue || tq.pid_running? || tq.state == 'error' || tq.state == 'heartbeat' }

    # Update pid inside the atomic transaction to be sure, the next call of this method will not give the same queue a second time
    task_queue.update_attribute :pid, $$ if task_queue
  end

  task_queue
end

#pid_running?(pid) ⇒ Boolean

Check if a certain PID is still running and is a ruby process

Returns:

  • (Boolean)


56
57
58
59
60
61
62
63
64
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 56

def pid_running?(pid)
  ps = pid ? Sys::ProcTable.ps(pid) : nil
  if ps
    # Asume, that if the command of the 'ps'-output is 'ruby', the process is still running
    ps.comm == 'ruby'
  else
    false
  end
end

#qd_running?Boolean

Check if QueueDispatcher is running.

Returns:

  • (Boolean)


68
69
70
71
72
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 68

def qd_running?
  running = false
  TaskQueue.where(state: 'heartbeat').each { |tq| running = true if tq.updated_at > 1.minute.ago }
  running
end

#reset_immediately!Object

Kill all running TaskQueues immediately and destroy them.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/queue_dispatcher/acts_as_task_queue.rb', line 108

def reset_immediately!
  all.each do |tq|
    tq.update_attributes state: 'aborted'

    # Kill the TaskQueue with SIGKILL
    Process.kill 'KILL', tq.pid if tq.pid_running?

    # Update task_state to aborted and release all its locks
    tq.tasks.each do |task|
      task.update_attributes state: 'aborted' unless task.state == 'successful' || task.state == 'finished'
      tq.send(:release_lock_for, task)
    end
    tq.destroy
  end
end