Class: Isono::ThreadPool

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/isono/thread_pool.rb

Defined Under Namespace

Classes: TimeoutError, WorkerTerminateError

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

included, initialize

Constructor Details

#initialize(worker_num = 1, name = nil, opts = {}) ⇒ ThreadPool

Returns a new instance of ThreadPool.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/isono/thread_pool.rb', line 13

def initialize(worker_num=1, name=nil, opts={})
  @queue = ::Queue.new
  @name = name
  @opts = {:stucked_queue_num=>20}.merge(opts)
  @last_stuck_warn_at = Time.now
  
  @worker_threads = {}
  worker_num.times { |wid|
    t = Thread.new {
      # Log4r::PatternFormatter can refer thread name as %h.
      Thread.current[:name] = "#{name}[#{wid}/#{worker_num}]" if name
      begin
        while op = @queue.pop
          if @queue.size > @opts[:stucked_queue_num] && Time.now - @last_stuck_warn_at > 30.0
            logger.warn("too many stucked jobs: #{@queue.size}")
            @last_stuck_warn_at = Time.now
          end

          op_start_at = Time.now
          op.call
          op_elapsed = Time.now - op_start_at
          on_task_end(op_start_at, op_elapsed)
        end
      rescue WorkerTerminateError
        # someone indicated to terminate this thread
        # exit from the current loop
        break
      rescue ::Exception => e
        logger.error(e)
        # worker thread should never die except from the
        # termination using shutdown() method.
        # any errors thrown by op.call will be caught here and
        # back to @queue.pop.
        retry
      ensure
        tid = Thread.current.__id__
        tname = Thread.current[:name] || Thread.current.to_s
        EM.schedule {
          @worker_threads.delete(tid)
          logger.info("Thread #{tname} is being terminated")
        }
      end
      
    }
    @worker_threads[t.__id__] = t
  }
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



11
12
13
# File 'lib/isono/thread_pool.rb', line 11

def name
  @name
end

#queueObject (readonly)

Returns the value of attribute queue.



11
12
13
# File 'lib/isono/thread_pool.rb', line 11

def queue
  @queue
end

Instance Method Details

#barrier(immediate = true, time_out = nil, &blk) ⇒ Object

Send a block to a worker thread similar with pass(). but this get the caller thread waited until the block proceeded in a worker thread.

Parameters:

  • immediage (Bool)
  • time_out (Float) (defaults to: nil)
  • blk (Proc)


79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/isono/thread_pool.rb', line 79

def barrier(immediate=true, time_out=nil, &blk)
  if immediate && member_thread?
    return blk.call
  end
  
  q = ::Queue.new
  time_start = ::Time.now
  
  self.pass {
    begin
      q << blk.call
    rescue Exception => e
      q << e
    end
  }

  em_sig = nil
  if time_out
    em_sig = EventMachine.add_timer(time_out) {
      q << TimeoutError.new
    }
  end
  
  res = q.shift
  EventMachine.cancel_timer(em_sig)
  time_elapsed = ::Time.now - time_start
  logger.debug("Elapsed time for #{blk}: #{time_elapsed} secs") if time_elapsed > 0.05
  if res.is_a?(Exception)
    raise res
  end
  res
  
end

#clearObject



113
114
115
# File 'lib/isono/thread_pool.rb', line 113

def clear
  @queue.clear
end

#graceful_shutdown2Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/isono/thread_pool.rb', line 163

def graceful_shutdown2
  # make new jobs push to dummy queue.
  old_queue = @queue
  @queue = ::Queue.new
  # wait until @queue becomes empty
  if !old_queue.empty?
    logger.info("Waiting for #{old_queue.size} worker jobs in #{self}")
    while !old_queue.empty?
      sleep 1
    end
  end

  @worker_threads.each {|t|
    t.raise WorkerTerminateError
  }
end

#member_thread?(thread = Thread.current) ⇒ Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/isono/thread_pool.rb', line 125

def member_thread?(thread=Thread.current)
  @worker_threads.has_key?(thread.__id__)      
end

#pass(immediate = true, &blk) ⇒ Object

Pass a block to a worker thread. The job is queued until the worker thread found.

Parameters:

  • immediage (Bool)
  • blk (Proc)

    A block to be proccessed on a worker thread.



65
66
67
68
69
70
71
# File 'lib/isono/thread_pool.rb', line 65

def pass(immediate=true, &blk)
  if immediate && member_thread?
    return blk.call
  end
  
  @queue << blk
end

#shutdownObject

Immediatly shutdown all the worker threads



118
119
120
121
122
123
# File 'lib/isono/thread_pool.rb', line 118

def shutdown()
  @worker_threads.each {|id, t|
    t.__send__(:raise, WorkerTerminateError)
    Thread.pass
  }
end

#shutdown_graceful(timeout) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/isono/thread_pool.rb', line 129

def shutdown_graceful(timeout)
  term_sig_q = ::Queue.new
  worker_num = @worker_threads.size
  # enqueue the terminate jobs.
  worker_num.times {
    @queue.push proc {
      term_sig_q.enq(1)
      raise WorkerTerminateError
    }
  }

  em_sig = nil
  if timeout > 0.0
    em_sig = EventMachine.add_timer(timeout) {
      worker_num.times {
        term_sig_q << TimeoutError.new
      }
    }
  end

  timeout_workers = 0
  while worker_num > 0
    if term_sig_q.deq.is_a?(TimeoutError)
      timeout_workers += 1
    end
    worker_num -= 1
  end

  logger.error("#{timeout_workers} of worker threads timed out during the cleanup") if timeout_workers > 0
ensure
  shutdown
  EventMachine.cancel_timer(em_sig)
end