Class: Subserver::Listener
- Inherits:
-
Object
- Object
- Subserver::Listener
- Includes:
- Util
- Defined in:
- lib/subserver/listener.rb
Overview
The Listener is a standalone thread which:
-
Starts Google Pubsub subscription threads which:
a. Instantiate the Subscription class
b. Run the middleware chain
c. call subscriber #perform
A Listener can exit due to shutdown (listner_stopped) or due to an error during message processing (listener_died)
If an error occurs during message processing, the Listener calls the Manager to create a new one to replace itself and exits.
Defined Under Namespace
Classes: Counter
Constant Summary collapse
- PROCESSED =
Counter.new
- FAILURE =
Counter.new
- WORKER_STATE =
This is mutable global state but because each thread is storing its own unique key/value, there’s no thread-safety issue AFAIK.
{}
Constants included from Util
Instance Attribute Summary collapse
-
#subscriber ⇒ Object
readonly
Returns the value of attribute subscriber.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Instance Method Summary collapse
- #connect_subscriber ⇒ Object
- #execute_processor(subscriber, received_message) ⇒ Object
-
#initialize(mgr, subscriber) ⇒ Listener
constructor
A new instance of Listener.
- #kill ⇒ Object
- #name ⇒ Object
- #process_message(received_message) ⇒ Object
- #retrive_subscrption ⇒ Object
- #run ⇒ Object
- #start ⇒ Object
- #stats(job_hash, queue) ⇒ Object
- #stop ⇒ Object
Methods included from Util
#fire_event, #hostname, #identity, #logger, #process_nonce, #safe_thread, #watchdog
Methods included from ExceptionHandler
Constructor Details
#initialize(mgr, subscriber) ⇒ Listener
Returns a new instance of Listener.
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/subserver/listener.rb', line 30 def initialize(mgr, subscriber) @mgr = mgr @down = false @done = false @thread = nil @reloader = Subserver.[:reloader] @subscriber = subscriber @subscription = retrive_subscrption @logging = (mgr.[:message_logger] || Subserver::MessageLogger).new end |
Instance Attribute Details
#subscriber ⇒ Object (readonly)
Returns the value of attribute subscriber.
28 29 30 |
# File 'lib/subserver/listener.rb', line 28 def subscriber @subscriber end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
27 28 29 |
# File 'lib/subserver/listener.rb', line 27 def thread @thread end |
Instance Method Details
#connect_subscriber ⇒ Object
78 79 80 81 82 83 84 85 |
# File 'lib/subserver/listener.rb', line 78 def connect_subscriber = @subscriber. logger.debug("Connecting to subscription with options: #{options}") @pubsub_listener = @subscription.listen streams: [:streams], threads: [:threads] do || logger.debug("Message Received: #{received_message}") () end end |
#execute_processor(subscriber, received_message) ⇒ Object
113 114 115 |
# File 'lib/subserver/listener.rb', line 113 def execute_processor(subscriber, ) subscriber.new.perform() end |
#kill ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/subserver/listener.rb', line 54 def kill @done = true return if !@thread # Hard stop the listener and shutdown thread after timeout passes. @pubsub_listener.stop @thread.raise ::Subserver::Shutdown end |
#name ⇒ Object
41 42 43 |
# File 'lib/subserver/listener.rb', line 41 def name @subscriber.name end |
#process_message(received_message) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/subserver/listener.rb', line 98 def () begin logger.debug("Executing Middleware") Subserver.middleware.invoke(@subscriber, ) do execute_processor(@subscriber, ) end rescue Subserver::Shutdown # Reject message if shutdown .reject! rescue Exception => ex handle_exception(e, { context: "Exception raised during message processing.", message: }) raise e end end |
#retrive_subscrption ⇒ Object
68 69 70 71 72 73 74 75 76 |
# File 'lib/subserver/listener.rb', line 68 def retrive_subscrption subscription_name = @subscriber.[:subscription] begin subscription = Pubsub.client.subscription subscription_name rescue Google::Cloud::Error => e raise ArgumentError, "Invalid Subscription name: #{subscription_name} Please ensure your Pubsub subscription exists." end subscription end |
#run ⇒ Object
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/subserver/listener.rb', line 87 def run begin connect_subscriber @pubsub_listener.start rescue Subserver::Shutdown @mgr.listener_stopped(self) rescue Exception => ex @mgr.listener_died(self, @subscriber, ex) end end |
#start ⇒ Object
62 63 64 |
# File 'lib/subserver/listener.rb', line 62 def start @thread ||= safe_thread("listener", &method(:run)) end |
#stats(job_hash, queue) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/subserver/listener.rb', line 141 def stats(job_hash, queue) tid = Subserver::Logging.tid WORKER_STATE[tid] = {:queue => queue, :payload => job_hash, :run_at => Time.now.to_i } begin yield rescue Exception FAILURE.incr raise ensure WORKER_STATE.delete(tid) PROCESSED.incr end end |
#stop ⇒ Object
45 46 47 48 49 50 51 52 |
# File 'lib/subserver/listener.rb', line 45 def stop @done = true return if !@thread # Stop the listener and wait for current messages to finish processing. @pubsub_listener.stop.wait! @mgr.listener_stopped(self) end |