Module: Postjob::Queue::Notifications

Extended by:
Notifications
Included in:
Notifications
Defined in:
lib/postjob/queue/notifications.rb

Overview

The Postjob::Queue manages enqueueing and fetching jobs from a job queue.

Constant Summary collapse

SQL =
::Postjob::Queue::SQL
CHANNEL =
"postjob_notifications"
SCHEMA_NAME =
::Postjob::Queue::SCHEMA_NAME
MAX_WAIT_TIME =
120

Instance Method Summary collapse

Instance Method Details

#wait_for_new_job(worker_session_id, queues:) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/postjob/queue/notifications.rb', line 12

def wait_for_new_job(worker_session_id, queues:)
  started_at = Time.now

  start_listening

  # Determine when the next job is up. If we don't have a next job within MAX_WAIT_TIME
  # we wake up regardless.
  wait_time = time_to_next_job(worker_session_id, queues: queues)
  return if wait_time && wait_time <= 0

  if !wait_time && ::Postjob::Queue.should_shutdown?(worker_session_id)
    Postjob.logger.debug "Shutting down runner: host is set to 'shutdown'"
    return :shutdown
  end

  wait_time = MAX_WAIT_TIME if !wait_time || wait_time > MAX_WAIT_TIME
  Postjob.logger.debug "postjob: waiting for notification for up to #{wait_time} seconds"
  Simple::SQL.wait_for_notify(wait_time)

  # flush notifications. It is possible that a huge number of notifications
  # piled up while we have been waiting. The following line takes care of
  # those.
  while Simple::SQL.wait_for_notify(0.000001)
    :nop
  end

  Postjob.logger.debug "postjob: awoke after #{format('%.03f secs', (Time.now - started_at))}"
rescue Interrupt
  Postjob.logger.info "postjob: shutdown after receiving Interrupt"
  :shutdown
end