Module: QueueDispatcher::ActsAsTask::InstanceMethods
- Defined in:
- lib/queue_dispatcher/acts_as_task.rb
Instance Method Summary collapse
-
#aborted? ⇒ Boolean
Was this task aborted?.
-
#acquire_lock? ⇒ Boolean
Is this task in state acquire_lock while its task_queue is running?.
- #acts_as_task_task_queue ⇒ Object
-
#args ⇒ Object
Add task_id to the args.
-
#dependent_tasks_executed? ⇒ Boolean
Are all dependent_tasks executed?.
-
#dependent_tasks_had_errors ⇒ Object
Check recursively whether one or more of the tasks this task depends on had errors.
-
#error? ⇒ Boolean
Had this task error(s)?.
-
#execute! ⇒ Object
Execute task.
-
#executed? ⇒ Boolean
Was this task already executed?.
-
#init_queue? ⇒ Boolean
Is this task waiting until the queue is initialized?.
-
#md5 ⇒ Object
Calculate md5-Checksum.
-
#new? ⇒ Boolean
Is this task new?.
- #payload ⇒ Object
-
#pending? ⇒ Boolean
Is this task pending?.
-
#prosa ⇒ Object
Placeholder.
-
#reloading_config? ⇒ Boolean
Is the task_queue in state config_reload?.
-
#running? ⇒ Boolean
Is this task running?.
-
#successful? ⇒ Boolean
Was this task finished successfully?.
-
#update_message(args = {}) ⇒ Object
Update the attributes perc_finished and message according to the args.
-
#update_state_and_exec_callbacks(result, remove_from_queue = false, logger = nil) ⇒ Object
This method updates the task state according to the return code of its corresponding command and, if requested, removes the task from the task_queue.
Instance Method Details
#aborted? ⇒ Boolean
Was this task aborted?
185 186 187 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 185
#
# Tells whether the task's state is 'aborted'.
#
# @return [Boolean]
def aborted?
  state == 'aborted'
end
#acquire_lock? ⇒ Boolean
Is this task in state acquire_lock while its task_queue is running?
161 162 163 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 161
#
# Tells whether the task belongs to a running task queue and is itself in
# state 'acquire_lock'.
#
# @return [Boolean, nil] falsy when there is no task queue, the queue is not
#   running, or the state differs
def acquire_lock?
  queue = acts_as_task_task_queue
  queue && queue.running? && state == 'acquire_lock'
end
#acts_as_task_task_queue ⇒ Object
70 71 72 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 70
#
# Returns the task queue of this task by calling the method whose name is
# stored in the class-level acts_as_task_config (task_queue_class_name).
#
# @return [Object] whatever that configured method returns
def acts_as_task_task_queue
  queue_reader = self.class.acts_as_task_config.task_queue_class_name
  send(queue_reader)
end
#args ⇒ Object
Add task_id to the args
85 86 87 88 89 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 85
#
# Returns the args from the parent implementation with the task id added.
#
# The id is only injected when the parent value is exactly an Array whose
# last element is exactly a Hash; that last element is then replaced (in the
# array returned by super) with a copy merged with +task_id: id+. Any other
# value is returned untouched.
#
# @return [Object] the (possibly amended) value returned by super
def args
  list = super
  return list unless list.instance_of?(Array) && list.last.instance_of?(Hash)

  list[-1] = list.last.merge(task_id: id)
  list
end
#dependent_tasks_executed? ⇒ Boolean
Are all dependent_tasks executed?
203 204 205 206 207 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 203
#
# Tells whether every task in dependent_tasks reports executed?.
# Stops at the first task that is not executed. An empty list counts as
# "all executed".
#
# @return [Boolean]
def dependent_tasks_executed?
  dependent_tasks.all?(&:executed?)
end
#dependent_tasks_had_errors ⇒ Object
Check recursively whether one or more of the tasks this task depends on had errors
211 212 213 214 215 216 217 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 211
#
# Tells whether any task in dependent_tasks is in state 'error', or
# (recursively) has a dependency that is. The search stops at the first
# failing dependency instead of walking the remaining ones.
#
# NOTE(review): there is no guard against cyclic dependencies here; a cycle
# would recurse without end — confirm that cycles cannot occur.
#
# @return [Boolean]
def dependent_tasks_had_errors
  dependent_tasks.any? do |task|
    task.state == 'error' || task.dependent_tasks_had_errors
  end
end
#error? ⇒ Boolean
Had this task error(s)?
179 180 181 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 179
#
# Tells whether the task failed. A blank state is treated the same as the
# explicit 'error' state.
#
# @return [Boolean]
def error?
  state.blank? || state == 'error'
end
#execute! ⇒ Object
Execute task
234 235 236 237 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 234
#
# Runs the task: calls method_name on the payload with the task's args.
# If the payload responds to +task_id=+, it is given this task's id first.
#
# respond_to? is used for that check so that setters provided through
# respond_to_missing?/method_missing are honoured as well, and no full
# method list has to be built.
#
# @return [Object] whatever the invoked payload method returns
def execute!
  receiver = payload
  receiver.task_id = id if receiver.respond_to?(:task_id=)
  receiver.send(method_name, *args)
end
#executed? ⇒ Boolean
Was this task already executed?
197 198 199 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 197
#
# Tells whether the task has reached one of its end states, i.e. it is
# successful, in error, or aborted. The checks are tried in that order and
# later ones are skipped once one is truthy.
#
# @return [Boolean]
def executed?
  done = successful?
  done ||= error?
  done || aborted?
end
#init_queue? ⇒ Boolean
Is this task waiting until the queue is initialized?
191 192 193 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 191
#
# Tells whether the task's state is 'init_queue'.
#
# @return [Boolean]
def init_queue?
  state == 'init_queue'
