Class: Gcpc::Subscriber::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/gcpc/subscriber/engine.rb

Constant Summary collapse

WAIT_INTERVAL =
1
WORKER_DEAD_THRESHOLD =
30 (seconds)
BEAT_INTERVAL =
10
HEART_BEAT_WORKER_NAME =
'heartbeat-worker'

Instance Method Summary collapse

Constructor Details

#initialize(subscription:, interceptors: [], ack_immediately: false, logger: DefaultLogger) ⇒ Engine

Returns a new instance of Engine.

Parameters:

  • subscription (Google::Cloud::Pubsub::Subscription)
  • interceptors (<#handle, #on_error>) (defaults to: [])
  • ack_immediately (bool) (defaults to: false)
  • logger (Logger) (defaults to: DefaultLogger)


15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/gcpc/subscriber/engine.rb', line 15

# Sets up the engine's state. Nothing is started here: the subscriber is
# only created by `#run`, and a handler has to be registered through
# `#handle` first.
#
# @param subscription [Google::Cloud::Pubsub::Subscription]
# @param interceptors [Array<#handle, #on_error>]
# @param ack_immediately [bool]
# @param logger [Logger]
def initialize(
  subscription:,
  interceptors:    [],
  ack_immediately: false,
  logger:          DefaultLogger
)
  # Configuration handed in by the caller, stored untouched.
  @subscription = subscription
  @interceptors = interceptors
  @ack_immediately = ack_immediately
  @logger = logger

  # Filled in later: `#handle` assigns @handler, `#run` assigns @subscriber.
  @handler = nil
  @subscriber = nil

  # Bookkeeping for background threads.
  # NOTE(review): presumably consumed by the heartbeat worker - the readers
  # are not visible from this method; confirm.
  @subscriber_thread_status_mutex = Mutex.new
  @subscriber_thread_status = {}
  @heartbeat_worker_thread = nil

  # Flag (and its lock) that `#stop` uses to let only the first call proceed.
  @stopped = false
  @stopped_mutex = Mutex.new
end

Instance Method Details

#handle(handler) ⇒ Object

Only one handler can be registered; registering again replaces the previous one.

Parameters:

  • handler (#handle, #on_error, Class)


92
93
94
95
96
97
# File 'lib/gcpc/subscriber/engine.rb', line 92

# Registers the message handler, wrapped in a HandleEngine together with the
# interceptors given to the constructor. Only one handler is kept: every call
# overwrites the previously registered one.
#
# @param handler [#handle, #on_error, Class]
# @return [Object] the HandleEngine now stored in @handler
def handle(handler)
  engine_options = {
    handler:      handler,
    interceptors: @interceptors,
  }

  @handler = HandleEngine.new(**engine_options)
end

#run(signals = ['SIGTERM', 'SIGINT']) ⇒ Object

Parameters:

  • signals (<String>) (defaults to: ['SIGTERM', 'SIGINT'])

    Signals which are used to shutdown subscriber gracefully.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/gcpc/subscriber/engine.rb', line 39

# Starts consuming the subscription: creates the subscriber, wires the message
# and error callbacks, starts it, launches the heartbeat worker and finally
# hands control to `loop_until_receiving_signals`.
#
# @param signals [Array<String>] Signals which are used to shutdown subscriber
#   gracefully.
# @raise [RuntimeError] when no handler has been registered through `#handle`
# @return [Object] whatever `loop_until_receiving_signals` returns
#   (NOTE(review): presumably this call blocks until a signal arrives - confirm)
def run(signals = ['SIGTERM', 'SIGINT'])
  raise "You must register handler by #handle before calling #run" if @handler.nil?

  @logger.info("Starting to subscribe a subscription \"#{@subscription.name}\", will wait for background threads to start...")

  # Callbacks are registered before `start` so nothing is delivered unhandled.
  @subscriber = @subscription.listen { |message| handle_message(message) }
  @subscriber.on_error { |err| handle_error(err) }
  @subscriber.start

  @logger.info("Started")

  run_heartbeat_worker

  loop_until_receiving_signals(signals)
end

#stop ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/gcpc/subscriber/engine.rb', line 61

# Shuts the engine down: stops the subscriber, wakes and joins the heartbeat
# worker thread (when there is one), then waits for the subscriber to finish.
# Safe to call repeatedly; every call after the first returns immediately.
#
# @raise [RuntimeError] when `#run` has not created a subscriber yet
# @return [Object, nil] nil when a previous call already stopped the engine,
#   otherwise the result of the final log call
def stop
  raise "You must call #run before stopping" if @subscriber.nil?

  # Flip the flag under the lock and remember what it was, so that only the
  # first of possibly several `#stop` calls gets past this point.
  already_stopped = @stopped_mutex.synchronize do
    previous = @stopped
    @stopped = true
    previous
  end
  return if already_stopped

  @logger.info('Stopping, will wait for background threads to exit')

  @subscriber.stop

  heartbeat_thread = @heartbeat_worker_thread
  if heartbeat_thread
    begin
      heartbeat_thread.wakeup
    rescue ThreadError => e
      # Thread#wakeup raises ThreadError when the thread is already dead.
      @logger.error(e.message)
    end

    heartbeat_thread.join
  end

  @subscriber.wait!

  @logger.info('Stopped, background threads are shutdown')
end