Class: Gcpc::Subscriber::Engine
- Inherits:
-
Object
- Object
- Gcpc::Subscriber::Engine
- Defined in:
- lib/gcpc/subscriber/engine.rb
Constant Summary collapse
- WAIT_INTERVAL =
1- WORKER_DEAD_THRESHOLD =
second
30- BEAT_INTERVAL =
10- HEART_BEAT_WORKER_NAME =
'heartbeat-worker'
Instance Method Summary collapse
-
#handle(handler) ⇒ Object
We support registrion of only one handler.
-
#initialize(subscription:, interceptors: [], ack_immediately: false, logger: DefaultLogger) ⇒ Engine
constructor
A new instance of Engine.
- #run(signals = ['SIGTERM', 'SIGINT']) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(subscription:, interceptors: [], ack_immediately: false, logger: DefaultLogger) ⇒ Engine
Returns a new instance of Engine.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/gcpc/subscriber/engine.rb', line 15 def initialize( subscription:, interceptors: [], ack_immediately: false, logger: DefaultLogger ) @subscription = subscription @interceptors = interceptors @ack_immediately = ack_immediately @logger = logger @subscriber_thread_status = {} @subscriber_thread_status_mutex = Mutex.new @heartbeat_worker_thread = nil @subscriber = nil # @subscriber is created by calling `#run` @handler = nil # @handler must be registered by `#handle` @stopped_mutex = Mutex.new @stopped = false end |
Instance Method Details
#handle(handler) ⇒ Object
We support registrion of only one handler
92 93 94 95 96 97 |
# File 'lib/gcpc/subscriber/engine.rb', line 92 def handle(handler) @handler = HandleEngine.new( handler: handler, interceptors: @interceptors, ) end |
#run(signals = ['SIGTERM', 'SIGINT']) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/gcpc/subscriber/engine.rb', line 39 def run(signals = ['SIGTERM', 'SIGINT']) if @handler.nil? raise "You must register handler by #handle before calling #run" end @logger.info("Starting to subscribe a subscription \"#{@subscription.name}\", will wait for background threads to start...") @subscriber = @subscription.listen do || () end @subscriber.on_error do |err| handle_error(err) end @subscriber.start @logger.info("Started") run_heartbeat_worker loop_until_receiving_signals(signals) end |
#stop ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/gcpc/subscriber/engine.rb', line 61 def stop if @subscriber.nil? raise "You must call #run before stopping" end @stopped_mutex.synchronize do # `#stop` may be called multiple times. Only first call can proceed. return if @stopped @stopped = true end @logger.info('Stopping, will wait for background threads to exit') @subscriber.stop begin @heartbeat_worker_thread&.wakeup # ThreadError exeption will be raised when the thread already dead rescue ThreadError => e @logger.error(e.) end @heartbeat_worker_thread&.join @subscriber.wait! @logger.info('Stopped, background threads are shutdown') end |