Module: QueueDispatcher::ActsAsTask::InstanceMethods

Defined in:
lib/queue_dispatcher/acts_as_task.rb

Instance Method Summary collapse

Instance Method Details

#aborted?Boolean

Was this task aborted?

Returns:

  • (Boolean)


165
166
167
# File 'lib/queue_dispatcher/acts_as_task.rb', line 165

# Reports whether this task's +state+ is 'aborted'.
#
# @return [Boolean] true when +state+ equals 'aborted', false otherwise
def aborted?
  current_state = state
  current_state == 'aborted'
end

#acquire_lock?Boolean

Is this task in the 'acquire_lock' state while its task queue is running?

Returns:

  • (Boolean)


141
142
143
# File 'lib/queue_dispatcher/acts_as_task.rb', line 141

# Reports whether this task is in state 'acquire_lock' while its task queue
# is running.
#
# @return [Boolean, nil] the falsy queue value when there is no queue, the
#   falsy result of the queue's +running?+ when it is not running, otherwise
#   whether +state+ equals 'acquire_lock'
def acquire_lock?
  queue = acts_as_task_task_queue
  return queue unless queue

  queue_running = queue.running?
  return queue_running unless queue_running

  state == 'acquire_lock'
end

#acts_as_task_task_queueObject



54
55
56
# File 'lib/queue_dispatcher/acts_as_task.rb', line 54

# Returns whatever the configured task-queue accessor yields for this object.
#
# The accessor name is read from the class-level +acts_as_task_config+
# (+task_queue_class_name+) and then invoked on +self+ via +send+.
#
# @return [Object] the result of calling the configured accessor
def acts_as_task_task_queue
  queue_accessor = self.class.acts_as_task_config.task_queue_class_name
  send(queue_accessor)
end

#argsObject

Add task_id to the args



69
70
71
72
73
# File 'lib/queue_dispatcher/acts_as_task.rb', line 69

# Returns the inherited +args+ value with this task's id added.
#
# When +super+ yields an Array whose last element is exactly a Hash, that
# last element is replaced (in the same array) by a copy that has
# +:task_id+ set to this task's +id+. Any other value is returned untouched.
#
# @return [Object] the value returned by +super+, possibly with its last
#   element replaced
def args
  arguments = super
  if arguments && arguments.instance_of?(Array) && arguments.last.instance_of?(Hash)
    arguments[-1] = arguments.last.merge(task_id: id)
  end
  arguments
end

#dependent_tasks_executed?Boolean

Are all dependent_tasks executed?

Returns:

  • (Boolean)


183
184
185
186
187
# File 'lib/queue_dispatcher/acts_as_task.rb', line 183

# Reports whether every task in +dependent_tasks+ answers truthy to
# +executed?+.
#
# Stops at the first dependent task that has not been executed, and no longer
# uses a local named +state+ that hid the task's own +state+ attribute.
#
# @return [Boolean] true when all dependent tasks are executed (also true
#   when there are none), false otherwise
def dependent_tasks_executed?
  dependent_tasks.all?(&:executed?)
end

#dependent_tasks_had_errorsObject

Recursively check whether one or more of the tasks this task depends on had errors.



191
192
193
194
195
196
197
# File 'lib/queue_dispatcher/acts_as_task.rb', line 191

# Recursively checks whether any task this task depends on is in state
# 'error', directly or through its own dependent tasks.
#
# The task's own +state+ is not considered. The search stops at the first
# failing dependency instead of walking the rest of the dependency graph.
#
# @return [Boolean] true when at least one (transitive) dependency has state
#   'error', false otherwise
def dependent_tasks_had_errors
  dependent_tasks.any? do |task|
    task.state == 'error' || task.dependent_tasks_had_errors
  end
end

#error?Boolean

Did this task end with an error? A task with a blank state also counts as failed.

Returns:

  • (Boolean)


159
160
161
# File 'lib/queue_dispatcher/acts_as_task.rb', line 159

# Reports whether this task counts as failed: its +state+ is 'error' or
# +state+ is blank.
#
# @return [Boolean] true for state 'error', otherwise the result of
#   +state.blank?+
def error?
  current_state = state
  return true if current_state == 'error'

  current_state.blank?
end

#execute!Object

Execute task



214
215
216
217
# File 'lib/queue_dispatcher/acts_as_task.rb', line 214

