Class: Isono::ThreadPool

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/isono/thread_pool.rb

Defined Under Namespace

Classes: TimeoutError, WorkerTerminateError

Instance Method Summary collapse

Methods included from Logger

included, initialize

Constructor Details

#initialize(worker_num = 1, name = nil, opts = {}) ⇒ ThreadPool

Returns a new instance of ThreadPool.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/isono/thread_pool.rb', line 11

# Builds the pool and immediately spawns +worker_num+ worker threads.
# Each worker pops callables from the shared queue and invokes them until it
# pops a nil/false value or receives WorkerTerminateError.
#
# @param worker_num [Integer] number of worker threads to spawn
# @param name [String, nil] pool name; also passed to set_instance_logger
# @param opts [Hash] options merged over the defaults
# @option opts [Integer] :stucked_queue_num (20) backlog size above which a
#   warning is logged (rate-limited to roughly one per 5 seconds)
def initialize(worker_num=1, name=nil, opts={})
  set_instance_logger(name)
  @queue = ::Queue.new
  @name = name
  @opts = {:stucked_queue_num=>20}.merge(opts)
  @last_stuck_warn_at = Time.now

  @worker_threads = {}
  worker_num.times {
    t = Thread.new {
      # Remember which thread this worker is. The cleanup block below is
      # handed to EM.schedule, which runs it on the EventMachine reactor
      # thread; evaluating Thread.current there would yield the reactor
      # thread and the worker would never be removed from @worker_threads.
      me = Thread.current
      begin
        while op = @queue.pop
          if @queue.size > @opts[:stucked_queue_num] && Time.now - @last_stuck_warn_at > 5.0
            logger.warn("too many stacked jobs: #{@queue.size}")
            @last_stuck_warn_at = Time.now
          end

          op.call
        end
      rescue WorkerTerminateError
        # Someone asked this thread to terminate. Nothing to do here:
        # falling through lets the ensure clause run and the thread end.
        # (This clause is outside the while loop, so a `break` here would
        # try to break out of the thread's own block.)
      rescue Exception => e
        # A failing job must not kill the worker: log it and resume popping.
        logger.error(e)
        retry
      ensure
        EM.schedule {
          @worker_threads.delete(me.__id__)
          logger.debug("#{me} is being terminated")
        }
      end
    }
    @worker_threads[t.__id__] = t
  }
end

Instance Method Details

#barrier(immediate = true, time_out = nil, &blk) ⇒ Object

Sends a block to a worker thread, similar to pass(), but makes the calling thread wait until the block has been processed by a worker thread.

Parameters:

  • immediate (Bool) (defaults to: true)
  • time_out (Float) (defaults to: nil)
  • blk (Proc)


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/isono/thread_pool.rb', line 67

# Runs +blk+ through the pool and blocks the caller until a result arrives.
#
# When +immediate+ is true and the caller is itself a pool thread, the block
# is simply invoked in place. Otherwise the block is handed to #pass and the
# caller waits on a private queue for its return value. An exception raised
# by the block is re-raised in the caller.
#
# @param immediate [Boolean] run inline when called from a pool thread
# @param time_out [Float, nil] seconds after which an EventMachine timer
#   delivers a TimeoutError to the waiting caller; nil disables the timer
# @param blk [Proc] the work to perform
# @return [Object] whatever +blk+ returned
# @raise [Exception] the exception raised by +blk+, or TimeoutError
def barrier(immediate=true, time_out=nil, &blk)
  return blk.call if immediate && member_thread?

  started_at = ::Time.now
  reply = ::Queue.new

  # The job pushes either its result or the exception it raised.
  pass do
    begin
      reply << blk.call
    rescue Exception => err
      reply << err
    end
  end

  timer = nil
  timer = EventMachine.add_timer(time_out) { reply << TimeoutError.new } if time_out

  # Whichever arrives first wins: the job's outcome or the timeout marker.
  outcome = reply.shift
  EventMachine.cancel_timer(timer)

  time_elapsed = ::Time.now - started_at
  if time_elapsed > 0.05
    logger.debug("Elapsed time for #{blk}: #{time_elapsed} secs")
  end

  raise outcome if outcome.is_a?(Exception)

  outcome
end

#clear ⇒ Object



101
102
103
# File 'lib/isono/thread_pool.rb', line 101

