Class: Workling::Invokers::ThreadPoolPoller

Inherits:
Base
  • Object
show all
Defined in:
lib/workling/invokers/thread_pool_poller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#logger, #run

Constructor Details

#initialize(routing, client_class) ⇒ ThreadPoolPoller

Returns a new instance of ThreadPoolPoller.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/workling/invokers/thread_pool_poller.rb', line 32

# Builds a poller bound to the given routing table and queue client class.
#
# Tunables are read from Workling.config, each with a fallback:
#   :sleep_time => 2  (stored as Float)
#   :reset_time => 30 (stored as Float)
#   :pool_size  => 25 (stored as Integer)
#
# @param routing [Object] routing object, later asked for queue names per worker class
# @param client_class [Object] stored as-is; not used in the code visible here
def initialize(routing, client_class)
  @routing = routing
  @client_class = client_class

  # Timing settings from the config file, normalised to floats
  @sleep_time = (Workling.config[:sleep_time] || 2).to_f
  @reset_time = (Workling.config[:reset_time] || 30).to_f

  # Polling threads; the array carries its own mutex via Mutex_m
  @pollers = [].extend(Mutex_m)

  # Worker threads; guarded the same way as the pollers
  @workers = [].extend(Mutex_m)

  # Upper bound on the number of concurrently running worker threads
  @pool_capacity = (Workling.config[:pool_size] || 25).to_i
end

Instance Attribute Details

#pool_capacityObject (readonly)

Returns the value of attribute pool_capacity.



30
31
32
# File 'lib/workling/invokers/thread_pool_poller.rb', line 30

# Maximum number of worker threads allowed in the pool. Assigned in
# #initialize from Workling.config[:pool_size] (falls back to 25) and
# compared against the live worker count by #workers_available?.
def pool_capacity
  @pool_capacity
end

#reset_timeObject (readonly)

Returns the value of attribute reset_time.



30
31
32
# File 'lib/workling/invokers/thread_pool_poller.rb', line 30

# Float read in #initialize from Workling.config[:reset_time] (falls back
# to 30). NOTE(review): presumably a duration in seconds; how it is used is
# not visible in this excerpt - confirm against the polling loop.
def reset_time
  @reset_time
end

#sleep_timeObject (readonly)

Returns the value of attribute sleep_time.



30
31
32
# File 'lib/workling/invokers/thread_pool_poller.rb', line 30

# Float read in #initialize from Workling.config[:sleep_time] (falls back
# to 2). NOTE(review): presumably the pause between queue polls, in seconds;
# how it is used is not visible in this excerpt - confirm against the polling loop.
def sleep_time
  @sleep_time
end

Instance Method Details

#listenObject

Start up the checking for items on the queue. Will block until stop is called and all pollers and workers have finished execution.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/workling/invokers/thread_pool_poller.rb', line 54

# Starts one polling thread per discovered worker class, then blocks until
# every poller and every worker thread known at that point has finished.
#
# Pollers are removed from the pool once joined; the worker list is only
# joined here, not cleared.
def listen
  logger.info("Starting ThreadPoolPoller...")

  # Spin up a poller for each discovered worker class. The queue names are
  # resolved inside the new thread, not on the calling thread.
  Workling::Discovery.discovered_workers.each do |worker_class|
    @pollers.synchronize do
      @pollers << Thread.new { poller_thread(@routing.queue_names_routing_class(worker_class)) }
    end
  end

  # Join on a snapshot taken under the lock so the lock is not held while
  # waiting; #stop needs the same lock to flag the pollers for shutdown.
  running_pollers = @pollers.synchronize { @pollers.dup }
  running_pollers.each(&:join)
  @pollers.synchronize { @pollers.clear }
  logger.info("Pollers have all finished")

  # Same snapshot-then-join approach for the outstanding workers.
  running_workers = @workers.synchronize { @workers.dup }
  running_workers.each(&:join)
  logger.info("Worker threads have all finished")
end

#poller_threadsObject

Number of currently active polling threads



96
97
98
# File 'lib/workling/invokers/thread_pool_poller.rb', line 96

# Count of polling threads currently held in the pool, read under the
# pool's lock.
#
# @return [Integer]
def poller_threads
  @pollers.synchronize do
    @pollers.length
  end
end

#stopObject

Instructs the thread pool poller to stop checking for new jobs on the backing queue.



80
81
82
83
# File 'lib/workling/invokers/thread_pool_poller.rb', line 80

# Asks every polling thread to stop by setting its :shutdown thread-local
# flag to true. This only raises the flag; it does not wait for the
# threads to exit.
def stop
  logger.info("Stopping thread pool invoker pollers and workers...")

  @pollers.synchronize do
    @pollers.each do |poller|
      poller[:shutdown] = true
    end
  end
end

#worker_threadsObject

Number of currently active worker threads



91
92
93
# File 'lib/workling/invokers/thread_pool_poller.rb', line 91

# Count of worker threads currently held in the pool, read under the
# pool's lock.
#
# @return [Integer]
def worker_threads
  @workers.synchronize do
    @workers.length
  end
end

#workers_available?Boolean

Returns true when fewer worker threads are active than the pool capacity allows. Set pool_size in the Workling config to adjust the maximum number of threads in the pool.

Returns:

  • (Boolean)


86
87
88
# File 'lib/workling/invokers/thread_pool_poller.rb', line 86

# Whether the pool has room for another worker thread, i.e. the current
# worker count is strictly below the configured capacity.
#
# @return [Boolean]
def workers_available?
  active_workers = worker_threads
  active_workers < @pool_capacity
end