Class: Delayed::WorkQueue::ParentProcess::Server

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/delayed/work_queue/parent_process/server.rb

Defined Under Namespace

Classes: ClientState

Constant Summary

SIGNALS =
%i[INT TERM QUIT].freeze

Instance Attribute Summary

Instance Method Summary

Methods included from Logging

logger, #logger, #say

Constructor Details

#initialize(listen_socket, parent_pid: nil, config: Settings.parent_process) ⇒ Server

Returns a new instance of Server.



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/delayed/work_queue/parent_process/server.rb', line 14

# Builds a work-queue server that hands jobs to worker clients connecting
# on +listen_socket+.
#
# @param listen_socket [Object] the socket new clients are accepted from
# @param parent_pid [Integer, nil] pid compared against Process.ppid to detect
#   that the parent went away; nil disables that check
# @param config [Hash] parent-process settings; only "server_socket_timeout"
#   is read here
def initialize(listen_socket, parent_pid: nil, config: Settings.parent_process)
  @listen_socket = listen_socket
  @parent_pid = parent_pid
  # per-socket client state, per-worker-config wait lists, and
  # per-worker-config prefetched job pools
  @clients, @waiting_clients, @prefetched_jobs = {}, {}, {}

  @config = config
  # seconds allowed for each send to a client; the key name is kept for
  # backwards compatibility, falling back to 10.0
  @client_timeout = config["server_socket_timeout"] || 10.0

  @exit = false
  # signal handlers write to this pipe to wake up the IO.select in run_once
  @self_pipe = IO.pipe
end

Instance Attribute Details

#clients ⇒ Object (readonly)

Returns the value of attribute clients.



9
10
11
# File 'lib/delayed/work_queue/parent_process/server.rb', line 9

# Client bookkeeping keyed by connected socket.
def clients; @clients; end

#listen_socket ⇒ Object (readonly)

Returns the value of attribute listen_socket.



9
10
11
# File 'lib/delayed/work_queue/parent_process/server.rb', line 9

# The socket new client connections are accepted from.
def listen_socket; @listen_socket; end

Instance Method Details

#all_workers_idle? ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/delayed/work_queue/parent_process/server.rb', line 32

# Whether no tracked client is currently marked as working (true when there
# are no clients at all).
#
# @return [Boolean]
def all_workers_idle?
  @clients.each_value.none?(&:working)
end

#check_for_work(forced_latency: nil) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/delayed/work_queue/parent_process/server.rb', line 124

# Hands jobs to every worker client currently waiting for work.
#
# For each worker config that has waiting clients:
#   1. drain the in-memory prefetched pool into waiting clients, transferring
#      each job's lock from #prefetch_owner to the client;
#   2. if clients are still waiting, call Delayed::Job.get_and_lock_next_available
#      (inside the :work_queue_pop lifecycle callback) for the waiting clients,
#      also requesting extra jobs locked under #prefetch_owner;
#   3. marshal each fetched job down its client's socket.
# A client whose send fails (SystemCallError, IOError, Timeout::Error) is
# dropped and the job it was meant to get is unlocked.
#
# @param forced_latency [Object, nil] passed through unchanged to
#   get_and_lock_next_available
def check_for_work(forced_latency: nil)
  @waiting_clients.each do |(worker_config, workers)|
    prefetched_jobs = @prefetched_jobs[worker_config] ||= []
    logger.debug("I have #{prefetched_jobs.length} jobs for #{workers.length} waiting workers")
    # Phase 1: serve waiting workers from the prefetched pool first.
    while !prefetched_jobs.empty? && !workers.empty?
      job = prefetched_jobs.shift
      client = workers.shift
      logger.debug("Transferring prefetched job to #{client.name}")
      # Couldn't re-lock it for some reason: put the client back at the front
      # and try the next prefetched job. The skipped job has already been
      # removed from the pool and is not unlocked here.
      # NOTE(review): presumably reclaimed later by
      # Job.unlock_orphaned_prefetched_jobs — confirm.
      unless job.transfer_lock!(from: prefetch_owner, to: client.name)
        workers.unshift(client)
        next
      end
      client.working = true
      begin
        logger.debug("Sending prefetched job #{job.id} to #{client.name}")
        client_timeout { Marshal.dump(job, client.socket) }
      rescue SystemCallError, IOError, Timeout::Error => e
        # client vanished or stalled: forget it and release the job's lock
        logger.error("Failed to send pre-fetched job to #{client.name}: #{e.inspect}")
        drop_socket(client.socket)
        Delayed::Job.unlock([job])
      end
    end

    next if workers.empty?

    # Phase 2: fetch and lock fresh jobs for the workers still waiting.
    logger.debug("Fetching new work for #{workers.length} workers")
    jobs_to_send = []

    Delayed::Worker.lifecycle.run_callbacks(:work_queue_pop, self, worker_config) do
      recipients = workers.map(&:name)

      # prefetch size: fetch_batch_size for each worker sharing this config
      # (worker_config[:workers], default 1), less one per waiting recipient
      response = Delayed::Job.get_and_lock_next_available(
        recipients,
        worker_config[:queue],
        worker_config[:min_priority],
        worker_config[:max_priority],
        prefetch: (Settings.fetch_batch_size * (worker_config[:workers] || 1)) - recipients.length,
        prefetch_owner: prefetch_owner,
        forced_latency: forced_latency
      )
      logger.debug(
        "Fetched and locked #{response.values.flatten.size} new jobs for workers (#{response.keys.join(", ")})."
      )
      # response maps a recipient name, or prefetch_owner, to what was locked for it
      response.each do |(worker_name, locked_jobs)|
        if worker_name == prefetch_owner
          # it's actually an array of all the extra jobs
          logger.debug(
            "Adding prefetched jobs #{locked_jobs.length} to prefetched array (size: #{prefetched_jobs.count})"
          )
          prefetched_jobs.concat(locked_jobs)
          next
        end
        # For a real recipient the value is sent as a whole and logged via
        # `.id` below, so despite its name it appears to be a single job.
        # NOTE(review): `find` returns nil for a name not in `recipients`,
        # which would raise NoMethodError on the next line; this assumes the
        # response only contains requested names — confirm.
        client = workers.find { |worker| worker.name == worker_name }
        client.working = true
        jobs_to_send << [client, locked_jobs]
      end
    end

    # Phase 3: deliver outside the callback block; each recipient stops waiting.
    jobs_to_send.each do |(recipient, job_to_send)|
      @waiting_clients[worker_config].delete(recipient)
      begin
        logger.debug("Sending job #{job_to_send.id} to #{recipient.name}")
        client_timeout { Marshal.dump(job_to_send, recipient.socket) }
      rescue SystemCallError, IOError, Timeout::Error => e
        logger.error("Failed to send job to #{recipient.name}: #{e.inspect}")
        drop_socket(recipient.socket)
        Delayed::Job.unlock([job_to_send])
      end
    end
  end
end

#client_timeout(&block) ⇒ Object



277
278
279
# File 'lib/delayed/work_queue/parent_process/server.rb', line 277

# Runs the given block, raising Timeout::Error if it runs longer than the
# configured per-client timeout (@client_timeout, in seconds).
#
# @return [Object] whatever the block returns
def client_timeout(&block)
  limit = @client_timeout
  Timeout.timeout(limit, &block)
end

#connected_clients ⇒ Object



28
29
30
# File 'lib/delayed/work_queue/parent_process/server.rb', line 28

# Number of client sockets currently tracked.
#
# @return [Integer]
def connected_clients
  @clients.length
end

#drop_socket(socket) ⇒ Object



251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/delayed/work_queue/parent_process/server.rb', line 251

# Forgets a client whose socket went away: closes the socket (an IOError
# from an already-closed stream is ignored), removes its entry from
# @clients, and removes that client from every waiting list.
def drop_socket(socket)
  begin
    socket.close
  rescue IOError
    nil
  end
  gone = @clients.delete(socket)
  @waiting_clients.each_value { |queue| queue.delete(gone) }
end

#exit? ⇒ Boolean

Returns:

  • (Boolean)


265
266
267
# File 'lib/delayed/work_queue/parent_process/server.rb', line 265

# Whether the run loop should stop: a stop signal was trapped (@exit) or the
# parent process has gone away.
#
# The result is coerced so this predicate always returns true or false.
# Previously it returned nil when @exit was false and parent_exited?
# returned nil.
#
# @return [Boolean]
def exit?
  !!(@exit || parent_exited?)
end

#handle_accept ⇒ Object

Any error on the listen socket other than WaitReadable will bubble up and terminate the work queue process, to be restarted by the parent daemon.



97
98
99
100
101
102
103
# File 'lib/delayed/work_queue/parent_process/server.rb', line 97

# Accepts one pending connection on the listen socket without blocking and
# starts tracking it as a not-working client.
#
# IO::WaitReadable (nothing to accept yet) is logged and ignored; the next
# loop iteration tries again. Any other error from the listen socket
# propagates and terminates the work queue process.
def handle_accept
  accepted, = @listen_socket.accept_nonblock
  return unless accepted

  @clients[accepted] = ClientState.new(false, accepted)
rescue IO::WaitReadable
  logger.error("Server attempted to read listen_socket but failed with IO::WaitReadable")
end

#handle_read(socket) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
# File 'lib/delayed/work_queue/parent_process/server.rb', line 83

# Dispatches a socket that IO.select reported as readable:
#   * the listen socket: a new connection to accept;
#   * the read end of the self-pipe: a wake-up, whose bytes are drained and discarded;
#   * anything else: a request from a connected client.
def handle_read(socket)
  return handle_accept if socket == @listen_socket
  # only the wake-up matters, not the pipe contents; read without blocking
  return @self_pipe[0].read_nonblock(11, exception: false) if socket == @self_pipe[0]

  handle_request(socket)
end

#handle_request(socket) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/delayed/work_queue/parent_process/server.rb', line 105

# Reads one message from a connected client and puts the client on the
# wait list for its worker config.
#
# The message is a marshaled [worker_name, worker_config] pair. The client
# takes that name, is marked not working, and is appended to the wait list
# for that config. If the socket is at EOF, or reading it raises
# SystemCallError/IOError, the socket is dropped instead.
#
# NOTE: a client that sends a partial message and keeps the socket open
# would leave us blocked in Marshal.load. Both that risk and using Marshal
# on socket data are acceptable only because we control the client
# (trusted input).
def handle_request(socket)
  state = @clients[socket]
  unless socket.eof?
    name, config = Marshal.load(socket)
    state.name = name
    state.working = false
    waiting = @waiting_clients[config] ||= []
    return waiting << state
  end

  logger.debug("Client #{state.name} closed connection")
  drop_socket(socket)
rescue SystemCallError, IOError => e
  logger.error("Receiving message from client (#{socket}) failed: #{e.inspect}")
  drop_socket(socket)
end

#parent_exited? ⇒ Boolean

Returns:

  • (Boolean)


273
274
275
# File 'lib/delayed/work_queue/parent_process/server.rb', line 273

# Whether the process we were started under has gone away, detected by our
# parent pid no longer matching the pid given to the constructor.
#
# When no parent pid was given the check is disabled. The method now
# returns false in that case (it previously returned nil), so it always
# yields a real Boolean.
#
# @return [Boolean]
def parent_exited?
  return false if @parent_pid.nil?

  @parent_pid != Process.ppid
end

#prefetch_owner ⇒ Object



269
270
271
# File 'lib/delayed/work_queue/parent_process/server.rb', line 269

# Lock-owner name under which this server locks prefetched jobs, of the form
# "prefetch:<hostname>" ("prefetch:X" if the hostname can't be determined).
#
# Memoized for two reasons. It is called repeatedly while dispatching jobs,
# so this avoids re-resolving the hostname each time. It also guarantees that
# jobs locked under this owner are later transferred/unlocked with the exact
# same string.
#
# @return [String] frozen owner name
def prefetch_owner
  @prefetch_owner ||= begin
    host =
      begin
        Socket.gethostname
      rescue StandardError
        "X"
      end
    "prefetch:#{host}".freeze
  end
end

#run ⇒ Object

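Runs the server queue worker. This method does not return while the server is active: it loops until it is told to exit (a stop signal or the parent process exiting), or it raises an exception.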



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/delayed/work_queue/parent_process/server.rb', line 38

# Runs the server loop until #exit? becomes true. Errors are logged and
# re-raised. In every case, all prefetched jobs are unlocked on the way out.
#
# INT, TERM and QUIT only set the exit flag and write to the self-pipe, so
# the IO.select inside run_once wakes up. Exiting is not aggressive on
# purpose, so the ensure clause still gets to unlock prefetched jobs.
# Roughly every 15 minutes, orphaned prefetched jobs are unlocked. The first
# purge happens at a random offset into that interval, presumably so that
# many servers don't purge at the same moment.
def run
  logger.debug "Starting work queue process"

  SIGNALS.each do |signal_name|
    trap(signal_name) do
      @exit = true
      @self_pipe[1].write_nonblock(".", exception: false)
    end
  end

  purge_interval = 15 * 60
  last_purge_at = Job.db_time_now - rand(purge_interval)
  until exit?
    run_once
    next unless last_purge_at + purge_interval < Job.db_time_now

    Job.unlock_orphaned_prefetched_jobs
    last_purge_at = Job.db_time_now
  end
rescue => e
  logger.error "WorkQueue Server died: #{e.inspect}"
  raise
ensure
  unlock_all_prefetched_jobs
end

#run_once ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/delayed/work_queue/parent_process/server.rb', line 66

# One iteration of the server loop, in this order:
#   1. wait (bounded) for readable sockets and handle them;
#   2. hand out work inside the :check_for_work lifecycle callback;
#   3. unlock prefetched jobs that have timed out.
def run_once
  watched = [*@clients.keys, @listen_socket, @self_pipe[0]]
  # When every worker is idle, force a "latency" on job fetching: don't fetch
  # recently queued jobs, so busier workers get to them first. If those
  # workers fall behind, the jobs age past this latency and we start picking
  # them up quickly. The value (sleep_delay plus twice the stagger) is meant
  # to guarantee an active worker has attempted a fetch in the meantime.
  forced_latency =
    if all_workers_idle?
      Settings.sleep_delay + (Settings.sleep_delay_stagger * 2)
    end
  select_timeout = Settings.sleep_delay + (rand * Settings.sleep_delay_stagger)
  readable, = IO.select(watched, nil, nil, select_timeout)
  Array(readable).each { |io| handle_read(io) }
  Delayed::Worker.lifecycle.run_callbacks(:check_for_work, self) do
    check_for_work(forced_latency: forced_latency)
  end
  unlock_timed_out_prefetched_jobs
end

#unlock_all_prefetched_jobs ⇒ Object



243
244
245
246
247
248
249
# File 'lib/delayed/work_queue/parent_process/server.rb', line 243

# Best-effort unlock of every prefetched job (run on the way out of #run).
# An individual attempt may give up without doing any work, for example on
# a timeout. So unlock_prefetched_jobs is retried up to 10 times, stopping
# as soon as every prefetched pool is empty.
def unlock_all_prefetched_jobs
  10.times do
    unlock_prefetched_jobs
    return nil if @prefetched_jobs.values.all?(&:empty?)
  end
end

#unlock_prefetched_jobs ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/delayed/work_queue/parent_process/server.rb', line 197

# Unlocks prefetched jobs in the database and empties their in-memory pools.
#
# Each worker config's pool is visited in turn:
#   * empty pools are skipped;
#   * if a block is given, a pool is only unlocked when the block returns
#     truthy for its job array;
#   * each unlock runs in a transaction that takes the prefetch-jobs advisory
#     lock, with short statement timeouts (plus, on PostgreSQL >= 9.6, an
#     idle-in-transaction timeout) so a stuck database can't hang us or leave
#     rows locked.
#
# Failures are best-effort, and a later call retries:
#   * ActiveRecord::QueryCanceled is logged and the pool skipped;
#   * ActiveRecord::StatementInvalid is re-raised if the connection is still
#     active; otherwise connections are cleared so the next attempt reconnects.
#
# @yieldparam jobs [Array] the prefetched jobs for one worker config
# @yieldreturn [Boolean] whether to unlock that pool now
def unlock_prefetched_jobs
  @prefetched_jobs.each do |(worker_config, jobs)|
    next if jobs.empty?
    next if block_given? && !yield(jobs)

    connection = Delayed::Job.connection
    connection.transaction do
      # make absolutely sure we don't get hung up and leave things
      # locked in the database
      if connection.postgresql_version >= 9_06_00 # rubocop:disable Style/NumericLiterals
        connection.idle_in_transaction_session_timeout = 5
      end
      # relatively short timeout for acquiring the lock
      connection.statement_timeout = Settings.sleep_delay
      Delayed::Job.advisory_lock(Delayed::Job.prefetch_jobs_lock_name)

      # this query might take longer, and we really want to get it
      # done if we got the lock, but still don't want an inadvertent
      # hang
      connection.statement_timeout = 30
      Delayed::Job.unlock(jobs)
      @prefetched_jobs[worker_config] = []
    end
  rescue ActiveRecord::QueryCanceled
    # ignore; we'll retry anyway
    logger.warn("unable to unlock prefetched jobs; skipping for now")
  rescue ActiveRecord::StatementInvalid
    # see if we dropped the connection
    raise if connection.active?

    # otherwise just reconnect and let it retry
    logger.warn("failed to unlock prefetched jobs - connection terminated; skipping for now")
    # Compare as versions, not strings: lexically "10.0.0" < "6.1" is true.
    # `release` drops any prerelease suffix, so 6.1 prereleases still take
    # the 6.1+ branch exactly as the old string comparison did.
    if Gem::Version.new(Rails.version).release < Gem::Version.new("6.1")
      ::Delayed::Job.clear_all_connections!
    else
      ::Delayed::Job.clear_all_connections!(nil)
    end
  end
end

#unlock_timed_out_prefetched_jobs ⇒ Object



237
238
239
240
241
# File 'lib/delayed/work_queue/parent_process/server.rb', line 237

# Unlocks only those prefetched pools whose first job was locked longer ago
# than Settings.parent_process[:prefetched_jobs_timeout] seconds.
# NOTE(review): this reads the global Settings rather than the config passed
# to the constructor; confirm that is intended.
def unlock_timed_out_prefetched_jobs
  unlock_prefetched_jobs do |jobs|
    first_locked_at = jobs.first.locked_at
    cutoff = Time.now.utc - Settings.parent_process[:prefetched_jobs_timeout]
    first_locked_at < cutoff
  end
end