Class: Contender::Pool::ThreadPoolExecutor

Inherits:
Executor
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/contender/pool/thread_pool_executor.rb

Overview

Executor that uses a cached thread pool to execute tasks in the background

Instance Method Summary collapse

Constructor Details

#initialize(options = Hash.new) ⇒ undefined

Parameters:

  • options (Hash) (defaults to: Hash.new)


9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/contender/pool/thread_pool_executor.rb', line 9

# Prepares a new executor: a fresh TaskQueue, an empty worker list, a mutex
# and the :inactive state. No workers are started here.
#
# Caller-supplied options are merged over the defaults (size: 2,
# non_block: false), so any key passed in takes precedence.
#
# @param options [Hash] overrides for the default pool options
# @return [undefined]
def initialize(options = Hash.new)
  @queue = TaskQueue.new

  defaults = { size: 2, non_block: false }
  @options = defaults.merge(options)

  @workers = []
  @mutex = Mutex.new

  @state = :inactive
end

Instance Method Details

#active? ⇒ Boolean

Returns true if this thread pool is active and accepting tasks

Returns:

  • (Boolean)


50
51
52
# File 'lib/contender/pool/thread_pool_executor.rb', line 50

# Reports whether the pool is active and accepting tasks, i.e. whether the
# current state is :active.
#
# @return [Boolean]
def active?
  @state == :active
end

#backlog ⇒ Integer

Performs a volatile read on the number of tasks in the queue

Returns:

  • (Integer)


131
# File 'lib/contender/pool/thread_pool_executor.rb', line 131

# Forward #backlog, #empty? and #purge to the object held in @queue.
def_delegators(
  :@queue,
  :backlog,
  :empty?,
  :purge
)

#empty? ⇒ Boolean

Returns true if the task queue is empty

Returns:

  • (Boolean)


131
# File 'lib/contender/pool/thread_pool_executor.rb', line 131

# Forward #backlog, #empty? and #purge to the object held in @queue.
def_delegators(
  :@queue,
  :backlog,
  :empty?,
  :purge
)

#execute(*arguments, &block) ⇒ undefined

Defers the execution of the given block until it can be processed by a worker in the thread pool

Parameters:

  • arguments (Object...)
  • block (Proc)

Returns:

  • (undefined)


30
31
32
33
34
35
36
# File 'lib/contender/pool/thread_pool_executor.rb', line 30

# Queues the given block, together with its arguments, so that it can be
# processed later by a worker in the thread pool.
#
# @param arguments [Array<Object>] values bundled with the block in the task
# @param block [Proc] the work to defer
# @return [undefined] whatever the queue's #enqueue returns
# @raise [RuntimeError] if the pool is not currently active
def execute(*arguments, &block)
  raise 'Thread pool has not been started or is shutting down' unless active?

  task = Task.new(block, arguments)
  @queue.enqueue(task)
end

#inactive? ⇒ Boolean

Returns true if this thread pool is inactive

Returns:

  • (Boolean)


42
43
44
# File 'lib/contender/pool/thread_pool_executor.rb', line 42

# Reports whether the pool is inactive, i.e. whether the current state is
# :inactive.
#
# @return [Boolean]
def inactive?
  @state == :inactive
end

#purge ⇒ undefined

Removes any tasks that are queued for execution

Returns:

  • (undefined)


131
# File 'lib/contender/pool/thread_pool_executor.rb', line 131

# Forward #backlog, #empty? and #purge to the object held in @queue.
def_delegators(
  :@queue,
  :backlog,
  :empty?,
  :purge
)

#shutdown ⇒ undefined

Blocks until all queued tasks have been executed, then shuts down the thread pool

Returns:

  • (undefined)


85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/contender/pool/thread_pool_executor.rb', line 85

# Gracefully stops the pool: waits for the task queue to drain, then joins
# every worker and runs cleanup. Does nothing unless the pool is active.
#
# State moves to :shutdown while the queue drains and to :shutdown_now once
# it is empty.
#
# @return [undefined]
def shutdown
  return unless active?

  @state = :shutdown

  # TODO This is not ideal: poll every 100ms until the task queue is empty
  sleep 0.1 until @queue.empty?

  @state = :shutdown_now

  # Wake the queue before joining the workers
  @queue.wakeup
  @workers.each(&:join)

  cleanup
end

#shutdown! ⇒ undefined

Shuts down the thread pool instantly, interrupting any executing tasks

Returns:

  • (undefined)


110
111
112
113
114
115
116
117
118
119
120
# File 'lib/contender/pool/thread_pool_executor.rb', line 110

# Stops the pool immediately: marks it :shutdown_now, interrupts every
# worker and runs cleanup, without waiting for queued tasks.
# Does nothing unless the pool is active.
#
# @return [undefined]
def shutdown!
  return unless active?

  @state = :shutdown_now
  @workers.each(&:interrupt)

  cleanup
end

#shutdown? ⇒ Boolean

Returns true if this thread pool is waiting to shutdown

Returns:

  • (Boolean)


58
59
60
# File 'lib/contender/pool/thread_pool_executor.rb', line 58

# Reports whether the pool is waiting to shut down, i.e. whether the current
# state is :shutdown.
#
# @return [Boolean]
def shutdown?
  @state == :shutdown
end

#shutdown_now? ⇒ Boolean

Returns true if this thread pool is shutting down now

Returns:

  • (Boolean)


66
67
68
# File 'lib/contender/pool/thread_pool_executor.rb', line 66

# Reports whether the pool is shutting down right now, i.e. whether the
# current state is :shutdown_now.
#
# @return [Boolean]
def shutdown_now?
  @state == :shutdown_now
end

#start ⇒ undefined

Starts the thread pool

Returns:

  • (undefined)


74
75
76
77
78
79
# File 'lib/contender/pool/thread_pool_executor.rb', line 74

# Starts the thread pool: launches the configured number of workers and
# marks the pool :active. Does nothing unless the pool is inactive.
#
# @return [undefined]
# @raise [KeyError] if the options hash has no :size entry
def start
  return unless inactive?

  pool_size = @options.fetch(:size)
  start_workers(pool_size)
  @state = :active
end