Class: Gearman::Task
- Inherits:
-
Object
- Object
- Gearman::Task
- Defined in:
- lib/gearman/task.rb
Overview
Task
Description
A task submitted to a Gearman job server.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#arg ⇒ Object
readonly
Returns the value of attribute arg.
-
#background ⇒ Object
Returns the value of attribute background.
-
#epoch ⇒ Object
Returns the value of attribute epoch.
-
#func ⇒ Object
readonly
Returns the value of attribute func.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#retries_done ⇒ Object
readonly
Returns the value of attribute retries_done.
-
#retry_count ⇒ Object
Returns the value of attribute retry_count.
-
#successful ⇒ Object
readonly
Returns the value of attribute successful.
-
#uniq ⇒ Object
Returns the value of attribute uniq.
Instance Method Summary collapse
-
#get_submit_packet ⇒ Object
Construct a packet to submit this task to a job server.
-
#get_uniq_hash ⇒ Object
Return a hash that we can use to execute identical tasks on the same job server.
-
#handle_completion(data) ⇒ Object
Handle completion of the task.
- #handle_created(data) ⇒ Object
-
#handle_data(data) ⇒ Object
Handle (partial) data.
-
#handle_exception(exception) ⇒ Object
Record an exception.
-
#handle_failure ⇒ Object
Record a failure and check whether we should be retried.
-
#handle_status(numerator, denominator) ⇒ Object
Handle a status update for the task.
-
#handle_warning(message) ⇒ Object
Handle a warning.
-
#initialize(func, arg = '', opts = {}) ⇒ Task
constructor
Create a new Task object.
-
#on_complete(&f) ⇒ Object
Set a block of code to be executed when this task completes successfully.
- #on_created(&f) ⇒ Object
-
#on_data(&f) ⇒ Object
Set a block of code to be executed when we receive a (partial) data packet for this task.
-
#on_exception(&f) ⇒ Object
Set a block of code to be executed when a remote exception is sent by a worker.
-
#on_fail(&f) ⇒ Object
Set a block of code to be executed when this task fails.
-
#on_retry(&f) ⇒ Object
Set a block of code to be executed when this task is retried after failing.
-
#on_status(&f) ⇒ Object
Set a block of code to be executed when we receive a status update for this task.
-
#on_warning(&f) ⇒ Object
Set a block of code to be executed when we receive a warning from a worker.
-
#reset_state ⇒ Object
Internal method to reset this task’s state so it can be run again.
-
#schedule(time) ⇒ Object
Schedule this job to run at a certain time.
Constructor Details
#initialize(func, arg = '', opts = {}) ⇒ Task
Create a new Task object.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/gearman/task.rb', line 19 def initialize(func, arg='', opts={}) @func = func.to_s @arg = arg or '' # TODO: use something more ref-like? @uniq = nil # Initialize to nil %w{on_complete on_fail on_retry on_exception on_status on_warning on_data uniq retry_count priority hash background schedule_at}.map {|s| s.to_sym }.each do |k| instance_variable_set "@#{k}", opts[k] opts.delete k end if opts.size > 0 raise InvalidArgsError, 'Invalid task args: ' + opts.keys.sort.join(', ') end @retry_count ||= 0 @successful = false @retries_done = 0 end |
Instance Attribute Details
#arg ⇒ Object (readonly)
Returns the value of attribute arg.
37 38 39 |
# File 'lib/gearman/task.rb', line 37 def arg @arg end |
#background ⇒ Object
Returns the value of attribute background.
36 37 38 |
# File 'lib/gearman/task.rb', line 36 def background @background end |
#epoch ⇒ Object
Returns the value of attribute epoch.
36 37 38 |
# File 'lib/gearman/task.rb', line 36 def epoch @epoch end |
#func ⇒ Object (readonly)
Returns the value of attribute func.
37 38 39 |
# File 'lib/gearman/task.rb', line 37 def func @func end |
#priority ⇒ Object
Returns the value of attribute priority.
36 37 38 |
# File 'lib/gearman/task.rb', line 36 def priority @priority end |
#retries_done ⇒ Object (readonly)
Returns the value of attribute retries_done.
37 38 39 |
# File 'lib/gearman/task.rb', line 37 def retries_done @retries_done end |
#retry_count ⇒ Object
Returns the value of attribute retry_count.
36 37 38 |
# File 'lib/gearman/task.rb', line 36 def retry_count @retry_count end |
#successful ⇒ Object (readonly)
Returns the value of attribute successful.
37 38 39 |
# File 'lib/gearman/task.rb', line 37 def successful @successful end |
#uniq ⇒ Object
Returns the value of attribute uniq.
36 37 38 |
# File 'lib/gearman/task.rb', line 36 def uniq @uniq end |
Instance Method Details
#get_submit_packet ⇒ Object
Construct a packet to submit this task to a job server.
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/gearman/task.rb', line 198 def get_submit_packet() modes = ['submit_job'] if @schedule_at modes << 'epoch' args = [func, get_uniq_hash, @schedule_at.to_i, arg] else if @priority modes << 'high' if @priority == :high modes << 'low' if @priority == :low end modes << 'bg' if @background args = [func, get_uniq_hash, arg] end mode = modes.join('_') Packet::pack_request(mode, args.join("\0")) end |
#get_uniq_hash ⇒ Object
Return a hash that we can use to execute identical tasks on the same job server.
182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/gearman/task.rb', line 182 def get_uniq_hash return @hash if @hash if @uniq.nil? string = @func+@arg.to_s else string = @uniq end @hash = Digest::SHA1.hexdigest(string) end |
#handle_completion(data) ⇒ Object
Handle completion of the task.
116 117 118 119 120 |
# File 'lib/gearman/task.rb', line 116 def handle_completion(data) @successful = true @on_complete.call(data) if @on_complete self end |
#handle_created(data) ⇒ Object
126 127 128 129 |
# File 'lib/gearman/task.rb', line 126 def handle_created(data) @on_created.call(data) if @on_created self end |
#handle_data(data) ⇒ Object
Handle (partial) data
170 171 172 173 |
# File 'lib/gearman/task.rb', line 170 def handle_data(data) @on_data.call(data) if @on_data self end |
#handle_exception(exception) ⇒ Object
Record an exception.
148 149 150 151 |
# File 'lib/gearman/task.rb', line 148 def handle_exception(exception) @on_exception.call(exception) if @on_exception self end |
#handle_failure ⇒ Object
Record a failure and check whether we should be retried.
135 136 137 138 139 140 141 142 143 |
# File 'lib/gearman/task.rb', line 135 def handle_failure if @retries_done >= @retry_count @on_fail.call if @on_fail return false end @retries_done += 1 @on_retry.call(@retries_done) if @on_retry true end |
#handle_status(numerator, denominator) ⇒ Object
Handle a status update for the task.
155 156 157 158 |
# File 'lib/gearman/task.rb', line 155 def handle_status(numerator, denominator) @on_status.call(numerator, denominator) if @on_status self end |
#handle_warning(message) ⇒ Object
Handle a warning.
163 164 165 166 |
# File 'lib/gearman/task.rb', line 163 def handle_warning() @on_warning.call() if @on_warning self end |
#on_complete(&f) ⇒ Object
Set a block of code to be executed when this task completes successfully. The returned data will be passed to the block.
59 60 61 |
# File 'lib/gearman/task.rb', line 59 def on_complete(&f) @on_complete = f end |
#on_created(&f) ⇒ Object
122 123 124 |
# File 'lib/gearman/task.rb', line 122 def on_created(&f) @on_created = f end |
#on_data(&f) ⇒ Object
Set a block of code to be executed when we receive a (partial) data packet for this task. The data received will be passed as an argument to the block.
108 109 110 |
# File 'lib/gearman/task.rb', line 108 def on_data(&f) @on_data = f end |
#on_exception(&f) ⇒ Object
Set a block of code to be executed when a remote exception is sent by a worker. The block will receive the message of the exception passed from the worker. The user can return true for retrying or false to mark it as finished
NOTE: this is actually deprecated, cf. bugs.launchpad.net/gearmand/+bug/405732
84 85 86 |
# File 'lib/gearman/task.rb', line 84 def on_exception(&f) @on_exception = f end |
#on_fail(&f) ⇒ Object
Set a block of code to be executed when this task fails.
65 66 67 |
# File 'lib/gearman/task.rb', line 65 def on_fail(&f) @on_fail = f end |
#on_retry(&f) ⇒ Object
Set a block of code to be executed when this task is retried after failing. The number of retries that have been attempted (including the current one) will be passed to the block.
73 74 75 |
# File 'lib/gearman/task.rb', line 73 def on_retry(&f) @on_retry = f end |
#on_status(&f) ⇒ Object
Set a block of code to be executed when we receive a status update for this task. The block will receive two arguments, a numerator and denominator describing the task’s status.
92 93 94 |
# File 'lib/gearman/task.rb', line 92 def on_status(&f) @on_status = f end |
#on_warning(&f) ⇒ Object
Set a block of code to be executed when we receive a warning from a worker. It is recommended for workers to send work_warning, followed by work_fail if an exception occurs on their side. Don’t expect this behavior from workers NOT using this very library ATM, though. (cf. bugs.launchpad.net/gearmand/+bug/405732)
101 102 103 |
# File 'lib/gearman/task.rb', line 101 def on_warning(&f) @on_warning = f end |
#reset_state ⇒ Object
Internal method to reset this task’s state so it can be run again. Called by TaskSet#add_task.
50 51 52 53 54 |
# File 'lib/gearman/task.rb', line 50 def reset_state @retries_done = 0 @successful = false self end |
#schedule(time) ⇒ Object
Schedule this job to run at a certain time
43 44 45 |
# File 'lib/gearman/task.rb', line 43 def schedule(time) @schedule_at = time end |