Class: FastlyNsq::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/fastly_nsq/manager.rb

Overview

Interface for tracking listeners and managing the processing pool.

Constant Summary collapse

DEADLINE =
30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**opts) ⇒ Manager

Create a FastlyNsq::Manager

  • max_threads [Integer] Maxiumum number of threads to be used by PriorityThreadPool

  • logger [Logger]

Parameters:

  • opts (Hash)

    Set of options passed to FastlyNsqw::PriorityThreadPool. valid options include:



23
24
25
26
27
28
29
30
# File 'lib/fastly_nsq/manager.rb', line 23

def initialize(**opts) # logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads)
  @done = false
  @logger = opts[:logger] || FastlyNsq.logger
  max_threads = opts[:max_threads] || FastlyNsq.max_processing_pool_threads
  @pool = FastlyNsq::PriorityThreadPool.new(
    {fallback_policy: :caller_runs, max_threads: max_threads}.merge(opts)
  )
end

Instance Attribute Details

#doneBoolean (readonly)

Returns Set true when all listeners are stopped.

Returns:

  • (Boolean)

    Set true when all listeners are stopped



9
10
11
# File 'lib/fastly_nsq/manager.rb', line 9

def done
  @done
end

#loggerLogger (readonly)

Returns:

  • (Logger)


15
16
17
# File 'lib/fastly_nsq/manager.rb', line 15

def logger
  @logger
end

#poolFastlyNsq::PriorityThreadPool (readonly)



12
13
14
# File 'lib/fastly_nsq/manager.rb', line 12

def pool
  @pool
end

Instance Method Details

#add_listener(listener) ⇒ Object

Add a Listener to the @topic_listeners

Parameters:



79
80
81
82
83
84
85
86
87
# File 'lib/fastly_nsq/manager.rb', line 79

def add_listener(listener)
  logger.info { "topic #{listener.topic}, channel #{listener.channel}: listening" }

  if topic_listeners[listener.topic]
    logger.warn { "topic #{listener.topic}: duplicate listener" }
  end

  topic_listeners[listener.topic] = listener
end

#listenersSet

Set of Listener objects

Returns:

  • (Set)


49
50
51
# File 'lib/fastly_nsq/manager.rb', line 49

def listeners
  topic_listeners.values.to_set
end

#stop_listenersObject

Terminate all listeners



102
103
104
105
106
# File 'lib/fastly_nsq/manager.rb', line 102

def stop_listeners
  logger.info { "Stopping listeners" }
  listeners.each(&:terminate)
  topic_listeners.clear
end

#stopped?Boolean

Manager state

Returns:

  • (Boolean)


72
73
74
# File 'lib/fastly_nsq/manager.rb', line 72

def stopped?
  done
end

#terminate(deadline = DEADLINE) ⇒ Object

Stop the manager. Terminates the listeners and stops all processing in the pool.

Parameters:

  • deadline (Integer) (defaults to: DEADLINE)

    Number of seconds to wait for pool to stop processing



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fastly_nsq/manager.rb', line 57

def terminate(deadline = DEADLINE)
  return if done

  stop_listeners

  return if pool.shutdown?

  stop_processing(deadline)

  @done = true
end

#topic_listenersHash

Hash of listeners. Keys are topics, values are Listener instances.

Returns:

  • (Hash)


35
36
37
# File 'lib/fastly_nsq/manager.rb', line 35

def topic_listeners
  @topic_listeners ||= {}
end

#topicsArray

Array of listening topic names

Returns:

  • (Array)


42
43
44
# File 'lib/fastly_nsq/manager.rb', line 42

def topics
  topic_listeners.keys
end

#transfer(new_manager, deadline: DEADLINE) ⇒ Object

Transer listeners to a new manager and stop processing from the existing pool.

Parameters:

  • new_manager (FastlyNsq::Manager)

    new manager to which listeners will be added

  • deadline (Integer) (defaults to: DEADLINE)

    Number of seconds to wait for exsiting pool to stop processing



93
94
95
96
97
98
# File 'lib/fastly_nsq/manager.rb', line 93

def transfer(new_manager, deadline: DEADLINE)
  new_manager.topic_listeners.merge!(topic_listeners)
  stop_processing(deadline)
  topic_listeners.clear
  @done = true
end