Class: Utilrb::ThreadPool::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/utilrb/thread_pool.rb

Overview

A Task is executed by the thread pool as soon as a free thread is available.

Author:

Constant Summary collapse

Asked =
Class.new(Exception)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = Hash.new, *args, &block) ⇒ Task

A new task which can be added to the work queue of a Utilrb::ThreadPool. If a sync key is given, no other task having the same key will be executed in parallel, which is useful, for instance, for member calls which are not thread safe.

Options Hash (options):

  • :sync_key (Object)

    The sync key

  • :callback (Proc)

    The callback

  • :default (Object)

    Default value returned when an error occurred which was handled.


128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/utilrb/thread_pool.rb', line 128

# Builds a task around the given work block.
#
# @param options [Hash] accepts :sync_key, :default and :callback; passed
#   through Kernel.validate_options together with their nil defaults
# @param args [Array] stored as the task arguments
# @yield the work block stored on the task
# @raise [ArgumentError] if no block is given
def initialize(options = Hash.new, *args, &block)
    raise ArgumentError, 'you must pass a work block to initialize a new Task.' unless block

    options = Kernel.validate_options(options, { :sync_key => nil, :default => nil, :callback => nil })
    @block = block
    @arguments = args
    @sync_key = options[:sync_key]
    @default = options[:default]
    @callback = options[:callback]
    @mutex = Mutex.new
    @pool = nil
    @state_temp = nil
    @state = nil
    # reset puts the task into its initial :waiting state
    reset
end

Instance Attribute Details

#descriptionObject

Custom description which can be used to store a human readable object


77
78
79
# File 'lib/utilrb/thread_pool.rb', line 77

# @return [Object] the custom description stored on this task
def description; @description; end

#exceptionException (readonly)

The exception thrown by the custom code block


50
51
52
# File 'lib/utilrb/thread_pool.rb', line 50

# @return [Exception, nil] the exception recorded for this task, if any
def exception; @exception; end

#poolThreadPool (readonly)

Thread pool the task belongs to


40
41
42
# File 'lib/utilrb/thread_pool.rb', line 40

# @return [ThreadPool, nil] the pool currently stored on this task
def pool; @pool; end

#queued_atObject

The time the task was queued

return [Time] the time


60
61
62
# File 'lib/utilrb/thread_pool.rb', line 60

# @return [Time, nil] the stored queueing time
def queued_at; @queued_at; end

#resultObject (readonly)

Result of the code block call


73
74
75
# File 'lib/utilrb/thread_pool.rb', line 73

# @return [Object] the stored result value
def result; @result; end

#started_atObject (readonly)

The time the task was started

return [Time] the time


65
66
67
# File 'lib/utilrb/thread_pool.rb', line 65

# @return [Time, nil] the stored start time
def started_at; @started_at; end

#state:waiting, ... (readonly)

State of the task


45
46
47
# File 'lib/utilrb/thread_pool.rb', line 45

# @return [Symbol, nil] the current state marker, e.g. :waiting or :running
def state; @state; end

#stopped_atObject (readonly)

The time the task was stopped or finished

return [Time] the time


70
71
72
# File 'lib/utilrb/thread_pool.rb', line 70

# @return [Time, nil] the stored stop time
def stopped_at; @stopped_at; end

#sync_keyObject (readonly)

The sync key is used to specify that a given task must not run in parallel with another task having the same sync key. If no key is set, there are no such constraints for the task.


35
36
37
# File 'lib/utilrb/thread_pool.rb', line 35

# @return [Object, nil] the sync key stored on this task
def sync_key; @sync_key; end

#threadObject (readonly)

The thread the task was assigned to

return [Thread] the thread


55
56
57
# File 'lib/utilrb/thread_pool.rb', line 55

# @return [Thread, nil] the thread currently stored on this task
def thread; @thread; end

Instance Method Details

#callback {|Object, Exception| ... } ⇒ Object

Called from the worker thread when the work is done

Yields:


231
232
233
234
235
# File 'lib/utilrb/thread_pool.rb', line 231

# Stores the given block as the task callback while holding the task mutex.
# Calling it without a block stores nil.
#
# @yield [Object, Exception] the block kept as callback
# @return [Proc, nil] the stored block
def callback(&block)
    @mutex.synchronize { @callback = block }
end

#default?Boolean

Returns true if the task has a default return value.


164
165
166
167
168
# File 'lib/utilrb/thread_pool.rb', line 164

# Tells whether a non-nil default value is stored. The value is read while
# holding the task mutex.
#
# @return [Boolean] true unless the stored default is nil
def default?
    @mutex.synchronize { @default != nil }
end

#exception?Boolean

Checks if an exception occurred.


115
# File 'lib/utilrb/thread_pool.rb', line 115

# Tells whether the task is in the :exception state.
#
# @return [Boolean]
def exception?
    @state == :exception
end

#executeObject

Executes the task. Should be called from a worker thread after pre_execute was called. After execute has returned and the task has been deleted from any internal list, finalize must be called to propagate the task state.

Raises:

  • (RuntimeError)

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/utilrb/thread_pool.rb', line 188