# Executes the task by sending +method_name+ with +args+ to the payload.
#
# If the payload responds to +task_id=+, this task's +id+ is assigned to it
# first. +respond_to?+ is used instead of scanning +payload.methods+ so that
# writers provided dynamically (+method_missing+ / +respond_to_missing?+) are
# detected too, and no full method list is built.
#
# @return [Object] whatever the invoked payload method returns
def execute!
  receiver = payload
  receiver.task_id = id if receiver.respond_to?(:task_id=)
  receiver.send(method_name, *args)
end

#executed?Boolean

Was this task already executed?

Returns:

  • (Boolean)


177
178
179
# File 'lib/queue_dispatcher/acts_as_task.rb', line 177

# Reports whether this task has already been executed, i.e. it is
# successful, failed, or aborted.
#
# The three predicates are consulted in that order and evaluation stops at
# the first truthy one.
#
# @return [Boolean] the first truthy result of +successful?+, +error?+,
#   +aborted?+, or the result of +aborted?+ when none is truthy
def executed?
  outcome = successful?
  outcome = error? unless outcome
  outcome = aborted? unless outcome
  outcome
end

#init_queue?Boolean

Is this task waiting until the queue is initialized?

Returns:

  • (Boolean)


171
172
173
# File 'lib/queue_dispatcher/acts_as_task.rb', line 171

# Reports whether this task's +state+ is 'init_queue'.
#
# @return [Boolean] true when +state+ equals 'init_queue', false otherwise
def init_queue?
  current_state = state
  current_state == 'init_queue'
end

#md5Object

Calculate md5-Checksum



206
207
208
209
210
# File 'lib/queue_dispatcher/acts_as_task.rb', line 206

# Calculates an MD5 checksum over this object's attribute values.
#
# Every name in +Task.attribute_names+ is sent to +self+, the results are
# converted with +to_s+ and concatenated in that order, and the raw digest
# of the resulting string is returned.
#
# NOTE(review): the attribute list comes from the +Task+ constant rather than
# from +self.class+ - confirm this is intended for models not named Task.
#
# @return [String] the raw (not hex-encoded) MD5 digest
def md5
  serialized = Task.attribute_names.map { |name| send(name).to_s }.join
  Digest::MD5.digest(serialized)
end

#new?Boolean

Is this task new?

Returns:

  • (Boolean)


123
124
125
# File 'lib/queue_dispatcher/acts_as_task.rb', line 123

# Reports whether this task is new, i.e. its +state+ is 'new' or
# 'new_popped'.
#
# @return [Boolean] true for either of the two states, false otherwise
def new?
  %w[new new_popped].include?(state)
end

#payloadObject



59
60
61
62
63
64
65
# File 'lib/queue_dispatcher/acts_as_task.rb', line 59

# Returns the object the task operates on.
#
# When +target+ is a QueueDispatcher::TargetContainer the container's
# +payload+ is returned; otherwise +target+ itself is returned.
#
# @return [Object] the unwrapped target
def payload
  return target unless target.is_a?(QueueDispatcher::TargetContainer)

  target.payload
end

#pending?Boolean

Is this task pending?

Returns:

  • (Boolean)


129
130
131
# File 'lib/queue_dispatcher/acts_as_task.rb', line 129

# Reports whether this task is pending: it has a task queue and its +state+
# is 'new'.
#
# @return [Boolean, nil] the falsy queue value when there is no queue,
#   otherwise whether +state+ equals 'new'
def pending?
  queue = acts_as_task_task_queue
  return queue unless queue

  state == 'new'
end

#prosaObject

Placeholder. Please override it in your model.



201
202
# File 'lib/queue_dispatcher/acts_as_task.rb', line 201

# Placeholder that does nothing; meant to be overridden in the including
# model.
#
# @return [nil]
def prosa
  nil
end

#reloading_config?Boolean

Is the task_queue in state config_reload?

Returns:

  • (Boolean)


135
136
137
# File 'lib/queue_dispatcher/acts_as_task.rb', line 135

# Reports whether the task queue is reloading its configuration and this
# task is the first task of that queue in state 'new'.
#
# Returns false instead of raising NoMethodError when the queue currently
# has no task in state 'new'.
#
# @return [Boolean] true only when a queue exists, the queue answers truthy
#   to +reloading_config?+, and the first 'new' task of the queue has this
#   task's +id+
def reloading_config?
  queue = acts_as_task_task_queue
  return false unless queue && queue.reloading_config?

  first_new_task = queue.tasks.where(state: 'new').first
  !first_new_task.nil? && first_new_task.id == id