# Discards every job that is still waiting in the queue.
# Only @queue is touched here; the worker threads are left alone.
def clear
  @queue.clear
end

#graceful_shutdown2 ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/isono/thread_pool.rb', line 151

# Swaps in a fresh queue, waits for the previous queue to become empty, and
# then raises WorkerTerminateError in every worker thread.
#
# @return [Hash] the @worker_threads table (thread id => Thread)
def graceful_shutdown2
  # make new jobs push to dummy queue.
  old_queue = @queue
  @queue = ::Queue.new

  # wait until the old queue becomes empty
  # NOTE(review): this polls once per second and has no upper bound; it
  # relies on something still popping from old_queue -- confirm that the
  # workers keep draining it after @queue has been swapped.
  unless old_queue.empty?
    logger.info("Waiting for #{old_queue.size} worker jobs in #{self}")
    sleep 1 until old_queue.empty?
  end

  # @worker_threads is a Hash keyed by thread id, so iterate over the values:
  # a one-argument #each block receives [id, thread] pairs, and calling
  # #raise on such a pair fails with NoMethodError.
  @worker_threads.each_value { |t|
    t.raise WorkerTerminateError
  }
end

#member_thread?(thread = Thread.current) ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/isono/thread_pool.rb', line 113

# Tells whether +thread+ is registered as one of this pool's workers.
#
# @param thread [Thread] thread to look up (defaults to the calling thread)
# @return [Boolean] true when the thread's id is a key of @worker_threads
def member_thread?(thread=Thread.current)
  @worker_threads.key?(thread.__id__)
end

#pass(immediate = true, &blk) ⇒ Object

Passes a block to a worker thread. The job stays in the queue until a worker thread picks it up.

Parameters:

  • immediate (Bool) (defaults to: true)
  • blk (Proc)

    A block to be processed on a worker thread.



53
54
55
56
57
58
59
# File 'lib/isono/thread_pool.rb', line 53

# Hands +blk+ to the pool.
#
# When +immediate+ is true and the caller is already a pool thread, the block
# runs right away on the calling thread; in every other case it is appended
# to the job queue.
#
# @param immediate [Boolean] allow inline execution from a pool thread
# @param blk [Proc] the job to run
# @return [Object] the block's result when run inline, otherwise the queue
def pass(immediate=true, &blk)
  if immediate && member_thread?
    blk.call
  else
    @queue << blk
  end
end

#shutdown ⇒ Object

Immediately shuts down all the worker threads.



106
107
108
109
110
111
# File 'lib/isono/thread_pool.rb', line 106

# Raises WorkerTerminateError in every registered worker thread, yielding the
# scheduler after each one.
#
# @return [Hash] the @worker_threads table (thread id => Thread)
def shutdown
  @worker_threads.each_value do |worker|
    worker.raise(WorkerTerminateError)
    Thread.pass
  end
end

#shutdown_graceful(timeout) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/isono/thread_pool.rb', line 117

# Asks every worker to stop once it reaches the end of the current backlog.
#
# One terminate job per worker is appended to the queue; each job sends an
# acknowledgement and then raises WorkerTerminateError in its worker. The
# caller blocks until it has collected one signal per worker, where a signal
# is either an acknowledgement or a TimeoutError marker injected by the
# EventMachine timer. #shutdown is always called afterwards and the timer is
# cancelled.
#
# @param timeout [Numeric] seconds to wait before timeout markers are
#   injected; no timer is armed unless it is greater than 0.0
# @return [Object, nil] the result of the error log call when some workers
#   timed out, otherwise nil
def shutdown_graceful(timeout)
  acks = ::Queue.new
  remaining = @worker_threads.size

  # enqueue the terminate jobs.
  remaining.times do
    terminator = proc do
      acks.enq(1)
      raise WorkerTerminateError
    end
    @queue.push(terminator)
  end

  timer = nil
  if timeout > 0.0
    # When the timer fires, it pushes one marker per signal still awaited.
    timer = EventMachine.add_timer(timeout) do
      remaining.times { acks << TimeoutError.new }
    end
  end

  timeout_workers = 0
  until remaining <= 0
    timeout_workers += 1 if acks.deq.is_a?(TimeoutError)
    remaining -= 1
  end

  logger.error("#{timeout_workers} of worker threads timed out during the cleanup") if timeout_workers > 0
ensure
  shutdown
  EventMachine.cancel_timer(timer)
end