Class: Temporalio::Worker::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/worker/thread_pool.rb

Overview

Implementation of a thread pool. This implementation is a stripped down form of Concurrent Ruby’s ‘CachedThreadPool`.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_threads: nil, idle_timeout: 20) ⇒ ThreadPool

Create a new thread pool that creates threads as needed.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/temporalio/worker/thread_pool.rb', line 28

def initialize(max_threads: nil, idle_timeout: 20)
  @max_threads = max_threads
  @idle_timeout = idle_timeout

  @mutex = Mutex.new
  @pool = []
  @ready = []
  @queue = []
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0
  @workers_counter = 0
  @prune_interval = @idle_timeout / 2
  @next_prune_time = ThreadPool._monotonic_time + @prune_interval
end

Class Method Details

.defaultThreadPool



14
15
16
# File 'lib/temporalio/worker/thread_pool.rb', line 14

def self.default
  @default ||= new
end

Instance Method Details

#active_countInteger



71
72
73
# File 'lib/temporalio/worker/thread_pool.rb', line 71

def active_count
  @mutex.synchronize { @pool.length - @ready.length }
end

#completed_task_countInteger



66
67
68
# File 'lib/temporalio/worker/thread_pool.rb', line 66

def completed_task_count
  @mutex.synchronize { @completed_task_count }
end

#execute { ... } ⇒ Object

Execute the given block in a thread. The block should be built to never raise and need no arguments.

Yields:

  • Block to execute.



47
48
49
50
51
52
53
# File 'lib/temporalio/worker/thread_pool.rb', line 47

def execute(&block)
  @mutex.synchronize do
    locked_assign_worker(&block) || locked_enqueue(&block)
    @scheduled_task_count += 1
    locked_prune_pool if @next_prune_time < ThreadPool._monotonic_time
  end
end

#killObject

Kill each thread. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).



97
98
99
100
101
102
103
104
# File 'lib/temporalio/worker/thread_pool.rb', line 97

def kill
  @mutex.synchronize do
    # Kill all workers
    @pool.each(&:kill)
    @pool.clear
    @ready.clear
  end
end

#largest_lengthInteger



56
57
58
# File 'lib/temporalio/worker/thread_pool.rb', line 56

def largest_length
  @mutex.synchronize { @largest_length }
end

#lengthInteger



76
77
78
# File 'lib/temporalio/worker/thread_pool.rb', line 76

def length
  @mutex.synchronize { @pool.length }
end

#queue_lengthInteger



81
82
83
# File 'lib/temporalio/worker/thread_pool.rb', line 81

def queue_length
  @mutex.synchronize { @queue.length }
end

#scheduled_task_countInteger



61
62
63
# File 'lib/temporalio/worker/thread_pool.rb', line 61

def scheduled_task_count
  @mutex.synchronize { @scheduled_task_count }
end

#shutdownObject

Gracefully shutdown each thread when it is done with its current task. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).



88
89
90
91
92
93
# File 'lib/temporalio/worker/thread_pool.rb', line 88

def shutdown
  @mutex.synchronize do
    # Stop all workers
    @pool.each(&:stop)
  end
end