Class: Contender::Pool::ThreadPoolExecutor
- Extended by: Forwardable
- Defined in: lib/contender/pool/thread_pool_executor.rb
Overview
Executor that uses a cached thread pool to execute tasks in the background
Instance Method Summary
- #active? ⇒ Boolean
  Returns true if this thread pool is active and accepting tasks.
- #backlog ⇒ Integer
  Performs a volatile read on the number of tasks in the queue.
- #empty? ⇒ Boolean
  Returns true if the task queue is empty.
- #execute(*arguments, &block) ⇒ undefined
  Defers the execution of the given block until it can be processed by a worker in the thread pool.
- #inactive? ⇒ Boolean
  Returns true if this thread pool is inactive.
- #initialize(options = Hash.new) ⇒ undefined constructor
- #purge ⇒ undefined
  Removes any tasks that are queued for execution.
- #shutdown ⇒ undefined
  Blocks until all queued tasks have been executed, then shuts down the thread pool.
- #shutdown! ⇒ undefined
  Shuts down the thread pool instantly, interrupting any executing tasks.
- #shutdown? ⇒ Boolean
  Returns true if this thread pool is waiting to shut down.
- #shutdown_now? ⇒ Boolean
  Returns true if this thread pool is shutting down now.
- #start ⇒ undefined
  Starts the thread pool.
Constructor Details
#initialize(options = Hash.new) ⇒ undefined
# File 'lib/contender/pool/thread_pool_executor.rb', line 9
def initialize(options = Hash.new)
  @queue = TaskQueue.new
  @options = {
    size: 2,
    non_block: false
  }.merge options

  @workers = Array.new
  @mutex = Mutex.new

  @state = :inactive
end
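The constructor above merges caller-supplied options over two defaults, `size: 2` and `non_block: false`. The following sketch isolates that merge so the precedence is easy to see. `DEFAULTS` and `resolve_options` are hypothetical names introduced for illustration only; they are not part of Contender.

```ruby
# Defaults copied from the constructor listing above.
DEFAULTS = { size: 2, non_block: false }.freeze

# Hypothetical helper (not part of Contender) that mirrors the merge
# performed in #initialize: caller-supplied keys win over the defaults.
def resolve_options(options = {})
  DEFAULTS.merge(options)
end

custom = resolve_options(size: 8)
puts custom[:size]       # prints 8
puts custom[:non_block]  # prints false
```

Because `Hash#merge` returns a new hash, the frozen defaults are never mutated, and any key the caller omits keeps its default value.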
Instance Method Details
#active? ⇒ Boolean
Returns true if this thread pool is active and accepting tasks
# File 'lib/contender/pool/thread_pool_executor.rb', line 50
def active?
  :active == @state
end
#backlog ⇒ Integer
Performs a volatile read on the number of tasks in the queue
# File 'lib/contender/pool/thread_pool_executor.rb', line 131
def_delegators :@queue, :backlog, :empty?, :purge
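`#backlog`, `#empty?`, and `#purge` are all generated by the single `def_delegators` call shown above, which comes from Ruby's standard `Forwardable` module. The sketch below shows how that forwarding behaves. `SketchQueue` and `SketchExecutor` are hypothetical stand-ins; the real `TaskQueue` is not shown in this page, so the method bodies here are assumptions.

```ruby
require 'forwardable'

# Hypothetical stand-in for Contender's TaskQueue. Only the delegated
# methods are modelled, and their bodies are assumptions.
class SketchQueue
  def initialize
    @tasks = []
  end

  def enqueue(task)
    @tasks << task
  end

  def backlog
    @tasks.size
  end

  def empty?
    @tasks.empty?
  end

  def purge
    @tasks.clear
  end
end

# Hypothetical executor that forwards three methods to its queue,
# using the same def_delegators line as the listing above.
class SketchExecutor
  extend Forwardable

  def_delegators :@queue, :backlog, :empty?, :purge

  def initialize(queue)
    @queue = queue
  end
end

queue = SketchQueue.new
executor = SketchExecutor.new(queue)
queue.enqueue(:first)
queue.enqueue(:second)

puts executor.backlog  # prints 2
executor.purge
puts executor.empty?   # prints true
```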
#empty? ⇒ Boolean
Returns true if the task queue is empty
# File 'lib/contender/pool/thread_pool_executor.rb', line 131
def_delegators :@queue, :backlog, :empty?, :purge
#execute(*arguments, &block) ⇒ undefined
Defers the execution of the given block until it can be processed by a worker in the thread pool
# File 'lib/contender/pool/thread_pool_executor.rb', line 30
def execute(*arguments, &block)
  unless active?
    raise 'Thread pool has not been started or is shutting down'
  end

  @queue.enqueue(Task.new(block, arguments))
end
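As the listing shows, `#execute` raises unless the pool is active, so tasks submitted before `#start` are rejected rather than queued. The sketch below reproduces only that guard. `GuardedExecutor` is a hypothetical class, and Ruby's built-in `Queue` stands in for Contender's `TaskQueue` and `Task`, whose internals are not shown here.

```ruby
# Hypothetical executor that models only the state guard in #execute.
class GuardedExecutor
  def initialize
    @queue = Queue.new  # assumption: stand-in for Contender's TaskQueue
    @state = :inactive
  end

  def start
    @state = :active
  end

  def active?
    :active == @state
  end

  def execute(*arguments, &block)
    # A bare string passed to raise produces a RuntimeError.
    raise 'Thread pool has not been started or is shutting down' unless active?

    @queue << [block, arguments]
  end

  def backlog
    @queue.size
  end
end

pool = GuardedExecutor.new

begin
  pool.execute(1) { |n| n + 1 }
rescue RuntimeError => e
  warn "rejected: #{e.message}"
end

pool.start
pool.execute(1) { |n| n + 1 }
puts pool.backlog  # prints 1
```

Callers that may submit work during startup or shutdown would need to rescue `RuntimeError` or check `active?` first, with the caveat that the state can change between the check and the call.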
#inactive? ⇒ Boolean
Returns true if this thread pool is inactive
# File 'lib/contender/pool/thread_pool_executor.rb', line 42
def inactive?
  :inactive == @state
end
#purge ⇒ undefined
Removes any tasks that are queued for execution
# File 'lib/contender/pool/thread_pool_executor.rb', line 131
def_delegators :@queue, :backlog, :empty?, :purge
#shutdown ⇒ undefined
Blocks until all queued tasks have been executed, then shuts down the thread pool
# File 'lib/contender/pool/thread_pool_executor.rb', line 85
def shutdown
  return unless active?

  @state = :shutdown

  # TODO This is not ideal
  until @queue.empty?
    # Wait until the task queue is empty
    sleep 0.1
  end

  @state = :shutdown_now

  @queue.wakeup
  @workers.each do |worker|
    worker.join
  end

  cleanup
end
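The listing above drains the queue by polling every 0.1 seconds, which its own TODO flags as not ideal. One possible alternative, sketched here under stated assumptions, is to let the queue itself signal completion: Ruby's built-in `Queue#close` (Ruby 2.3 and later) makes `Queue#pop` return `nil` once the queue is closed and empty, so workers exit on their own. `MiniPool` is a hypothetical class and is not how Contender is implemented; in particular, resetting the state to `:inactive` at the end is an assumption, since the body of `cleanup` is not shown.

```ruby
# Hypothetical pool illustrating a graceful drain without polling.
class MiniPool
  attr_reader :state

  def initialize(size: 2)
    @queue = Queue.new
    @size = size
    @workers = []
    @state = :inactive
  end

  def start
    return unless @state == :inactive

    @workers = Array.new(@size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and drained.
        while (task = @queue.pop)
          block, arguments = task
          block.call(*arguments)
        end
      end
    end
    @state = :active
  end

  def execute(*arguments, &block)
    raise 'Thread pool has not been started or is shutting down' unless @state == :active

    @queue << [block, arguments]
  end

  def shutdown
    return unless @state == :active

    @state = :shutdown
    @queue.close           # already queued tasks still run
    @workers.each(&:join)  # blocks until every worker has drained and exited
    @workers.clear
    @state = :inactive     # assumption: the source does not show cleanup
  end
end

results = Queue.new
pool = MiniPool.new(size: 2)
pool.start
5.times { |i| pool.execute(i) { |n| results << n * n } }
pool.shutdown
puts results.size  # prints 5
```

Joining the workers after closing the queue gives the same guarantee as the documented `#shutdown` (all queued tasks run before the call returns) without a sleep loop.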
#shutdown! ⇒ undefined
Shuts down the thread pool instantly, interrupting any executing tasks
# File 'lib/contender/pool/thread_pool_executor.rb', line 110
def shutdown!
  return unless active?

  @state = :shutdown_now

  @workers.each do |worker|
    worker.interrupt
  end

  cleanup
end
#shutdown? ⇒ Boolean
Returns true if this thread pool is waiting to shut down
# File 'lib/contender/pool/thread_pool_executor.rb', line 58
def shutdown?
  :shutdown == @state
end
#shutdown_now? ⇒ Boolean
Returns true if this thread pool is shutting down now
# File 'lib/contender/pool/thread_pool_executor.rb', line 66
def shutdown_now?
  :shutdown_now == @state
end
#start ⇒ undefined
Starts the thread pool
# File 'lib/contender/pool/thread_pool_executor.rb', line 74
def start
  return unless inactive?

  start_workers @options.fetch(:size)
  @state = :active
end