end
#md5 ⇒ Object
Calculate md5-Checksum
226 227 228 229 230 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 226
#
# Calculates an MD5 checksum over the task's attributes.
#
# The attribute names are taken from the receiver's own class rather than
# from a hard-coded Task constant, so the method also works when the model
# using this module has a different name. Every attribute value is
# converted with to_s and the strings are concatenated in attribute_names
# order before hashing.
#
# @return [String] the raw (binary) MD5 digest
def md5
  attr_str = self.class.attribute_names.map { |name| send(name).to_s }.join
  Digest::MD5.digest(attr_str)
end
#new? ⇒ Boolean
Is this task new?
143 144 145 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 143
#
# Tells whether the task's state is 'new' or 'new_popped'.
#
# @return [Boolean]
def new?
  %w[new new_popped].include?(state)
end
#payload ⇒ Object
75 76 77 78 79 80 81 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 75
#
# Returns the object the task operates on: the wrapped payload when target
# is a QueueDispatcher::TargetContainer, otherwise target itself.
#
# @return [Object]
def payload
  target.is_a?(QueueDispatcher::TargetContainer) ? target.payload : target
end
#pending? ⇒ Boolean
Is this task pending?
149 150 151 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 149
#
# Tells whether the task is assigned to a task queue and still in state
# 'new'.
#
# @return [Boolean, nil] falsy when there is no task queue or the state
#   differs
def pending?
  queue = acts_as_task_task_queue
  queue && state == 'new'
end
#prosa ⇒ Object
Placeholder. Please override it in your model.
221 222 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 221
#
# Placeholder that does nothing. Please override it in your model.
#
# @return [nil]
def prosa
  nil
end
#reloading_config? ⇒ Boolean
Is the task_queue in state config_reload?
155 156 157 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 155
#
# Tells whether the task queue is reloading its config and this task is the
# first task of that queue in state 'new'.
#
# When the queue has no task in state 'new' the answer is false; the lookup
# result is checked before its id is read, so no NoMethodError is raised in
# that case.
#
# @return [Boolean]
def reloading_config?
  queue = acts_as_task_task_queue
  return false unless queue && queue.reloading_config?

  first_new_task = queue.tasks.where(state: 'new').first
  !first_new_task.nil? && first_new_task.id == id
end
#running? ⇒ Boolean
Is this task running?
167 168 169 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 167
#
# Tells whether the task belongs to a running task queue and is itself in
# state 'running'.
#
# @return [Boolean, nil] falsy when there is no task queue, the queue is not
#   running, or the state differs
def running?
  queue = acts_as_task_task_queue
  queue && queue.running? && state == 'running'
end
#successful? ⇒ Boolean
Was this task finished successfully?
173 174 175 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 173
#
# Tells whether the task's state is 'successful' or 'finished'.
#
# @return [Boolean]
def successful?
  %w[successful finished].include?(state)
end
#update_message(args = {}) ⇒ Object
Update the attributes perc_finished and message according to the args
133 134 135 136 137 138 139 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 133
#
# Updates the attributes message and perc_finished from the given options.
# Each attribute is written with its own update_attribute call, and only
# when the corresponding option is truthy.
#
# @param args [Hash] options; the keys read are :msg and :perc_finished
# @return [Object, nil] the result of the perc_finished update, or nil when
#   :perc_finished was not given
def update_message(args = {})
  msg = args[:msg]
  percentage = args[:perc_finished]

  update_attribute :message, msg if msg
  update_attribute :perc_finished, percentage if percentage
end
#update_state_and_exec_callbacks(result, remove_from_queue = false, logger = nil) ⇒ Object
This method updates the task state according to the return code of its corresponding command and, if requested, removes the task from the task_queue
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/queue_dispatcher/acts_as_task.rb', line 93
#
# Updates the task state from the outcome of its command, runs the matching
# callbacks and, if requested, detaches the task from its task queue.
#
# The outcome is read from +result+ as follows:
# * an object responding to rc, output and error_msg: those three readers;
# * a Hash: the keys :rc, :output and :error_msg;
# * anything else: kept as-is and stored in the :result attribute.
# In the first two cases the stored :result is nil.
#
# The task counts as successful when the kept result responds to
# successful? and returns a truthy value, or otherwise when rc is nil or 0.
#
# respond_to? is used for the capability checks so that readers provided
# via respond_to_missing?/method_missing are recognised and no method lists
# have to be built.
#
# @param result [Object, Hash, nil] outcome of the executed command
# @param remove_from_queue [Boolean] when truthy, task_queue_id is set to nil
# @param logger [Object, nil] passed through to the callbacks
# @return [Object, nil] the extracted return code (nil if none was found)
def update_state_and_exec_callbacks(result, remove_from_queue = false, logger = nil)
  rc = output = error_msg = nil

  if [:rc, :output, :error_msg].all? { |reader| result.respond_to?(reader) }
    rc = result.rc
    output = result.output
    error_msg = result.error_msg
    result = nil
  elsif result.kind_of?(Hash)
    rc = result[:rc]
    output = result[:output]
    error_msg = result[:error_msg]
    result = nil
  end

  output ||= ''

  # A result object that can judge itself wins over the return code.
  successful =
    if result.respond_to?(:successful?)
      result.successful?
    else
      rc.nil? || rc == 0
    end

  if successful
    self.update_attributes :state => 'successful',
                           :perc_finished => 100,
                           :message => output.truncate(10256),
                           :result => result
    success_callbacks(logger)
  else
    self.update_attributes :state => 'error',
                           :error_msg => error_msg,
                           :message => output.truncate(10256),
                           :result => result
    error_callbacks(logger)
  end

  self.update_attributes :task_queue_id => nil if remove_from_queue

  rc
end