Module: QueueDispatcher::ActsAsTask::InstanceMethods

Defined in:
lib/queue_dispatcher/acts_as_task.rb

Instance Method Summary collapse

Instance Method Details

#aborted?Boolean

Was this task aborted?

Returns:

  • (Boolean)


185
186
187
# File 'lib/queue_dispatcher/acts_as_task.rb', line 185

def aborted?
  state == 'aborted'
end

#acquire_lock?Boolean

Is this task pending?

Returns:

  • (Boolean)


161
162
163
# File 'lib/queue_dispatcher/acts_as_task.rb', line 161

def acquire_lock?
  acts_as_task_task_queue && acts_as_task_task_queue.running? && state == 'acquire_lock'
end

#acts_as_task_task_queueObject



70
71
72
# File 'lib/queue_dispatcher/acts_as_task.rb', line 70

def acts_as_task_task_queue
  self.send(self.class.acts_as_task_config.task_queue_class_name)
end

#argsObject

Add task_id to the args



85
86
87
88
89
# File 'lib/queue_dispatcher/acts_as_task.rb', line 85

def args
  a = super
  a[-1] = a.last.merge(task_id: self.id) if a && a.instance_of?(Array) && a.last.instance_of?(Hash)
  a
end

#dependent_tasks_executed?Boolean

Are all dependent_tasks executed?

Returns:

  • (Boolean)


203
204
205
206
207
# File 'lib/queue_dispatcher/acts_as_task.rb', line 203

def dependent_tasks_executed?
  state = true
  dependent_tasks.each{ |dt| state = false unless dt.executed? }
  state
end

#dependent_tasks_had_errorsObject

Check recursive, if one or more of the tasks, which this task is dependent on had errors



211
212
213
214
215
216
217
# File 'lib/queue_dispatcher/acts_as_task.rb', line 211

def dependent_tasks_had_errors
  error = false
  dependent_tasks.each do |t|
    error = true if t.state == 'error' || t.dependent_tasks_had_errors
  end
  error
end

#error?Boolean

Had this task error(s)?

Returns:

  • (Boolean)


179
180
181
# File 'lib/queue_dispatcher/acts_as_task.rb', line 179

def error?
  state == 'error' || state.blank?
end

#execute!Object

Execute task



234
235
236
237
# File 'lib/queue_dispatcher/acts_as_task.rb', line 234

def execute!
  payload.task_id = id if payload.methods.include?(:task_id=)
  payload.send(method_name, *args)
end

#executed?Boolean

Was this task already executed?

Returns:

  • (Boolean)


197
198
199
# File 'lib/queue_dispatcher/acts_as_task.rb', line 197

def executed?
  successful? || error? || aborted?
end

#init_queue?Boolean

Is this task waiting until the queue is initialized?

Returns:

  • (Boolean)


191
192
193
# File 'lib/queue_dispatcher/acts_as_task.rb', line 191

def init_queue?
  state == 'init_queue'
end

#md5Object

Calculate md5-Checksum



226
227
228
229
230
# File 'lib/queue_dispatcher/acts_as_task.rb', line 226

def md5
  attr_str = ''
  Task.attribute_names.each{ |a| attr_str += self.send(a).to_s }
  Digest('MD5').digest(attr_str)
end

#new?Boolean

Is this task new?

Returns:

  • (Boolean)


143
144
145
# File 'lib/queue_dispatcher/acts_as_task.rb', line 143

def new?
   state == 'new' || state == 'new_popped'
end

#payloadObject



75
76
77
78
79
80
81
# File 'lib/queue_dispatcher/acts_as_task.rb', line 75

def payload
  if target.is_a?(QueueDispatcher::TargetContainer)
    target.payload
  else
    target
  end
end

#pending?Boolean

Is this task pending?

Returns:

  • (Boolean)


149
150
151
# File 'lib/queue_dispatcher/acts_as_task.rb', line 149

def pending?
  acts_as_task_task_queue && state == 'new'
end

#prosaObject

Placeholder. Please override it in your model.



221
222
# File 'lib/queue_dispatcher/acts_as_task.rb', line 221

def prosa
end

#reloading_config?Boolean

Is the task_queue in state config_reload?

Returns:

  • (Boolean)


155
156
157
# File 'lib/queue_dispatcher/acts_as_task.rb', line 155

def reloading_config?
  acts_as_task_task_queue && acts_as_task_task_queue.reloading_config? && acts_as_task_task_queue.tasks.where(state: 'new').first.id == id
end

#running?Boolean

Is this task running?

Returns:

  • (Boolean)


167
168
169
# File 'lib/queue_dispatcher/acts_as_task.rb', line 167

def running?
  acts_as_task_task_queue && acts_as_task_task_queue.running? && state == 'running'
end

#successful?Boolean

Was this task finsihed successful?

Returns:

  • (Boolean)


173
174
175
# File 'lib/queue_dispatcher/acts_as_task.rb', line 173

def successful?
  state == 'successful' || state == 'finished'
end

#update_message(args = {}) ⇒ Object

Update the attributes perc_finished and message according to the args



133
134
135
136
137
138
139
# File 'lib/queue_dispatcher/acts_as_task.rb', line 133

def update_message(args = {})
  msg = args[:msg]
  perc_finished = args[:perc_finished]

  self.update_attribute :message, msg if msg
  self.update_attribute :perc_finished, perc_finished if perc_finished
end

#update_state_and_exec_callbacks(result, remove_from_queue = false, logger = nil) ⇒ Object

This method updates the task state according to the return code of their corresponding command and removes it from the task_queue



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/queue_dispatcher/acts_as_task.rb', line 93

def update_state_and_exec_callbacks(result, remove_from_queue = false, logger = nil)
  rc = output = error_msg = nil

  if result.methods.map(&:to_sym).include?(:rc) && result.methods.map(&:to_sym).include?(:output) && result.methods.map(&:to_sym).include?(:error_msg)
    rc        = result.rc
    output    = result.output
    error_msg = result.error_msg
    result    = nil
  elsif result.kind_of?(Hash)
    rc        = result[:rc]
    output    = result[:output]
    error_msg = result[:error_msg]
    result    = nil
  end

  output ||= ''
  successful = result.methods.map(&:to_sym).include?(:successful?) ? result.successful? : rc.nil? || rc == 0

  if successful
    self.update_attributes :state         => 'successful',
                           :perc_finished => 100,
                           :message       => output.truncate(10256),
                           :result        => result
    success_callbacks(logger)
  else
    self.update_attributes :state     => 'error',
                           :error_msg => error_msg,
                           :message   => output.truncate(10256),
                           :result    => result
    error_callbacks(logger)
  end

  self.update_attributes :task_queue_id => nil if remove_from_queue


  rc
end