Class: GoodJob::Notifier

Inherits:
Object
Defined in:
lib/good_job/notifier.rb

Overview

Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes.

Notifiers can emit NOTIFY messages through Postgres. A notifier will LISTEN for messages by creating a background thread that runs in an instance of Concurrent::ThreadPoolExecutor. When a message is received, the notifier passes the message to each of its recipients.
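As an illustration of that last step, the sketch below shows one way a message could be handed to recipients given in the two forms accepted by #initialize: an object responding to #call, or an Array(Object, Symbol) pair. The helper name `deliver` and the `Collector` class are hypothetical and not part of GoodJob; the dispatch code in lib/good_job/notifier.rb may differ.

```ruby
# Hypothetical helper (not part of GoodJob): pass one message to every recipient.
def deliver(recipients, message)
  recipients.each do |recipient|
    # An Array(Object, Symbol) names a target and the method to invoke on it;
    # anything else is assumed to respond to #call.
    target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
    target.public_send(method_name, message)
  end
end

# Hypothetical recipient used through the Array(Object, Symbol) form.
class Collector
  attr_reader :seen

  def initialize
    @seen = []
  end

  def record(message)
    @seen << message
  end
end

log = []
collector = Collector.new
deliver([->(msg) { log << msg }, [collector, :record]], { "queue_name" => "default" })
```

After the call, both the lambda and the collector have received the same message object.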

Constant Summary

AdapterCannotListenError =

Raised if the database adapter does not implement LISTEN.

Class.new(StandardError)
CHANNEL =

Default Postgres channel for LISTEN/NOTIFY

'good_job'
EXECUTOR_OPTIONS =

Defaults for instance of Concurrent::ThreadPoolExecutor

{
  name: name,
  min_threads: 0,
  max_threads: 1,
  auto_terminate: true,
  idletime: 60,
  max_queue: 1,
  fallback_policy: :discard,
}.freeze
RECONNECT_INTERVAL =

Seconds to wait before retrying if the database cannot be reached

5
WAIT_INTERVAL =

Seconds to block while LISTENing for a message

1
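
To make the roles of WAIT_INTERVAL and RECONNECT_INTERVAL concrete, here is a rough sketch of a polling loop that blocks for a bounded time per iteration and pauses after a connection failure. Everything in it is an assumption for illustration: `ConnectionLost`, `listen_loop`, the `source` object with its `wait_for_message` method, and the `running` and `pause` callables are hypothetical and are not GoodJob APIs.

```ruby
# Hypothetical error standing in for a lost database connection.
class ConnectionLost < StandardError; end

# Hypothetical loop. `source` responds to #wait_for_message(seconds) and returns
# a payload, or nil when nothing arrived in time. `running` is a callable that
# returns false once the loop should stop. `pause` is injectable so the retry
# delay can be observed without actually sleeping.
def listen_loop(source, running:, wait_interval: 1, reconnect_interval: 5,
                pause: ->(seconds) { sleep(seconds) })
  while running.call
    begin
      payload = source.wait_for_message(wait_interval) # blocks up to wait_interval
      yield payload if payload
    rescue ConnectionLost
      pause.call(reconnect_interval) # back off before trying again
    end
  end
end
```

A short wait interval lets the loop check regularly whether it should stop, while the longer reconnect interval avoids retrying a failed connection in a tight loop.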


Constructor Details

#initialize(*recipients) ⇒ Notifier

Returns a new instance of Notifier.

Parameters:

  • recipients (Array<#call, Array(Object, Symbol)>)


# File 'lib/good_job/notifier.rb', line 53

def initialize(*recipients)
  @recipients = Concurrent::Array.new(recipients)
  @listening = Concurrent::AtomicBoolean.new(false)

  self.class.instances << self

  create_executor
  listen
end

Class Attribute Details

.instances ⇒ Array<GoodJob::Notifier>? (readonly)

List of all instantiated Notifiers in the current process.

Returns:

  • (Array<GoodJob::Notifier>, nil)

# File 'lib/good_job/notifier.rb', line 37

cattr_reader :instances, default: [], instance_reader: false

Instance Attribute Details

#recipients ⇒ Array<#call, Array(Object, Symbol)> (readonly)

List of recipients that will receive notifications.

Returns:

  • (Array<#call, Array(Object, Symbol)>)


# File 'lib/good_job/notifier.rb', line 50

def recipients
  @recipients
end

Class Method Details

.notify(message) ⇒ Object

Send a message via Postgres NOTIFY

Parameters:

  • message (#to_json)


# File 'lib/good_job/notifier.rb', line 41

def self.notify(message)
  connection = Job.connection
  connection.exec_query <<~SQL.squish
    NOTIFY #{CHANNEL}, #{connection.quote(message.to_json)}
  SQL
end
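
To show what statement this method produces, the following standalone sketch builds the same kind of NOTIFY string without a database. It is an approximation under stated assumptions: `quote_literal` is a hypothetical stand-in for the adapter's `connection.quote`, `build_notify_sql` is a made-up helper name, and the whitespace collapsing imitates ActiveSupport's String#squish using only the standard library.

```ruby
require "json"

# Hypothetical stand-in for the adapter's quoting: wraps the text in single
# quotes and doubles any embedded single quote.
def quote_literal(text)
  "'#{text.gsub("'", "''")}'"
end

# Hypothetical helper: serialize the message to JSON, quote it, and build the
# NOTIFY statement, collapsing runs of whitespace into single spaces.
def build_notify_sql(message, channel: "good_job")
  sql = <<~SQL
    NOTIFY #{channel}, #{quote_literal(message.to_json)}
  SQL
  sql.gsub(/\s+/, " ").strip
end

puts build_notify_sql({ queue_name: "default" })
# prints: NOTIFY good_job, '{"queue_name":"default"}'
```

Quoting the JSON payload matters because the message is interpolated directly into the SQL text.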

Instance Method Details

#listening? ⇒ true, false, nil

Tests whether the notifier is active and listening for new messages.

Returns:

  • (true, false, nil)


# File 'lib/good_job/notifier.rb', line 65

def listening?
  @listening.true?
end

#restart(timeout: -1) ⇒ void

This method returns an undefined value.

Restart the notifier. If it is already shut down, it is simply started; otherwise it is shut down first and then started.

Parameters:

  • timeout (nil, Numeric) (defaults to: -1)

    Seconds to wait; shares same values as #shutdown.



# File 'lib/good_job/notifier.rb', line 103

def restart(timeout: -1)
  shutdown(timeout: timeout) if running?
  create_executor
  listen
end

#running? ⇒ true, false, nil

Tests whether the notifier is running.

Returns:

  • (true, false, nil)


# File 'lib/good_job/notifier.rb', line 72

delegate :running?, to: :executor, allow_nil: true

#shutdown(timeout: -1) ⇒ void

This method returns an undefined value.

Shut down the notifier. This stops the background LISTENing thread. Use #shutdown? to determine whether threads have stopped.

Parameters:

  • timeout (Numeric, nil) (defaults to: -1)

    Seconds to wait for active threads.

    • nil, the notifier will trigger a shutdown but not wait for it to complete.

    • -1, the notifier will wait until the shutdown is complete.

    • 0, the notifier will immediately shut down and stop any threads.

    • A positive number will wait that many seconds before stopping any remaining active threads.



# File 'lib/good_job/notifier.rb', line 88

def shutdown(timeout: -1)
  return if executor.nil? || executor.shutdown?

  executor.shutdown if executor.running?

  if executor.shuttingdown? && timeout # rubocop:disable Style/GuardClause
    executor_wait = timeout.negative? ? nil : timeout
    executor.kill unless executor.wait_for_termination(executor_wait)
  end
end
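
The timeout convention above can be tried out without GoodJob using a plain Thread. The sketch below is an analogy only: `stop_worker` and the `stop_signal` queue are hypothetical, and Thread#join and Thread#kill stand in for the executor's wait_for_termination and kill.

```ruby
# Hypothetical helper: stop a worker thread using the same timeout convention
# as #shutdown. `stop_signal` is a Queue the worker reads to learn it should exit.
def stop_worker(thread, stop_signal, timeout: -1)
  stop_signal << :stop                # trigger the shutdown
  return :requested if timeout.nil?   # nil: do not wait at all

  limit = timeout.negative? ? nil : timeout # negative: wait without limit
  if thread.join(limit)               # join returns nil when the limit expires
    :finished
  else
    thread.kill                       # stop whatever is still running
    :killed
  end
end

# A cooperative worker exits as soon as it sees the stop signal.
signal = Queue.new
worker = Thread.new { signal.pop }
stop_worker(worker, signal, timeout: -1) # => :finished
```

A worker that ignores the signal would instead be killed once a non-negative timeout expires, which mirrors the `executor.kill` fallback in the method above.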

#shutdown? ⇒ true, false, nil

Tests whether the notifier is shut down.

Returns:

  • (true, false, nil)


# File 'lib/good_job/notifier.rb', line 77

delegate :shutdown?, to: :executor, allow_nil: true