Class: FastlyNsq::Manager
- Inherits:
-
Object
- Object
- FastlyNsq::Manager
- Defined in:
- lib/fastly_nsq/manager.rb
Overview
Interface for tracking listeners and managing the processing pool.
Constant Summary collapse
- DEADLINE =
30
Instance Attribute Summary collapse
-
#done ⇒ Boolean
readonly
Set true when all listeners are stopped.
- #logger ⇒ Logger readonly
- #pool ⇒ FastlyNsq::PriorityThreadPool readonly
Instance Method Summary collapse
-
#add_listener(listener) ⇒ Object
Add a Listener to the @topic_listeners.
-
#initialize(logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads, **pool_options) ⇒ Manager
constructor
Create a FastlyNsq::Manager.
-
#listeners ⇒ Set
Set of Listener objects.
-
#stop_listeners ⇒ Object
Terminate all listeners.
-
#stopped? ⇒ Boolean
Manager state.
-
#terminate(deadline = DEADLINE) ⇒ Object
Stop the manager.
-
#topic_listeners ⇒ Hash
Hash of listeners.
-
#topics ⇒ Array
Array of listening topic names.
-
#transfer(new_manager, deadline: DEADLINE) ⇒ Object
Transer listeners to a new manager and stop processing from the existing pool.
Constructor Details
#initialize(logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads, **pool_options) ⇒ Manager
Create a FastlyNsq::Manager
23 24 25 26 27 28 29 |
# File 'lib/fastly_nsq/manager.rb', line 23 def initialize(logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads, **) @done = false @logger = logger @pool = FastlyNsq::PriorityThreadPool.new( { fallback_policy: :caller_runs, max_threads: max_threads }.merge(), ) end |
Instance Attribute Details
#done ⇒ Boolean (readonly)
Returns Set true when all listeners are stopped.
9 10 11 |
# File 'lib/fastly_nsq/manager.rb', line 9 def done @done end |
#logger ⇒ Logger (readonly)
15 16 17 |
# File 'lib/fastly_nsq/manager.rb', line 15 def logger @logger end |
#pool ⇒ FastlyNsq::PriorityThreadPool (readonly)
12 13 14 |
# File 'lib/fastly_nsq/manager.rb', line 12 def pool @pool end |
Instance Method Details
#add_listener(listener) ⇒ Object
Add a Listener to the @topic_listeners
78 79 80 81 82 83 84 85 86 |
# File 'lib/fastly_nsq/manager.rb', line 78 def add_listener(listener) logger.info { "topic #{listener.topic}, channel #{listener.channel}: listening" } if topic_listeners[listener.topic] logger.warn { "topic #{listener.topic}: duplicate listener" } end topic_listeners[listener.topic] = listener end |
#listeners ⇒ Set
Set of Listener objects
48 49 50 |
# File 'lib/fastly_nsq/manager.rb', line 48 def listeners topic_listeners.values.to_set end |
#stop_listeners ⇒ Object
Terminate all listeners
101 102 103 104 105 |
# File 'lib/fastly_nsq/manager.rb', line 101 def stop_listeners logger.info { 'Stopping listeners' } listeners.each(&:terminate) topic_listeners.clear end |
#stopped? ⇒ Boolean
Manager state
71 72 73 |
# File 'lib/fastly_nsq/manager.rb', line 71 def stopped? done end |
#terminate(deadline = DEADLINE) ⇒ Object
Stop the manager. Terminates the listeners and stops all processing in the pool.
56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fastly_nsq/manager.rb', line 56 def terminate(deadline = DEADLINE) return if done stop_listeners return if pool.shutdown? stop_processing(deadline) @done = true end |
#topic_listeners ⇒ Hash
Hash of listeners. Keys are topics, values are Listener instances.
34 35 36 |
# File 'lib/fastly_nsq/manager.rb', line 34 def topic_listeners @topic_listeners ||= {} end |
#topics ⇒ Array
Array of listening topic names
41 42 43 |
# File 'lib/fastly_nsq/manager.rb', line 41 def topics topic_listeners.keys end |
#transfer(new_manager, deadline: DEADLINE) ⇒ Object
Transer listeners to a new manager and stop processing from the existing pool.
92 93 94 95 96 97 |
# File 'lib/fastly_nsq/manager.rb', line 92 def transfer(new_manager, deadline: DEADLINE) new_manager.topic_listeners.merge!(topic_listeners) stop_processing(deadline) topic_listeners.clear @done = true end |