Class: ASIR::ThreadPool

Inherits:
Object
  • Object
show all
Includes:
AdditionalData, Initialization
Defined in:
lib/asir/thread_pool.rb

Defined Under Namespace

Classes: Work, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AdditionalData

#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included

Constructor Details

#initialize(*args) ⇒ ThreadPool

Returns a new instance of ThreadPool.



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/asir/thread_pool.rb', line 15

# Set up an idle pool.
#
# All arguments are handed to +super+ unchanged. State assigned with
# <tt>||=</tt> keeps any value that is already present once +super+ has
# returned; the two mutexes, the run flag and both id counters are always
# reset.
def initialize(*args)
  super

  # Always start with fresh locks, a stopped pool and zeroed id counters.
  @workers_mutex = Mutex.new
  @work_mutex    = Mutex.new
  @run           = false
  @worker_id     = 0
  @work_id       = 0

  # Fallbacks, applied only where nothing has been set yet.
  @thread_class ||= ::Thread
  @workers      ||= [ ]
  @work_queue   ||= Queue.new
  @time_0       ||= Time.now   # reference point for the elapsed time printed by log!
end

Instance Attribute Details

#auto_start_workers ⇒ Object

Returns the value of attribute auto_start_workers.



10
11
12
# File 'lib/asir/thread_pool.rb', line 10

# Reader for the +auto_start_workers+ attribute.
# When the stored value is truthy, #new calls #start_workers! after enqueuing.
#
# @return [Object] whatever is held in @auto_start_workers (nil when unset)
def auto_start_workers; @auto_start_workers; end

#n_workers ⇒ Object

Returns the value of attribute n_workers.



9
10
11
# File 'lib/asir/thread_pool.rb', line 9

# Reader for the +n_workers+ attribute.
# #start_workers! uses this value as the upper bound on how many workers it wants.
#
# @return [Object] whatever is held in @n_workers (nil when unset)
def n_workers; @n_workers; end

#run ⇒ Object

Returns the value of attribute run.



13
14
15
# File 'lib/asir/thread_pool.rb', line 13

# Reader for the +run+ attribute.
# The flag is set to true by #new and to false by #stop! and #kill!;
# #start_workers! does nothing while it is falsy.
#
# @return [Object] whatever is held in @run (nil when unset)
def run; @run; end

#thread_class ⇒ Object

Returns the value of attribute thread_class.



9
10
11
# File 'lib/asir/thread_pool.rb', line 9

# Reader for the +thread_class+ attribute.
# #start_workers! calls +new+ with a block on this object to launch each
# worker; the constructor falls back to ::Thread when nothing is set.
#
# @return [Object] whatever is held in @thread_class (nil when unset)
def thread_class; @thread_class; end

#verbose ⇒ Object

Returns the value of attribute verbose.



12
13
14
# File 'lib/asir/thread_pool.rb', line 12

# Reader for the +verbose+ attribute.
# #log! writes nothing unless the stored value is truthy.
#
# @return [Object] whatever is held in @verbose (nil when unset)
def verbose; @verbose; end

#work_queue ⇒ Object

Returns the value of attribute work_queue.



11
12
13
# File 'lib/asir/thread_pool.rb', line 11

# Reader for the +work_queue+ attribute.
# #new enqueues Work items here and #stop! enqueues :stop! markers; the
# constructor falls back to a fresh Queue when nothing is set.
#
# @return [Object] whatever is held in @work_queue (nil when unset)
def work_queue; @work_queue; end

#workers ⇒ Object

Returns the value of attribute workers.



9
10
11
# File 'lib/asir/thread_pool.rb', line 9

# Reader for the +workers+ attribute.
# Worker threads append themselves to this collection in #start_workers!
# and remove themselves again when they finish.
#
# @return [Object] whatever is held in @workers (nil when unset)
def workers; @workers; end

Instance Method Details

#join(*args) ⇒ Object



134
135
136
137
138
139
140
# File 'lib/asir/thread_pool.rb', line 134

