Class: Concurrent::CachedThreadPool::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent/cached_thread_pool/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(parent) ⇒ Worker

Returns a new instance of Worker.



9
10
11
12
13
14
15
# File 'lib/concurrent/cached_thread_pool/worker.rb', line 9

# Builds a worker that belongs to +parent+ and has no thread assigned yet.
#
# @param parent [Object] owner stored in @parent; #run later calls
#   +on_end_task+ and +on_worker_exit+ on it
def initialize(parent)
  @parent = parent
  @mutex = Mutex.new
  @idletime = Time.now
  @resource = ConditionVariable.new
  @tasks = Queue.new
  # #dead?, #kill and #run all nil-check @thread, so define it up front
  # instead of relying on the implicit nil of an unassigned ivar.
  @thread = nil
end

Instance Method Details

#dead? ⇒ Boolean

Returns:

  • (Boolean)



21
22
23
24
25
# File 'lib/concurrent/cached_thread_pool/worker.rb', line 21

# Reports whether this worker was given a thread that is no longer alive.
#
# @return [Boolean] false while no thread is assigned; otherwise the
#   negation of Thread#alive? for the assigned thread
def dead?
  @mutex.synchronize do
    !(@thread.nil? || @thread.alive?)
  end
end

#idle? ⇒ Boolean

Returns:

  • (Boolean)



17
18
19
# File 'lib/concurrent/cached_thread_pool/worker.rb', line 17

# Reports whether an idle timestamp is currently recorded.
#
# Reads @idletime without taking the mutex.
#
# @return [Boolean] true when @idletime is not nil
def idle?
  !@idletime.nil?
end

#idletime ⇒ Object



27
28
29
30
31
# File 'lib/concurrent/cached_thread_pool/worker.rb', line 27

# Whole seconds elapsed since the recorded idle timestamp.
#
# @return [Integer] 0 when no idle timestamp is recorded; otherwise the
#   difference between the current time and @idletime, both truncated to
#   integer seconds
def idletime
  @mutex.synchronize do
    next 0 if @idletime.nil?

    Time.now.to_i - @idletime.to_i
  end
end

#kill ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/concurrent/cached_thread_pool/worker.rb', line 50

# Forcibly terminates the worker: detaches it from its parent, kills the
# assigned thread (if any) and forgets it. Also resets the idle timestamp
# to the current time.
#
# @return [nil]
def kill
  @mutex.synchronize do
    @parent = nil
    @idletime = Time.now
    victim = @thread
    # The kill must precede clearing @thread, exactly as before.
    Thread.kill(victim) unless victim.nil?
    @thread = nil
  end
end

#run(thread = Thread.current) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/concurrent/cached_thread_pool/worker.rb', line 59

# Runs the worker loop until a +:stop+ marker is taken from the task queue.
#
# Every other queue entry is an +[args, block]+ pair: the block is called
# with the splatted args, StandardError raised by it is swallowed, and the
# parent is notified through +on_end_task+ afterwards. On +:stop+ the worker
# forgets its thread, calls +on_worker_exit+ on the parent, detaches from it
# and leaves the loop.
#
# @param thread [Thread] the thread recorded as this worker's thread
# @raise [StandardError] 'already running' if a thread is already assigned
def run(thread = Thread.current)
  @mutex.synchronize do
    raise StandardError, 'already running' unless @thread.nil?
    @thread = thread
  end

  loop do
    task = @mutex.synchronize do
      # NOTE(review): if this wait returns with the queue still empty (for
      # example the 60 second timeout elapsed), the non-blocking pop below
      # raises ThreadError out of #run. Confirm whether that is intended.
      @resource.wait(@mutex, 60) if @tasks.empty?

      @tasks.pop(true)
    end

    if task == :stop
      @thread = nil
      @parent.on_worker_exit(self)
      @parent = nil
      break
    end

    #@parent.on_start_task(self)
    begin
      task.last.call(*task.first)
    rescue
      # let it fail
    ensure
      # #kill sets @parent to nil and then kills this thread; ensure blocks
      # still run while a killed thread unwinds, so read the parent once and
      # skip the callback when the worker has already been detached.
      parent = @parent
      parent.on_end_task(self) unless parent.nil?
    end
  end
end

#signal(*args, &block) ⇒ Object



33
34
35
36
37
38
39
40
# File 'lib/concurrent/cached_thread_pool/worker.rb', line 33

# Queues a task for this worker and wakes a thread waiting on the
# condition variable.
#
# @param args [Array] arguments stored with the task
# @param block [Proc] the work stored with the task
# @return [Boolean] false (nothing queued) when the worker has no parent;
#   true once the +[args, block]+ pair has been queued
def signal(*args, &block)
  @mutex.synchronize do
    if @parent.nil?
      false
    else
      @tasks.push([args, block])
      @resource.signal
      true
    end
  end
end

#stop ⇒ Object



42
43
44
45
46
47
48
# File 'lib/concurrent/cached_thread_pool/worker.rb', line 42

# Discards every queued task, queues a single +:stop+ marker and signals
# the condition variable.
#
# @return [Object] whatever the condition variable's +signal+ returns
def stop
  @mutex.synchronize do
    @tasks.clear
    @tasks.push(:stop)
    @resource.signal
  end
end