Class: Tpool

Inherits:
Object
  • Object
show all
Defined in:
lib/tpool.rb

Defined Under Namespace

Classes: Block

Constant Summary collapse

ARGS_VALID =
[:threads, :priority, :on_error]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Tpool

Returns a new instance of Tpool.



20
21
22
23
24
25
26
27
28
# File 'lib/tpool.rb', line 20

def initialize(args)
  @args = args
  @args.each do |key, val|
    raise "Invalid argument given: '#{key}'." if Tpool::ARGS_VALID.index(key) == nil
  end
  
  @queue = Queue.new
  self.start
end

Instance Attribute Details

#argsObject (readonly)

Returns the value of attribute args.



4
5
6
# File 'lib/tpool.rb', line 4

def args
  @args
end

Class Method Details

.const_missing(name) ⇒ Object



8
9
10
11
12
# File 'lib/tpool.rb', line 8

def self.const_missing(name)
  require "#{File.dirname(__FILE__)}/tpool_#{name.to_s.downcase}.rb"
  raise "Still not defined: '#{name}'." if !Tpool.const_defined?(name)
  return Tpool.const_get(name)
end

.current_block(thread = Thread.current) ⇒ Object

Returns the ‘Tpool::Block’ that is running in the given thread.



15
16
17
18
# File 'lib/tpool.rb', line 15

def self.current_block(thread = Thread.current)
  return thread[:tpool][:block] if thread[:tpool].is_a?(Hash)
  raise "No block was found running in that thread."
end

Instance Method Details

#on_error(&blk) ⇒ Object



103
104
105
# File 'lib/tpool.rb', line 103

def on_error(&blk)
  @on_error = blk
end

#on_error_call(*args) ⇒ Object



99
100
101
# File 'lib/tpool.rb', line 99

def on_error_call(*args)
  @on_error.call(*args) if @on_error
end

#run(*args, &blk) ⇒ Object

Runs the given block in the thread-pool, joins it and returns the result.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/tpool.rb', line 63

def run(*args, &blk)
  raise "No block was given." if !blk
  
  block = Tpool::Block.new(
    :args => args,
    :blk => blk,
    :tpool => self,
    :thread_starts => [Thread.current]
  )
  @queue << block
  
  begin
    Thread.stop
  rescue Exception
    #Its not possible to stop main thread (dead-lock-error - sleep it instead).
    sleep 0.1 while !block.done?
  end
  
  return block.res
end

#run_async(*args, &blk) ⇒ Object

Runs the given block in the thread-pool and returns a block-object to keep a look on the status.



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/tpool.rb', line 85

def run_async(*args, &blk)
  raise "No block was given." if !blk
  
  block = Tpool::Block.new(
    :tpool => self,
    :args => args,
    :blk => blk,
    :thread_starts => [Thread.current]
  )
  @queue << block
  
  return block
end

#startObject

Starts the thread-pool. This is automatically called on initialization.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/tpool.rb', line 31

def start
  raise "Already started." if @pool and !@pool.empty?
  
  @pool = Array.new(@args[:threads]) do |i|
    Thread.new do
      begin
        Thread.current[:tpool] = {:i => i}
        
        loop do
          block = @queue.pop
          Thread.current[:tpool][:block] = block
          
          block.run
          Thread.current[:tpool].delete(:block)
        end
      rescue Exception => e
        $stderr.puts e.inspect
        $stderr.puts e.backtrace
      end
    end
  end
end

#stopObject

Kills all running threads.



55
56
57
58
59
60
# File 'lib/tpool.rb', line 55

def stop
  @pool.delete_if do |thread|
    thread.kill
    true
  end
end