# Block until the worker list is empty, joining every registered worker.
#
# Workers remove themselves from @workers (under @workers_mutex) from their
# own threads when they finish. Iterating the live array while that happens
# skips entries, so each pass joins a snapshot copied under the mutex. The
# mutex is released before joining because a finishing worker needs it to
# deregister.
#
# @param args forwarded unchanged to each worker's #join
# @return [nil]
def join *args
  until @workers.empty?
    snapshot = @workers_mutex.synchronize { @workers.dup }
    snapshot.each do | worker |
      worker && worker.join(*args)
    end
  end
end

#kill!(*args) ⇒ Object



123
124
125
126
127
128
129
130
131
132
# File 'lib/asir/thread_pool.rb', line 123

# Mark the pool as not running and send kill! to every registered worker.
#
# The worker list is walked while holding @workers_mutex.
#
# @param args passed through unchanged to each worker's #kill!
# @return [self]
def kill!(*args)
  log! "kill!"
  @run = false
  @workers_mutex.synchronize do
    @workers.each { | target | target.kill!(*args) }
  end
  self
end

#log!(msg = nil) ⇒ Object



101
102
103
104
105
106
107
# File 'lib/asir/thread_pool.rb', line 101

# Write one diagnostic line to $stderr when @verbose is truthy.
#
# The line contains the seconds elapsed since @time_0, the process id, the
# current thread's object id, this object and the message. The block is only
# evaluated when logging is enabled and no message was given, so callers can
# defer building expensive strings.
#
# @param msg [Object, nil] the message; when nil the block's value is used
# @yieldreturn [Object] the lazily built message
# @return [self, nil] self after writing, nil when @verbose is falsy
def log! msg = nil
  return unless @verbose
  # Without a message or a block, log an empty message rather than raising
  # LocalJumpError from a bare yield.
  msg ||= (yield if block_given?)
  @time_1 = Time.now
  $stderr.puts "  #{@time_1 - @time_0} #{$$} #{Thread.current.object_id} #{self} #{msg}"
  self
end

#new(&blk) ⇒ Object

Returns a new Work object.



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/asir/thread_pool.rb', line 28

# Queue a block for execution and return the Work item that wraps it.
#
# Steps, in order: take the next work id under @work_mutex, build the Work,
# announce it through work_created!, enqueue it, mark the pool as running,
# and start workers when @auto_start_workers is truthy.
#
# @yield the code to wrap; it is stored in the Work item, not called here
# @return [Work] the newly created work item
def new(&blk)
  next_id = @work_mutex.synchronize { @work_id += 1 }
  item = Work.new(:block => blk, :work_id => next_id)
  work_created!(item)
  @work_queue.enq item
  @run = true
  if @auto_start_workers
    start_workers!
  end
  item
end

#start_workers! ⇒ Object

Keep a list of workers busy.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/asir/thread_pool.rb', line 41

# Launch enough worker threads to serve the queued work, up to n_workers.
#
# The target is the smaller of n_workers and the current queue size; the
# number started is that target minus the workers already registered.
# Nothing is started while @run is falsy or when the difference is not
# positive.
#
# Each thread allocates its worker id under @workers_mutex, builds a Worker,
# fires the worker_created! / worker_starting! hooks, registers the worker,
# and calls worker.run!. The worker is always deregistered and
# worker_stopping! fired when run! returns or raises.
#
# NOTE(review): workers register themselves from inside their own thread, so
# a second call made before those threads have run still sees the old
# @workers.size and may start more than n_workers in total. Confirm whether
# callers can hit this.
#
# @return [self, nil] self when at least one worker was started, else nil
def start_workers!
  return nil unless @run
  workers_size = @workers.size
  # The minimum can never exceed n_workers, so no further upper clamp is needed.
  want_n = [ n_workers, @work_queue.size ].min
  start_n = want_n - workers_size
  return unless start_n > 0
  log! { "start_workers! #{start_n}" }
  start_n.times do
    thread_class.new do
      worker_id = @workers_mutex.synchronize do
        @worker_id += 1
      end
      worker = Worker.new(:thread_pool => self, :worker_id => worker_id)
      worker_created! worker
      begin
        worker_starting! worker
        @workers_mutex.synchronize do
          @workers << worker
        end
        # Exceptions from run! are not rescued here; they propagate to the thread.
        worker.run!
      ensure
        @workers_mutex.synchronize do
          @workers.delete(worker)
        end
        worker_stopping! worker
      end
    end
  end
  self
