Class: Gearman::Task

Inherits:
Object
Defined in:
lib/gearman/task.rb

Overview

Task

Description

A task submitted to a Gearman job server.

Direct Known Subclasses

BackgroundTask

Constructor Details

#initialize(func, arg = '', opts = {}) ⇒ Task

Create a new Task object.

Parameters:

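  • func

    name of the function to run; converted to a String with to_s

  • arg (defaults to: '')

    argument to pass to the function

  • opts (defaults to: {})

    hash of additional options. Recognized keys: on_complete, on_fail, on_retry, on_exception, on_status, on_warning, on_data, uniq, retry_count, priority, hash, background, schedule_at. Any other key raises InvalidArgsError.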



# File 'lib/gearman/task.rb', line 19

def initialize(func, arg='', opts={})
  @func = func.to_s
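  # NOTE: `or` binds more loosely than `=`, so this parses as (@arg = arg) or '';
  # a nil arg is stored as nil rather than ''.
  @arg = arg or ''  # TODO: use something more ref-like?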
  @uniq = nil       # Initialize to nil
  %w{on_complete on_fail on_retry on_exception on_status on_warning on_data
     uniq retry_count priority hash background schedule_at}.map {|s| s.to_sym }.each do |k|
    instance_variable_set "@#{k}", opts[k]
    opts.delete k
  end
  if opts.size > 0
    raise InvalidArgsError, 'Invalid task args: ' + opts.keys.sort.join(', ')
  end
  @retry_count ||= 0
  @successful = false
  @retries_done = 0
end
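
The option handling in the constructor can be tried in isolation. The following is a minimal sketch, not the library class: SketchTask and SketchInvalidArgsError are hypothetical stand-ins, and the sketch deliberately differs from the source in two labeled places (it uses `||` for the default argument, because `@arg = arg or ''` parses as `(@arg = arg) or ''`, and it copies the options hash instead of deleting keys from the caller's hash).

```ruby
# Stand-alone sketch of the option handling shown above; SketchTask is a
# hypothetical stand-in, not the library's Gearman::Task.
class SketchInvalidArgsError < StandardError; end

class SketchTask
  # Option keys copied from the constructor source above.
  ALLOWED = %i[on_complete on_fail on_retry on_exception on_status on_warning
               on_data uniq retry_count priority hash background schedule_at].freeze

  attr_reader :func, :arg, :retry_count, :priority

  def initialize(func, arg = '', opts = {})
    @func = func.to_s
    @arg = arg || ''   # `||` so that a nil arg really becomes ''
    opts = opts.dup    # unlike the original, leave the caller's hash alone
    ALLOWED.each { |k| instance_variable_set("@#{k}", opts.delete(k)) }
    unless opts.empty?
      raise SketchInvalidArgsError, 'Invalid task args: ' + opts.keys.sort.join(', ')
    end
    @retry_count ||= 0
  end
end

task = SketchTask.new(:resize, 'photo.jpg', priority: :high, retry_count: 2)
puts task.func         # prints "resize"
puts task.retry_count  # prints "2"

begin
  SketchTask.new('resize', '', timeout: 5)
rescue SketchInvalidArgsError => e
  puts e.message       # prints "Invalid task args: timeout"
end
```

Rejecting unknown keys up front means a misspelled option such as `:retry_cont` fails loudly at construction time instead of being silently ignored.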

Instance Attribute Details

#arg ⇒ Object (readonly)

Returns the value of attribute arg.

# File 'lib/gearman/task.rb', line 37

def arg
  @arg
end

#background ⇒ Object

Returns the value of attribute background.

# File 'lib/gearman/task.rb', line 36

def background
  @background
end

#epoch ⇒ Object

Returns the value of attribute epoch.

# File 'lib/gearman/task.rb', line 36

def epoch
  @epoch
end

#func ⇒ Object (readonly)

Returns the value of attribute func.

# File 'lib/gearman/task.rb', line 37

def func
  @func
end

#priority ⇒ Object

Returns the value of attribute priority.

# File 'lib/gearman/task.rb', line 36

def priority
  @priority
end

#retries_done ⇒ Object (readonly)

Returns the value of attribute retries_done.

# File 'lib/gearman/task.rb', line 37

def retries_done
  @retries_done
end

#retry_count ⇒ Object

Returns the value of attribute retry_count.

# File 'lib/gearman/task.rb', line 36

def retry_count
  @retry_count
end

#successful ⇒ Object (readonly)

Returns the value of attribute successful.

# File 'lib/gearman/task.rb', line 37

def successful
  @successful
end

#uniq ⇒ Object

Returns the value of attribute uniq.

# File 'lib/gearman/task.rb', line 36

def uniq
  @uniq
end

Instance Method Details

#get_submit_packet ⇒ Object

Construct a packet to submit this task to a job server.

Returns:

  • String representation of packet



# File 'lib/gearman/task.rb', line 198

def get_submit_packet()
  modes = ['submit_job']

  if @schedule_at
    modes << 'epoch'
    args = [func, get_uniq_hash, @schedule_at.to_i, arg]
  else
    if @priority
      modes << 'high' if @priority == :high
      modes << 'low'  if @priority == :low
    end

    modes << 'bg' if @background

    args = [func, get_uniq_hash, arg]
  end

  mode = modes.join('_')
  Packet::pack_request(mode, args.join("\0"))
end
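
The way the packet type name is assembled can be followed without a job server. The sketch below mirrors the branching in the source above; `submit_mode` and `submit_args` are hypothetical helper names introduced only for illustration, and the real method additionally passes the result to `Packet::pack_request`.

```ruby
# Hypothetical helper mirroring the branching in get_submit_packet above.
def submit_mode(priority: nil, background: false, schedule_at: nil)
  modes = ['submit_job']
  if schedule_at
    modes << 'epoch'                      # scheduling skips priority and background
  else
    modes << 'high' if priority == :high
    modes << 'low'  if priority == :low
    modes << 'bg'   if background
  end
  modes.join('_')
end

# Packet arguments are joined with NUL bytes, as in the source above.
def submit_args(func, uniq_hash, arg, schedule_at: nil)
  parts = schedule_at ? [func, uniq_hash, schedule_at.to_i, arg] : [func, uniq_hash, arg]
  parts.join("\0")
end

puts submit_mode                                          # prints "submit_job"
puts submit_mode(priority: :high)                         # prints "submit_job_high"
puts submit_mode(priority: :low, background: true)        # prints "submit_job_low_bg"
puts submit_mode(priority: :high, schedule_at: Time.now)  # prints "submit_job_epoch"
```

As the last call shows, a scheduled task ignores both the priority and the background flag, because the `epoch` branch never looks at them.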

#get_uniq_hash ⇒ Object

Return a hash that we can use to execute identical tasks on the same job server.

Returns:

  • SHA1 hex digest of @func + @arg.to_s if @uniq is nil, or of @uniq if it is set. The result is cached in @hash, and a value supplied through the hash option is returned as-is.



# File 'lib/gearman/task.rb', line 182

def get_uniq_hash
  return @hash if @hash

  if @uniq.nil?
    string = @func+@arg.to_s
  else
    string = @uniq
  end

  @hash = Digest::SHA1.hexdigest(string)
end
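
The hashing rule is easy to check by hand. This is a small sketch of the same logic without the caching in @hash; `uniq_hash` is a hypothetical stand-alone function, and only `Digest::SHA1.hexdigest` from Ruby's standard library is used.

```ruby
require 'digest/sha1'

# Hypothetical stand-alone version of the logic above (no memoization).
def uniq_hash(func, arg, uniq = nil)
  string = uniq.nil? ? func + arg.to_s : uniq
  Digest::SHA1.hexdigest(string)
end

a = uniq_hash('resize', 'photo.jpg')
b = uniq_hash('resize', 'photo.jpg')
c = uniq_hash('resize', 'photo.jpg', 'job-42')

puts a == b     # prints "true": same func and arg give the same hash
puts a == c     # prints "false": an explicit uniq replaces func + arg
puts a.length   # prints "40": SHA1 hex digests are 40 characters long
```

Because the function name and argument are simply concatenated, the pairs ('ab', 'c') and ('a', 'bc') produce the same hash. Setting uniq explicitly avoids that ambiguity where it matters.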

#handle_completion(data) ⇒ Object

Handle completion of the task.

Parameters:

  • data

    data returned from the server (doesn’t include handle)



# File 'lib/gearman/task.rb', line 116

def handle_completion(data)
  @successful = true
  @on_complete.call(data) if @on_complete
  self
end

#handle_created(data) ⇒ Object

Handle creation of the task: call the on_created block, if one is set, with data.

# File 'lib/gearman/task.rb', line 126

def handle_created(data)
  @on_created.call(data) if @on_created
  self
end

#handle_data(data) ⇒ Object

Handle a (partial) data packet for the task.



# File 'lib/gearman/task.rb', line 170

def handle_data(data)
  @on_data.call(data) if @on_data
  self
end

#handle_exception(exception) ⇒ Object

Record an exception.



# File 'lib/gearman/task.rb', line 148

def handle_exception(exception)
  @on_exception.call(exception) if @on_exception
  self
end

#handle_failure ⇒ Object

Record a failure and check whether we should be retried.

Returns:

  • true if we should be resubmitted; false otherwise



# File 'lib/gearman/task.rb', line 135

def handle_failure
  if @retries_done >= @retry_count
    @on_fail.call if @on_fail
    return false
  end
  @retries_done += 1
  @on_retry.call(@retries_done) if @on_retry
  true
end
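
The retry bookkeeping can be exercised on its own. The following sketch copies handle_failure into a hypothetical stand-in class, SketchRetry, so that it runs without the library; with a retry_count of N, the method returns true N times and then false.

```ruby
# Hypothetical stand-in that reproduces the retry logic shown above.
class SketchRetry
  attr_reader :retries_done

  def initialize(retry_count)
    @retry_count = retry_count
    @retries_done = 0
  end

  def on_fail(&f)
    @on_fail = f
  end

  def on_retry(&f)
    @on_retry = f
  end

  def handle_failure
    if @retries_done >= @retry_count
      @on_fail.call if @on_fail   # retries exhausted: report the failure
      return false
    end
    @retries_done += 1
    @on_retry.call(@retries_done) if @on_retry
    true
  end
end

task = SketchRetry.new(2)
task.on_retry { |n| puts "retry #{n}" }
task.on_fail  { puts 'giving up' }

p task.handle_failure  # prints "retry 1", then true
p task.handle_failure  # prints "retry 2", then true
p task.handle_failure  # prints "giving up", then false
```

With the default retry_count of 0, the first failure already calls the on_fail block and returns false.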

#handle_status(numerator, denominator) ⇒ Object

Handle a status update for the task.



# File 'lib/gearman/task.rb', line 155

def handle_status(numerator, denominator)
  @on_status.call(numerator, denominator) if @on_status
  self
end

#handle_warning(message) ⇒ Object

Handle a warning.



# File 'lib/gearman/task.rb', line 163

def handle_warning(message)
  @on_warning.call(message) if @on_warning
  self
end

#on_complete(&f) ⇒ Object

Set a block of code to be executed when this task completes successfully. The returned data will be passed to the block.



# File 'lib/gearman/task.rb', line 59

def on_complete(&f)
  @on_complete = f
end
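
All of the on_* setters follow the same pattern: the block is stored in an instance variable and later invoked by the matching handle_* method. A minimal sketch of that pairing for the completion case, using a hypothetical stand-in class (SketchCallbacks) rather than the library itself:

```ruby
# Hypothetical stand-in reproducing on_complete and handle_completion from
# the sources on this page; it is not the library class itself.
class SketchCallbacks
  attr_reader :successful

  def initialize
    @successful = false
  end

  # Store the block for later.
  def on_complete(&f)
    @on_complete = f
  end

  # Mark the task successful and run the stored block, if any.
  def handle_completion(data)
    @successful = true
    @on_complete.call(data) if @on_complete
    self
  end
end

task = SketchCallbacks.new
task.on_complete { |data| puts "result: #{data}" }
task.handle_completion('olleh')   # prints "result: olleh"
puts task.successful              # prints "true"
```

Since the callback is optional, handle_completion still marks the task as successful when no block was registered.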

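#on_created(&f) ⇒ Object

Set a block of code to be executed when handle_created is called. The data passed to handle_created will be passed to the block.
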
# File 'lib/gearman/task.rb', line 122

def on_created(&f)
  @on_created = f
end

#on_data(&f) ⇒ Object

Set a block of code to be executed when we receive a (partial) data packet for this task. The data received will be passed as an argument to the block.



# File 'lib/gearman/task.rb', line 108

def on_data(&f)
  @on_data = f
end

#on_exception(&f) ⇒ Object

Set a block of code to be executed when a remote exception is sent by a worker. The block will receive the message of the exception passed from the worker. The block can return true to retry the task or false to mark it as finished.

NOTE: this is actually deprecated, cf. bugs.launchpad.net/gearmand/+bug/405732



# File 'lib/gearman/task.rb', line 84

def on_exception(&f)
  @on_exception = f
end

#on_fail(&f) ⇒ Object

Set a block of code to be executed when this task fails.



# File 'lib/gearman/task.rb', line 65

def on_fail(&f)
  @on_fail = f
end

#on_retry(&f) ⇒ Object

Set a block of code to be executed when this task is retried after failing. The number of retries that have been attempted (including the current one) will be passed to the block.



# File 'lib/gearman/task.rb', line 73

def on_retry(&f)
  @on_retry = f
end

#on_status(&f) ⇒ Object

Set a block of code to be executed when we receive a status update for this task. The block will receive two arguments, a numerator and denominator describing the task’s status.



# File 'lib/gearman/task.rb', line 92

def on_status(&f)
  @on_status = f
end
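
A status block typically turns the numerator and denominator into a progress figure. The helper below is a hypothetical sketch of such a block body; `percent_done` is not part of the library, and the conversion with `to_f` rests on the assumption that the two values may arrive as strings rather than numbers.

```ruby
# Hypothetical helper: turn a status update into a percentage.
# Assumption: the values may arrive as strings, so both are converted first.
def percent_done(numerator, denominator)
  den = denominator.to_f
  return 0.0 if den.zero?   # avoid dividing by zero
  (numerator.to_f / den * 100).round(1)
end

# A block of this shape could be handed to on_status.
report = ->(numerator, denominator) { puts "#{percent_done(numerator, denominator)}% done" }
report.call(3, 4)       # prints "75.0% done"
report.call('1', '8')   # prints "12.5% done"
```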

#on_warning(&f) ⇒ Object

Set a block of code to be executed when we receive a warning from a worker. Workers are encouraged to send work_warning, followed by work_fail, if an exception occurs on their side. At the moment, though, do not expect this behavior from workers that are not using this library (cf. bugs.launchpad.net/gearmand/+bug/405732).



# File 'lib/gearman/task.rb', line 101

def on_warning(&f)
  @on_warning = f
end

#reset_state ⇒ Object

Internal method to reset this task’s state so it can be run again. Called by TaskSet#add_task.



# File 'lib/gearman/task.rb', line 50

def reset_state
  @retries_done = 0
  @successful = false
  self
end

#schedule(time) ⇒ Object

Schedule this job to run at a certain time.

Parameters:

  • time

    Ruby Time object representing when the task should run



# File 'lib/gearman/task.rb', line 43

def schedule(time)
  @schedule_at = time
end
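
The stored time is later sent as whole seconds since the Unix epoch (get_submit_packet calls `to_i` on it). A short sketch of that conversion, using only Ruby's built-in Time class; `epoch_for` is a hypothetical helper name:

```ruby
# Hypothetical helper: the conversion applied to the scheduled time
# when the submit packet is built.
def epoch_for(time)
  time.to_i   # whole seconds since 1970-01-01 00:00:00 UTC
end

run_at = Time.now + 3600                    # one hour from now
puts epoch_for(run_at).is_a?(Integer)       # prints "true"
puts epoch_for(Time.at(90.75))              # prints "90": fractions of a second are dropped
```

Sub-second precision is therefore lost, so two tasks scheduled within the same second carry the same timestamp.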