Class: FastlyNsq::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/fastly_nsq/manager.rb

Overview

Interface for tracking listeners and managing the processing pool.

Constant Summary collapse

DEADLINE =
30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads, **pool_options) ⇒ Manager

Create a FastlyNsq::Manager

Parameters:

  • logger (Logger) (defaults to: FastlyNsq.logger)
  • max_threads (Integer) (defaults to: FastlyNsq.max_processing_pool_threads)

    Maxiumum number of threads to be used by PriorityThreadPool

  • pool_options (Hash)

    Options forwarded to PriorityThreadPool constructor.



23
24
25
26
27
28
29
# File 'lib/fastly_nsq/manager.rb', line 23

def initialize(logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads, **pool_options)
  @done      = false
  @logger    = logger
  @pool      = FastlyNsq::PriorityThreadPool.new(
    { fallback_policy: :caller_runs, max_threads: max_threads }.merge(pool_options),
  )
end

Instance Attribute Details

#doneBoolean (readonly)

Returns Set true when all listeners are stopped.

Returns:

  • (Boolean)

    Set true when all listeners are stopped



9
10
11
# File 'lib/fastly_nsq/manager.rb', line 9

def done
  @done
end

#loggerLogger (readonly)

Returns:

  • (Logger)


15
16
17
# File 'lib/fastly_nsq/manager.rb', line 15

def logger
  @logger
end

#poolFastlyNsq::PriorityThreadPool (readonly)



12
13
14
# File 'lib/fastly_nsq/manager.rb', line 12

def pool
  @pool
end

Instance Method Details

#add_listener(listener) ⇒ Object

Add a Listener to the @topic_listeners

Parameters:



78
79
80
81
82
83
84
85
86
# File 'lib/fastly_nsq/manager.rb', line 78

def add_listener(listener)
  logger.info { "topic #{listener.topic}, channel #{listener.channel}: listening" }

  if topic_listeners[listener.topic]
    logger.warn { "topic #{listener.topic}: duplicate listener" }
  end

  topic_listeners[listener.topic] = listener
end

#listenersSet

Set of Listener objects

Returns:

  • (Set)


48
49
50
# File 'lib/fastly_nsq/manager.rb', line 48

def listeners
  topic_listeners.values.to_set
end

#stop_listenersObject

Terminate all listeners



101
102
103
104
105
# File 'lib/fastly_nsq/manager.rb', line 101

def stop_listeners
  logger.info { 'Stopping listeners' }
  listeners.each(&:terminate)
  topic_listeners.clear
end

#stopped?Boolean

Manager state

Returns:

  • (Boolean)


71
72
73
# File 'lib/fastly_nsq/manager.rb', line 71

def stopped?
  done
end

#terminate(deadline = DEADLINE) ⇒ Object

Stop the manager. Terminates the listeners and stops all processing in the pool.

Parameters:

  • deadline (Integer) (defaults to: DEADLINE)

    Number of seconds to wait for pool to stop processing



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fastly_nsq/manager.rb', line 56

def terminate(deadline = DEADLINE)
  return if done

  stop_listeners

  return if pool.shutdown?

  stop_processing(deadline)

  @done = true
end

#topic_listenersHash

Hash of listeners. Keys are topics, values are Listener instances.

Returns:

  • (Hash)


34
35
36
# File 'lib/fastly_nsq/manager.rb', line 34

def topic_listeners
  @topic_listeners ||= {}
end

#topicsArray

Array of listening topic names

Returns:

  • (Array)


41
42
43
# File 'lib/fastly_nsq/manager.rb', line 41

def topics
  topic_listeners.keys
end

#transfer(new_manager, deadline: DEADLINE) ⇒ Object

Transer listeners to a new manager and stop processing from the existing pool.

Parameters:

  • new_manager (FastlyNsq::Manager)

    new manager to which listeners will be added

  • deadline (Integer) (defaults to: DEADLINE)

    Number of seconds to wait for exsiting pool to stop processing



92
93
94
95
96
97
# File 'lib/fastly_nsq/manager.rb', line 92

def transfer(new_manager, deadline: DEADLINE)
  new_manager.topic_listeners.merge!(topic_listeners)
  stop_processing(deadline)
  topic_listeners.clear
  @done = true
end