# Runs the work block with the stored arguments and records the outcome.
#
# The outcome is written to @state_temp (not @state): :finished when the
# block returned, :terminated when an Asked exception was caught and
# :exception for any other exception. The caught exception is kept in
# @exception and the block's return value in @result.
#
# @raise [RuntimeError] if the current state is not :running
# @return [Time] the stop time stored in @stopped_at
def execute
    unless @state == :running
        raise RuntimeError, "call pre_execute ThreadPool::Task first. Current state is #{@state} but :running was expected"
    end

    begin
        @result = @block.call(*@arguments)
        @state_temp = :finished
    rescue Exception => error
        # Exception (not StandardError) is rescued on purpose: Asked is
        # derived directly from Exception
        @exception = error
        @state_temp = error.is_a?(Asked) ? :terminated : :exception
    end
    @stopped_at = Time.now
end

#finalizeObject

Propagates the task's state. Should be called after execute.


206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/utilrb/thread_pool.rb', line 206

# Publishes the outcome recorded in @state_temp as the task state, clears the
# thread and pool references, and then invokes the callback (if one is set)
# with the result and the exception.
#
# An exception raised by the callback is handed to
# ThreadPool.report_exception instead of being propagated.
def finalize
    @mutex.synchronize do
        @pool = nil
        @thread = nil
        @state = @state_temp
    end

    begin
        if @callback
            @callback.call(@result, @exception)
        end
    rescue Exception => error
        ThreadPool.report_exception("thread_pool: in #{self}, callback #{@callback} failed", error)
    end
end

#finished?Boolean

Checks if the task was stopped or finished. This also includes cases where an exception was raised by the custom code block.


99
# File 'lib/utilrb/thread_pool.rb', line 99

# Tells whether the task was started and is neither running nor stopping.
#
# @return [Boolean]
def finished?
    started? && !(running? || stopping?)
end

#pre_execute(pool = nil) ⇒ Object

Sets all internal state to running. Call execute after that.


172
173
174
175
176
177
178
179
180
181
# File 'lib/utilrb/thread_pool.rb', line 172

# Marks the task as running while holding the task mutex: stores the given
# pool, the calling thread and the current time, then sets the state to
# :running.
#
# @param pool [Object, nil] the pool stored on the task
# @return [Symbol] :running
def pre_execute(pool = nil)
    @mutex.synchronize do
        @started_at = Time.now
        # keep the current thread so that it can be terminated later
        @thread = Thread.current
        @pool = pool
        @state = :running
    end
end

#resetObject

Resets the task. This can be used to requeue a task that is already finished.


147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/utilrb/thread_pool.rb', line 147

# Puts the task back into the :waiting state: the result is set to the
# default value and the exception and all timestamps are cleared.
#
# Only allowed when the task is finished or was never started.
#
# @raise [RuntimeError] if the task was started but is not finished
# @return [nil]
def reset
    if started? && !finished?
        raise RuntimeError,"cannot reset a task which is not finished"
    end

    @mutex.synchronize do
        @state = :waiting
        @result = @default
        @exception = nil
        @queued_at = nil
        @started_at = nil
        @stopped_at = nil
    end
end

#running?Boolean

Checks if the task is running


87
# File 'lib/utilrb/thread_pool.rb', line 87

# Tells whether the task is in the :running state.
#
# @return [Boolean]
def running?
    @state == :running
end

#started?Boolean

Checks if the task was started


82
# File 'lib/utilrb/thread_pool.rb', line 82

# Tells whether the task has left the :waiting state.
#
# @return [Boolean] true for every state other than :waiting
def started?
    @state != :waiting
end

#stopping?Boolean

Checks if the task is going to be stopped


92
# File 'lib/utilrb/thread_pool.rb', line 92

# Tells whether the task is in the :stopping state.
#
# @return [Boolean]
def stopping?
    @state == :stopping
end

#successfull?Boolean

Checks if the task was successfully finished. This means no exception, termination or timeout occurred.


105
# File 'lib/utilrb/thread_pool.rb', line 105

# Tells whether the task is in the :finished state.
#
# @return [Boolean]
def successfull?
    @state == :finished
end

#terminate!(exception = Asked) ⇒ Object

Terminates the task if it is running


220
221
222
223
224
225
226
# File 'lib/utilrb/thread_pool.rb', line 220

# Asks a running task to stop: while holding the task mutex, the state is
# set to :stopping and the given exception is raised in the task's thread.
# Does nothing when the task is not running.
#
# @param exception [Class, Exception] what is raised in the task's thread
# @return [Object, nil] nil when the task is not running, otherwise whatever
#   the raise call on the thread returns
def terminate!(exception = Asked)
    @mutex.synchronize do
        if running?
            @state = :stopping
            @thread.raise exception
        end
    end
end

#terminated?Boolean

Checks if the task was terminated.


110
# File 'lib/utilrb/thread_pool.rb', line 110

# Tells whether the task is in the :terminated state.
#
# @return [Boolean]
def terminated?
    @state == :terminated
end

#time_elapsed(time = Time.now) ⇒ Object

Returns the number of seconds the task is or was running at the given point in time

@return


242
243
244
245
246
247
248
249
250
251
# File 'lib/utilrb/thread_pool.rb', line 242

# Returns the number of seconds the task is or was running.
#
# For a stopped task this is the span between start and stop; for a started
# task it is the span between start and +time+; for a task that has not been
# started it is 0.0.
#
# @param time [Time] reference point used while the task has no stop time
# @return [Float] elapsed seconds (always a Float, also when not started)
def time_elapsed(time = Time.now)
    # No synchronization here; each instance variable is read exactly once so
    # that a value checked for nil cannot change before it is used.
    started = @started_at
    stopped = @stopped_at
    return 0.0 unless started

    ((stopped || time) - started).to_f
end