end

#running?Boolean

Is this task running?

Returns:

  • (Boolean)


147
148
149
# File 'lib/queue_dispatcher/acts_as_task.rb', line 147

# Reports whether this task is running: its task queue is running and its
# +state+ is 'running'.
#
# @return [Boolean, nil] the falsy queue value when there is no queue, the
#   falsy result of the queue's +running?+ when it is not running, otherwise
#   whether +state+ equals 'running'
def running?
  queue = acts_as_task_task_queue
  return queue unless queue

  queue_running = queue.running?
  return queue_running unless queue_running

  state == 'running'
end

#successful?Boolean

Did this task finish successfully?

Returns:

  • (Boolean)


153
154
155
# File 'lib/queue_dispatcher/acts_as_task.rb', line 153

# Reports whether this task finished successfully, i.e. its +state+ is
# 'successful' or 'finished'.
#
# @return [Boolean] true for either of the two states, false otherwise
def successful?
  %w[successful finished].include?(state)
end

#update_message(args = {}) ⇒ Object

Update the attributes perc_finished and message according to the args



113
114
115
116
117
118
119
# File 'lib/queue_dispatcher/acts_as_task.rb', line 113

# Updates the +message+ and +perc_finished+ attributes from +args+.
#
# Each attribute is written with its own +update_attribute+ call and only
# when the corresponding value in +args+ is truthy.
#
# @param args [Hash] may contain +:msg+ (new message) and +:perc_finished+
#   (new progress value)
# @return [Object, nil] the result of the +perc_finished+ update, or nil when
#   +:perc_finished+ was not given
def update_message(args = {})
  new_message  = args[:msg]
  new_progress = args[:perc_finished]

  update_attribute(:message, new_message) if new_message
  update_attribute(:perc_finished, new_progress) if new_progress
end

#update_state(result, remove_from_queue = false) ⇒ Object

This method updates the task state according to the return code of its corresponding command and, if requested, removes the task from the task_queue.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/queue_dispatcher/acts_as_task.rb', line 77

# Updates the task's state from the outcome of its command and optionally
# detaches the task from its queue.
#
# +result+ is interpreted in one of three ways:
# * an object responding to +rc+, +output+ and +error_msg+ - those values are
#   used and nil is stored in the +result+ attribute;
# * a Hash - the +:rc+, +:output+ and +:error_msg+ keys are used and nil is
#   stored in the +result+ attribute;
# * anything else - treated as success and stored as-is in the +result+
#   attribute.
#
# A return code of nil or 0 sets state 'successful' with +perc_finished+ 100;
# any other return code sets state 'error' together with +error_msg+. The
# output (empty string when missing) is truncated to 10256 and stored as
# +message+.
#
# +respond_to?+ replaces the former scan of +result.methods+, so readers
# provided dynamically are recognised and the method list is not built and
# mapped three times.
#
# NOTE(review): +update_attributes+ is kept for compatibility with the Rails
# versions this code targets; newer Rails versions only offer +update+.
#
# @param result [Object, Hash, nil] outcome of the executed command
# @param remove_from_queue [Boolean] when truthy, +task_queue_id+ is set to
#   nil in a second update
# @return [Object, nil] the extracted return code, or nil when +result+
#   carried none
def update_state(result, remove_from_queue = false)
  rc = output = error_msg = nil

  if [:rc, :output, :error_msg].all? { |reader| result.respond_to?(reader) }
    rc        = result.rc
    output    = result.output
    error_msg = result.error_msg
    result    = nil
  elsif result.kind_of?(Hash)
    rc        = result[:rc]
    output    = result[:output]
    error_msg = result[:error_msg]
    result    = nil
  end

  message = (output || '').truncate(10256)

  attributes =
    if rc.nil? || rc == 0
      { :state => 'successful', :perc_finished => 100, :message => message, :result => result }
    else
      { :state => 'error', :error_msg => error_msg, :message => message, :result => result }
    end

  update_attributes(attributes)
  update_attributes(:task_queue_id => nil) if remove_from_queue

  rc
end