end

#stop! ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/asir/thread_pool.rb', line 109

# Mark the pool as not running and enqueue :stop! markers on the work queue.
#
# One marker is enqueued per currently registered worker (counted while
# holding @workers_mutex), followed by one extra marker.
#
# @return [self]
def stop!
  log! "stop!"
  @run = false
  # One :stop! request for each worker registered right now.
  @workers_mutex.synchronize do
    @workers.each { | _worker | @work_queue.enq(:stop!) }
  end
  # One more, just in case.
  @work_queue.enq(:stop!)
  self
end

#work_created!(work) ⇒ Object



89
90
91
# File 'lib/asir/thread_pool.rb', line 89

# Hook called by #new right after a Work item is built, before it is queued.
# This implementation only logs; the message is built lazily inside the block.
#
# @param work [Object] the work item being announced
# @return [Object] whatever log! returns
def work_created!(work)
  log! do
    "work_created! #{work.inspect}"
  end
end

#work_starting!(work) ⇒ Object



93
94
95
# File 'lib/asir/thread_pool.rb', line 93

# Hook that logs a work item together with the worker assigned to it.
# The message is built lazily inside the block handed to log!.
# No caller is visible in this listing; presumably invoked when a worker
# picks the item up. Confirm against Worker.
#
# @param work [#worker] the work item; must respond to #worker
# @return [Object] whatever log! returns
def work_starting!(work)
  log! do
    "work_starting! #{work.inspect} #{work.worker.inspect}"
  end
end

#work_stopping!(work) ⇒ Object



97
98
99
# File 'lib/asir/thread_pool.rb', line 97

# Hook that logs a work item that is finishing.
# The message is built lazily inside the block handed to log!.
# No caller is visible in this listing; presumably invoked once a worker is
# done with the item. Confirm against Worker.
#
# @param work [Object] the work item being announced
# @return [Object] whatever log! returns
def work_stopping!(work)
  log! do
    "work_stopping! #{work.inspect}"
  end
end

#worker_created!(worker) ⇒ Object



77
78
79
# File 'lib/asir/thread_pool.rb', line 77

# Hook called by #start_workers! inside the new thread, right after the
# Worker is constructed. This implementation only logs; the message is built
# lazily inside the block handed to log!.
#
# @param worker [Object] the worker being announced
# @return [Object] whatever log! returns
def worker_created!(worker)
  log! do
    "worker_created! #{worker.inspect}"
  end
end

#worker_starting!(worker) ⇒ Object



81
82
83
# File 'lib/asir/thread_pool.rb', line 81

# Hook called by #start_workers! just before a worker is registered and run.
# This implementation only logs; the worker is interpolated with #to_s and
# the message is built lazily inside the block handed to log!.
#
# @param worker [Object] the worker being announced
# @return [Object] whatever log! returns
def worker_starting!(worker)
  log! do
    "worker_starting! #{worker}"
  end
end

#worker_stopping!(worker) ⇒ Object



85
86
87
# File 'lib/asir/thread_pool.rb', line 85

# Hook called by #start_workers! after a worker has been removed from the
# pool's list. This implementation only logs; the worker is interpolated
# with #to_s and the message is built lazily inside the block handed to log!.
#
# @param worker [Object] the worker being announced
# @return [Object] whatever log! returns
def worker_stopping!(worker)
  log! do
    "worker_stopping! #{worker}"
  end
end