Class: FastlyNsq::Manager
- Inherits: Object
  - Object
    - FastlyNsq::Manager
- Defined in: lib/fastly_nsq/manager.rb
Overview
Interface for tracking listeners and managing the processing pool.
Constant Summary
- DEADLINE = 30
Instance Attribute Summary
- #done ⇒ Boolean readonly
  Set to true when all listeners are stopped.
- #logger ⇒ Logger readonly
- #pool ⇒ FastlyNsq::PriorityThreadPool readonly
Instance Method Summary
- #add_listener(listener) ⇒ Object
  Add a Listener to @topic_listeners.
- #initialize(**opts) ⇒ Manager constructor
  Create a FastlyNsq::Manager.
- #listeners ⇒ Set
  Set of Listener objects.
- #stop_listeners ⇒ Object
  Terminate all listeners.
- #stopped? ⇒ Boolean
  Manager state.
- #terminate(deadline = DEADLINE) ⇒ Object
  Stop the manager.
- #topic_listeners ⇒ Hash
  Hash of listeners.
- #topics ⇒ Array
  Array of listening topic names.
- #transfer(new_manager, deadline: DEADLINE) ⇒ Object
  Transfer listeners to a new manager and stop processing from the existing pool.
Constructor Details
#initialize(**opts) ⇒ Manager
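Create a FastlyNsq::Manager.
- max_threads [Integer] Maximum number of threads to be used by PriorityThreadPool. Defaults to FastlyNsq.max_processing_pool_threads.
- logger [Logger] Logger used by the manager. Defaults to FastlyNsq.logger.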
# File 'lib/fastly_nsq/manager.rb', line 23

def initialize(**opts) # logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads)
  @done = false
  @logger = opts[:logger] || FastlyNsq.logger
  max_threads = opts[:max_threads] || FastlyNsq.max_processing_pool_threads
  @pool = FastlyNsq::PriorityThreadPool.new(
    {fallback_policy: :caller_runs, max_threads: max_threads}.merge(opts)
  )
end
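The constructor merges the caller's options over the pool defaults, so any key passed in `opts` takes precedence, and every key in `opts` (including `:logger`) is forwarded to the pool. The following is a minimal sketch of that merge in plain Ruby, without loading FastlyNsq. `pool_options` is a hypothetical helper name, and the value 5 is an assumption standing in for `FastlyNsq.max_processing_pool_threads`.

```ruby
# Hypothetical helper mirroring how the constructor builds the pool options.
def pool_options(**opts)
  # ASSUMPTION: 5 stands in for FastlyNsq.max_processing_pool_threads.
  assumed_default_threads = 5
  max_threads = opts[:max_threads] || assumed_default_threads

  # Hash#merge prefers the argument's values, so keys in opts win.
  {fallback_policy: :caller_runs, max_threads: max_threads}.merge(opts)
end

p pool_options()                                         # defaults only
p pool_options(max_threads: 2, fallback_policy: :abort)  # caller overrides both
```

Because the merge happens last, a caller can replace `fallback_policy` as well as `max_threads`.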
Instance Attribute Details
#done ⇒ Boolean (readonly)
Returns true once all listeners are stopped.
# File 'lib/fastly_nsq/manager.rb', line 9

def done
  @done
end
#logger ⇒ Logger (readonly)
# File 'lib/fastly_nsq/manager.rb', line 15

def logger
  @logger
end
#pool ⇒ FastlyNsq::PriorityThreadPool (readonly)
# File 'lib/fastly_nsq/manager.rb', line 12

def pool
  @pool
end
Instance Method Details
#add_listener(listener) ⇒ Object
Add a Listener to the @topic_listeners
# File 'lib/fastly_nsq/manager.rb', line 79

def add_listener(listener)
  logger.info { "topic #{listener.topic}, channel #{listener.channel}: listening" }

  if topic_listeners[listener.topic]
    logger.warn { "topic #{listener.topic}: duplicate listener" }
  end

  topic_listeners[listener.topic] = listener
end
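Listeners are keyed by topic, so adding a second listener for the same topic logs a warning and replaces the first. The sketch below reproduces that bookkeeping with stand-ins so it runs without FastlyNsq. `SketchListener` and `SketchRegistry` are hypothetical names, and `warn` is used in place of the manager's logger.

```ruby
require "set"

# Hypothetical stand-in for a listener; only topic and channel are modeled.
SketchListener = Struct.new(:topic, :channel)

# Simplified registry mirroring add_listener, topics and listeners.
class SketchRegistry
  def topic_listeners
    @topic_listeners ||= {}
  end

  def add_listener(listener)
    # The real method logs through the manager's logger; warn is a stand-in.
    warn "topic #{listener.topic}: duplicate listener" if topic_listeners[listener.topic]
    topic_listeners[listener.topic] = listener
  end

  def topics
    topic_listeners.keys
  end

  def listeners
    topic_listeners.values.to_set
  end
end

registry = SketchRegistry.new
registry.add_listener(SketchListener.new("orders", "billing"))
registry.add_listener(SketchListener.new("orders", "audit")) # replaces the first
p registry.topics
p registry.listeners.size
```

Note that this sketch, like the method above, does not terminate the listener that gets replaced.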
#listeners ⇒ Set
Set of Listener objects
# File 'lib/fastly_nsq/manager.rb', line 49

def listeners
  topic_listeners.values.to_set
end
#stop_listeners ⇒ Object
Terminate all listeners
# File 'lib/fastly_nsq/manager.rb', line 102

def stop_listeners
  logger.info { "Stopping listeners" }
  listeners.each(&:terminate)
  topic_listeners.clear
end
#stopped? ⇒ Boolean
Manager state
# File 'lib/fastly_nsq/manager.rb', line 72

def stopped?
  done
end
#terminate(deadline = DEADLINE) ⇒ Object
Stop the manager. Terminates the listeners and stops all processing in the pool.
# File 'lib/fastly_nsq/manager.rb', line 57

def terminate(deadline = DEADLINE)
  return if done

  stop_listeners

  return if pool.shutdown?

  stop_processing(deadline)

  @done = true
end
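The two early returns give `terminate` its control flow: a second call does nothing, and if the pool reports that it is already shut down, the method returns before setting `@done`. The sketch below traces that flow with stand-ins. `SketchPool` and `SketchManager` are hypothetical, the `stop_calls` counter replaces `stop_listeners`, and `pool.shutdown!` is an assumed placeholder for `stop_processing(deadline)`, whose body is not shown on this page.

```ruby
# Hypothetical pool stand-in; only the shutdown flag is modeled.
class SketchPool
  def initialize(shutdown: false)
    @shutdown = shutdown
  end

  def shutdown?
    @shutdown
  end

  def shutdown!
    @shutdown = true
  end
end

# Simplified manager that keeps only the control flow of terminate.
class SketchManager
  attr_reader :done, :pool, :stop_calls

  def initialize(pool)
    @done = false
    @pool = pool
    @stop_calls = 0
  end

  def terminate(deadline = 30)
    return if done

    @stop_calls += 1 # stands in for stop_listeners

    return if pool.shutdown?

    pool.shutdown! # ASSUMPTION: placeholder for stop_processing(deadline)

    @done = true
  end
end

manager = SketchManager.new(SketchPool.new)
manager.terminate
manager.terminate # no-op: done is already true
p manager.done
p manager.stop_calls
```

In this sketch a manager whose pool was shut down beforehand never reaches `@done = true`, so `done` stays false after `terminate`.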
#topic_listeners ⇒ Hash
Hash of listeners. Keys are topics, values are Listener instances.
# File 'lib/fastly_nsq/manager.rb', line 35

def topic_listeners
  @topic_listeners ||= {}
end
#topics ⇒ Array
Array of listening topic names
# File 'lib/fastly_nsq/manager.rb', line 42

def topics
  topic_listeners.keys
end
#transfer(new_manager, deadline: DEADLINE) ⇒ Object
Transfer listeners to a new manager and stop processing from the existing pool.
# File 'lib/fastly_nsq/manager.rb', line 93

def transfer(new_manager, deadline: DEADLINE)
  new_manager.topic_listeners.merge!(topic_listeners)
  stop_processing(deadline)
  topic_listeners.clear
  @done = true
end
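The hand-off relies on `Hash#merge!`, which mutates the receiver and lets the argument's values win on key collisions. If both managers listen on the same topic, the old manager's listener therefore replaces the new manager's. A minimal sketch with plain hashes follows; `transfer_listeners` is a hypothetical helper, and symbols stand in for Listener instances.

```ruby
# Hypothetical helper showing only the hash bookkeeping of transfer.
def transfer_listeners(from, to)
  to.merge!(from) # entries in `from` overwrite same-topic entries in `to`
  from.clear      # the old manager no longer tracks any listeners
  to
end

old_listeners = {"orders" => :old_orders, "users" => :old_users}
new_listeners = {"orders" => :new_orders, "events" => :new_events}

transfer_listeners(old_listeners, new_listeners)
p new_listeners
p old_listeners
```

The listeners themselves are not terminated during the hand-off; only the old manager's hash is cleared.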