Class: Utilrb::ThreadPool::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/utilrb/thread_pool.rb

Overview

A Task is executed by the thread pool as soon as a free thread is available.

Author:

Constant Summary collapse

Asked =
Class.new(Exception)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = Hash.new, *args, &block) ⇒ Task

Creates a new task which can be added to the work queue of a Utilrb::ThreadPool. If a sync key is given, no other task having the same key will be executed in parallel, which is useful for calls to instance methods that are not thread safe.

Parameters:

  • options (Hash) (defaults to: Hash.new)

    The options of the task.

  • args (Array)

    The arguments for the code block

  • block (#call)

    The code block

Options Hash (options):

  • :sync_key (Object)

    The sync key

  • :callback (Proc)

    The callback

  • :default (Object)

    Default value returned when an error occurred which was handled.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/utilrb/thread_pool.rb', line 127

# Builds a task around the given work block and puts it into its initial
# state by calling reset.
#
# @param options [Hash] task options; validated against the keys
#   :sync_key, :default and :callback (each defaulting to nil)
# @param args [Array] arguments passed to the block when it is called
# @param block [#call] the work to perform
# @raise [ArgumentError] if no block is given
def initialize(options = Hash.new, *args, &block)
    raise ArgumentError, 'you must pass a work block to initialize a new Task.' unless block

    known_options = {:sync_key => nil, :default => nil, :callback => nil}
    options = Kernel.validate_options(options, known_options)

    @block = block
    @arguments = args
    @sync_key = options[:sync_key]
    @default = options[:default]
    @callback = options[:callback]

    @mutex = Mutex.new
    @pool = nil
    @state_temp = nil
    @state = nil
    reset
end

Instance Attribute Details

#descriptionObject

Custom description which can be used to store a human readable object



76
77
78
# File 'lib/utilrb/thread_pool.rb', line 76

# @return [Object] the custom description stored on the task
def description; @description; end

#exceptionException (readonly)

The exception thrown by the custom code block

Returns:



49
50
51
# File 'lib/utilrb/thread_pool.rb', line 49

# @return [Exception, nil] the exception recorded for the task, if any
def exception; @exception; end

#poolThreadPool (readonly)

Thread pool the task belongs to

Returns:



39
40
41
# File 'lib/utilrb/thread_pool.rb', line 39

# @return [ThreadPool, nil] the thread pool the task currently belongs to
def pool; @pool; end

#queued_atObject

The time the task was queued

return [Time] the time



59
60
61
# File 'lib/utilrb/thread_pool.rb', line 59

# @return [Time, nil] the time the task was queued
def queued_at; @queued_at; end

#resultObject (readonly)

Result of the code block call



72
73
74
# File 'lib/utilrb/thread_pool.rb', line 72

# @return [Object] the result of the code block call
def result; @result; end

#started_atObject (readonly)

The time the task was started

return [Time] the time



64
65
66
# File 'lib/utilrb/thread_pool.rb', line 64

# @return [Time, nil] the time the task was started
def started_at; @started_at; end

#state:waiting, ... (readonly)

State of the task

Returns:

  • (:waiting, :running, :stopping, :finished, :terminated, :exception)

    the state



44
45
46
# File 'lib/utilrb/thread_pool.rb', line 44

# @return [:waiting, :running, :stopping, :finished, :terminated, :exception]
#   the current state of the task
def state; @state; end

#stopped_atObject (readonly)

The time the task was stopped or finished

return [Time] the time



69
70
71
# File 'lib/utilrb/thread_pool.rb', line 69

# @return [Time, nil] the time the task was stopped or finished
def stopped_at; @stopped_at; end

#sync_keyObject (readonly)

The sync key is used to specify that a given task must not run in parallel with another task having the same sync key. If no key is set there are no such constraints for the task.

Returns:

  • the sync key



34
35
36
# File 'lib/utilrb/thread_pool.rb', line 34

# @return [Object, nil] the sync key of the task
def sync_key; @sync_key; end

#threadObject (readonly)

The thread the task was assigned to

return [Thread] the thread



54
55
56
# File 'lib/utilrb/thread_pool.rb', line 54

# @return [Thread, nil] the thread the task was assigned to
def thread; @thread; end

Instance Method Details

#callback {|Object, Exception| ... } ⇒ Object

Called from the worker thread when the work is done

Yields:



230
231
232
233
234
# File 'lib/utilrb/thread_pool.rb', line 230

# Stores the given block as the task's callback; finalize calls it with
# the result and the exception. The assignment is guarded by the task's
# mutex.
#
# @yield [Object, Exception] result and exception of the task
# @return [Proc, nil] the stored block
def callback(&block)
    @mutex.synchronize { @callback = block }
end

#default?Boolean

Returns true if the task has a default return value.

Returns:

  • (Boolean)


163
164
165
166
167
# File 'lib/utilrb/thread_pool.rb', line 163

# Tells whether a default return value was configured for the task.
# The check is done while holding the task's mutex.
#
# @return [Boolean] true if the default value is not nil
def default?
    @mutex.synchronize { !@default.nil? }
end

#exception?Boolean

Checks if an exception occurred.

Returns:

  • (Boolean)


114
# File 'lib/utilrb/thread_pool.rb', line 114

# Checks if an exception occurred.
#
# @return [Boolean] true if the state is :exception
def exception?
    @state == :exception
end

#executeObject

Executes the task. Should be called from a worker thread after pre_execute was called. After execute returned and the task was deleted from any internal list finalize must be called to propagate the task state.

Raises:

  • (RuntimeError)


187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/utilrb/thread_pool.rb', line 187

# Runs the work block with the stored arguments and records the outcome.
#
# The outcome is written to @state_temp, not @state:
# * :finished   - the block returned; its value is stored in @result
# * :terminated - the block raised an Asked exception
# * :exception  - the block raised any other exception
# Any raised exception is stored in @exception.
#
# @raise [RuntimeError] if the current state is not :running
# @return [Time] the time the execution stopped
def execute
    unless @state == :running
        raise RuntimeError, "call pre_execute ThreadPool::Task first. Current state is #{@state} but :running was expected"
    end

    begin
        @result = @block.call(*@arguments)
        @state_temp = :finished
    rescue Exception => e
        # Exception (not only StandardError) is rescued on purpose:
        # Asked derives directly from Exception.
        @exception = e
        @state_temp = e.is_a?(Asked) ? :terminated : :exception
    end
    @stopped_at = Time.now
end

#finalizeObject

Propagates the task's state. Should be called after execute.



205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/utilrb/thread_pool.rb', line 205

# Publishes the outcome recorded by execute and notifies the callback.
#
# Under the task's mutex the thread and pool references are cleared and
# @state_temp is copied into @state. Afterwards the callback, if any, is
# called with the result and the exception. An exception raised by the
# callback is passed to ThreadPool.report_exception instead of propagating.
def finalize
    @mutex.synchronize do
        @thread = nil
        @state = @state_temp
        @pool = nil
    end
    return unless @callback

    begin
        @callback.call(@result, @exception)
    rescue Exception => e
        ThreadPool.report_exception("thread_pool: in #{self}, callback #{@callback} failed", e)
    end
end

#finished?Boolean

Checks if the task was stopped or finished. This also includes cases where an exception was raised by the custom code block.

Returns:

  • (Boolean)


98
# File 'lib/utilrb/thread_pool.rb', line 98

# Checks if the task was stopped or finished, i.e. it was started and is
# neither running nor stopping.
#
# @return [Boolean]
def finished?
    return false unless started?

    !(running? || stopping?)
end

#pre_execute(pool = nil) ⇒ Object

Sets all internal state to running. Call execute after that.



171
172
173
174
175
176
177
178
179
180
# File 'lib/utilrb/thread_pool.rb', line 171

# Marks the task as running on the calling thread. Call execute afterwards.
#
# @param pool [ThreadPool, nil] the pool that runs the task
# @return [Symbol] :running
def pre_execute(pool=nil)
    @mutex.synchronize do
        @started_at = Time.now
        # remember the executing thread so that the task can be terminated
        @thread = Thread.current
        @pool = pool
        @state = :running
    end
end

#resetObject

Resets the task. This can be used to requeue a task that is already finished.



146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/utilrb/thread_pool.rb', line 146

# Puts the task back into the :waiting state so that it can be queued
# again. The result is set to the default value; the exception and all
# time stamps are cleared.
#
# @raise [RuntimeError] if the task was started but is not finished yet
# @return [nil]
def reset
    if started? && !finished?
        raise RuntimeError,"cannot reset a task which is not finished"
    end

    @mutex.synchronize do
        @state = :waiting
        @result = @default
        @exception = @started_at = @queued_at = @stopped_at = nil
    end
end

#running?Boolean

Checks if the task is running

Returns:

  • (Boolean)


86
# File 'lib/utilrb/thread_pool.rb', line 86

# Checks if the task is running.
#
# @return [Boolean] true if the state is :running
def running?
    @state == :running
end

#started?Boolean

Checks if the task was started

Returns:

  • (Boolean)


81
# File 'lib/utilrb/thread_pool.rb', line 81

# Checks if the task was started.
#
# @return [Boolean] true for every state other than :waiting
def started?
    @state != :waiting
end

#stopping?Boolean

Checks if the task is going to be stopped

Returns:

  • (Boolean)


91
# File 'lib/utilrb/thread_pool.rb', line 91

# Checks if the task is going to be stopped.
#
# @return [Boolean] true if the state is :stopping
def stopping?
    @state == :stopping
end

#successfull?Boolean

Checks if the task was successfully finished. This means no exception, termination or timeout occurred.

Returns:

  • (Boolean)


104
# File 'lib/utilrb/thread_pool.rb', line 104

# Checks if the task was successfully finished.
#
# @return [Boolean] true if the state is :finished
def successfull?
    @state == :finished
end

#terminate!(exception = Asked) ⇒ Object

Terminates the task if it is running



219
220
221
222
223
224
225
# File 'lib/utilrb/thread_pool.rb', line 219

# Terminates the task if it is running: the state is switched to
# :stopping and the given exception is raised in the task's thread.
# Does nothing if the task is not running.
#
# @param exception [Class, Exception] what to raise in the thread
#   (defaults to Asked)
def terminate!(exception = Asked)
    @mutex.synchronize do
        if running?
            @state = :stopping
            @thread.raise exception
        end
    end
end

#terminated?Boolean

Checks if the task was terminated.

Returns:

  • (Boolean)


109
# File 'lib/utilrb/thread_pool.rb', line 109

# Checks if the task was terminated.
#
# @return [Boolean] true if the state is :terminated
def terminated?
    @state == :terminated
end

#time_elapsed(time = Time.now) ⇒ Object

Returns the number of seconds the task is or was running at the given point in time

@return

Parameters:

  • time (Time) (defaults to: Time.now)

    The point in time.



241
242
243
244
245
246
247
248
249
250
# File 'lib/utilrb/thread_pool.rb', line 241

# Returns the number of seconds the task is or was running at the given
# point in time.
#
# A task in state :stopping (termination requested but not yet carried
# out) is treated like a running one. Previously it was reported as 0
# until it actually stopped.
#
# @param time [Time] the point in time; only used while the task is
#   running or stopping
# @return [Float, Integer] elapsed seconds; 0 if the task was never started
def time_elapsed(time = Time.now)
    #no need to synchronize here
    if running? || stopping?
        (time - @started_at).to_f
    elsif finished?
        (@stopped_at - @started_at).to_f
    else
        0
    end
end