Class: Delayed::WorkQueue::ParentProcess::Server

Inherits:
Object
Includes:
Logging
Defined in:
lib/delayed/work_queue/parent_process/server.rb

Defined Under Namespace

Classes: ClientState

Constant Summary

SIGNALS =
%i{INT TERM QUIT}

Instance Attribute Summary

Instance Method Summary

Methods included from Logging

logger, #logger, #say

Constructor Details

#initialize(listen_socket, parent_pid: nil, config: Settings.parent_process) ⇒ Server

Returns a new instance of Server.



# File 'lib/delayed/work_queue/parent_process/server.rb', line 12

def initialize(listen_socket, parent_pid: nil, config: Settings.parent_process)
  @listen_socket = listen_socket
  @parent_pid = parent_pid
  @clients = {}
  @waiting_clients = {}
  @prefetched_jobs = {}

  @config = config
  @client_timeout = config['server_socket_timeout'] || 10.0 # left for backwards compat

  @exit = false
  @self_pipe = IO.pipe
end
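
For context, a rough usage sketch: the parent process normally creates the listen socket, forks, and runs this server in the child. The socket path below is hypothetical, and Settings.parent_process is assumed to already be configured.

require 'socket'

# Hypothetical socket path; in practice ParentProcess decides where it lives.
listen_socket = Socket.unix_server_socket('/tmp/inst-jobs-work-queue.sock')
parent_pid = Process.pid

fork do
  # In the child, Process.ppid == parent_pid, so parent_exited? stays false
  # until the parent process actually goes away.
  server = Delayed::WorkQueue::ParentProcess::Server.new(listen_socket, parent_pid: parent_pid)
  server.run # blocks; exits on SIGINT/TERM/QUIT or when the parent dies
end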

Instance Attribute Details

#clients ⇒ Object (readonly)

Returns the value of attribute clients.



# File 'lib/delayed/work_queue/parent_process/server.rb', line 7

def clients
  @clients
end

#listen_socket ⇒ Object (readonly)

Returns the value of attribute listen_socket.



# File 'lib/delayed/work_queue/parent_process/server.rb', line 7

def listen_socket
  @listen_socket
end

Instance Method Details

#all_workers_idle? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/delayed/work_queue/parent_process/server.rb', line 30

def all_workers_idle?
  !@clients.any? { |_, c| c.working }
end

#check_for_work(forced_latency: nil) ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 124

def check_for_work(forced_latency: nil)
  @waiting_clients.each do |(worker_config, workers)|
    prefetched_jobs = @prefetched_jobs[worker_config] ||= []
    logger.debug("I have #{prefetched_jobs.length} jobs for #{workers.length} waiting workers")
    while !prefetched_jobs.empty? && !workers.empty?
      job = prefetched_jobs.shift
      client = workers.shift
      logger.debug("Transferring prefetched job to #{client.name}")
      unless job.transfer_lock!(from: prefetch_owner, to: client.name)
        # couldn't re-lock it for some reason; put the worker back and try the next job
        workers.unshift(client)
        next
      end
      client.working = true
      begin
        logger.debug("Sending prefetched job #{job.id} to #{client.name}")
        client_timeout { Marshal.dump(job, client.socket) }
      rescue SystemCallError, IOError, Timeout::Error => ex
        logger.error("Failed to send pre-fetched job to #{client.name}: #{ex.inspect}")
        drop_socket(client.socket)
        Delayed::Job.unlock([job])
      end
    end

    next if workers.empty?

    logger.debug("Fetching new work for #{workers.length} workers")
    jobs_to_send = []

    Delayed::Worker.lifecycle.run_callbacks(:work_queue_pop, self, worker_config) do
      recipients = workers.map(&:name)

      response = Delayed::Job.get_and_lock_next_available(
          recipients,
          worker_config[:queue],
          worker_config[:min_priority],
          worker_config[:max_priority],
          prefetch: Settings.fetch_batch_size * (worker_config[:workers] || 1) - recipients.length,
          prefetch_owner: prefetch_owner,
          forced_latency: forced_latency)
      logger.debug("Fetched and locked #{response.values.flatten.size} new jobs for workers (#{response.keys.join(', ')}).")
      response.each do |(worker_name, job)|
        if worker_name == prefetch_owner
          # it's actually an array of all the extra jobs
          logger.debug("Adding prefetched jobs #{job.length} to prefetched array (size: #{prefetched_jobs.count})")
          prefetched_jobs.concat(job)
          next
        end
        client = workers.find { |worker| worker.name == worker_name }
        client.working = true
        jobs_to_send << [client, job]
      end
    end

    jobs_to_send.each do |(client, job)|
      @waiting_clients[worker_config].delete(client)
      begin
        logger.debug("Sending job #{job.id} to #{client.name}")
        client_timeout { Marshal.dump(job, client.socket) }
      rescue SystemCallError, IOError, Timeout::Error => ex
        logger.error("Failed to send job to #{client.name}: #{ex.inspect}")
        drop_socket(client.socket)
        Delayed::Job.unlock([job])
      end
    end
  end
end
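
As a worked example of the prefetch sizing above (numbers are illustrative, not library defaults): with a fetch batch size of 5, a worker_config declaring 4 workers, and 2 of those workers currently waiting, the single query asks for the 2 named jobs plus 18 extra locked to prefetch_owner.

# Illustrative numbers only:
Settings.fetch_batch_size   # => 5
worker_config[:workers]     # => 4
recipients.length           # => 2 (workers currently waiting)

prefetch = 5 * 4 - 2        # => 18 extra jobs requested under prefetch_owner
# get_and_lock_next_available then returns a hash with one job per waiting
# worker name, plus an array of up to 18 jobs keyed by prefetch_owner that
# land in @prefetched_jobs for later handoff via transfer_lock!.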

#client_timeout ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 235

def client_timeout
  Timeout.timeout(@client_timeout) { yield }
end

#connected_clients ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 26

def connected_clients
  @clients.size
end

#drop_socket(socket) ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 210

def drop_socket(socket)
  # this socket went away
  begin
    socket.close
  rescue IOError
  end
  client = @clients[socket]
  @clients.delete(socket)
  @waiting_clients.each do |(_config, workers)|
    workers.delete(client)
  end
end

#exit? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/delayed/work_queue/parent_process/server.rb', line 223

def exit?
  !!@exit || parent_exited?
end

#handle_accept ⇒ Object

Any error on the listen socket other than WaitReadable will bubble up and terminate the work queue process, to be restarted by the parent daemon.



# File 'lib/delayed/work_queue/parent_process/server.rb', line 95

def handle_accept
  socket, _addr = @listen_socket.accept_nonblock
  if socket
    @clients[socket] = ClientState.new(false, socket)
  end
rescue IO::WaitReadable
  logger.error("Server attempted to read listen_socket but failed with IO::WaitReadable")
  # ignore and just try accepting again next time through the loop
end

#handle_read(socket) ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 81

def handle_read(socket)
  if socket == @listen_socket
    handle_accept
  elsif socket == @self_pipe[0]
    # We really don't care about the contents of the pipe, we just need to
    # wake up.
    @self_pipe[0].read_nonblock(11, exception: false)
  else
    handle_request(socket)
  end
end

#handle_request(socket) ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 105

def handle_request(socket)
  # There is an assumption here that the client will never send a partial
  # request and then leave the socket open. Doing so would leave us hanging
  # in Marshal.load forever. This is only a reasonable assumption because we
  # control the client.
  client = @clients[socket]
  if socket.eof?
    logger.debug("Client #{client.name} closed connection")
    return drop_socket(socket)
  end
  worker_name, worker_config = Marshal.load(socket)
  client.name = worker_name
  client.working = false
  (@waiting_clients[worker_config] ||= []) << client
rescue SystemCallError, IOError => ex
  logger.error("Receiving message from client (#{socket}) failed: #{ex.inspect}")
  drop_socket(socket)
end
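
The request/response shape implied here is a Marshal-dumped [name, config] pair from the client, answered later by a Marshal-dumped job. A minimal hand-rolled client sketch (the socket path and worker name are hypothetical; in practice ParentProcess::Client handles this exchange):

require 'socket'

socket = Socket.unix('/tmp/inst-jobs-work-queue.sock') # hypothetical path

worker_config = { queue: 'default', min_priority: nil, max_priority: nil, workers: 1 }
Marshal.dump(['myhost:worker:1', worker_config], socket)

job = Marshal.load(socket) # blocks until the server sends a locked job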

#parent_exited? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/delayed/work_queue/parent_process/server.rb', line 231

def parent_exited?
  @parent_pid && @parent_pid != Process.ppid
end

#prefetch_owner ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 227

def prefetch_owner
  "prefetch:#{Socket.gethostname rescue 'X'}"
end

#run ⇒ Object

Run the server work queue. This method does not return; it only exits or raises an exception.



# File 'lib/delayed/work_queue/parent_process/server.rb', line 36

def run
  logger.debug "Starting work queue process"

  SIGNALS.each do |sig|
    # We're not doing any aggressive exiting here since we really want
    # prefetched jobs to be unlocked and we're going to wake up the process
    # from the IO.select we're using to wait on clients.
    trap(sig) { @exit = true; @self_pipe[1].write_nonblock('.', exception: false) }
  end

  last_orphaned_prefetched_jobs_purge = Job.db_time_now - rand(15 * 60)
  while !exit?
    run_once
    if last_orphaned_prefetched_jobs_purge + 15 * 60 < Job.db_time_now
      Job.unlock_orphaned_prefetched_jobs
      last_orphaned_prefetched_jobs_purge = Job.db_time_now
    end
  end

rescue => e
  logger.error "WorkQueue Server died: #{e.inspect}"
  raise
ensure
  unlock_all_prefetched_jobs
end
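
The trap handlers above rely on the self-pipe trick: a signal can arrive while the process is blocked in IO.select, so the handler writes a byte to a pipe that is always part of the select set, waking the loop so it can notice @exit. A stripped-down illustration of the same pattern, independent of this class:

self_pipe = IO.pipe
exiting = false

trap(:TERM) do
  exiting = true
  self_pipe[1].write_nonblock('.', exception: false) # wake the select below
end

until exiting
  readable, = IO.select([self_pipe[0]], nil, nil, 5)
  # drain the pipe so the next signal can wake us again
  self_pipe[0].read_nonblock(16, exception: false) if readable
end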

#run_once ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 62

def run_once
  handles = @clients.keys + [@listen_socket, @self_pipe[0]]
  # if we're currently idle, then force a "latency" to job fetching - don't
  # fetch recently queued jobs, allowing busier workers to fetch them first.
  # if they're not keeping up, the jobs will slip back in time, and suddenly we'll become
  # active and quickly pick up all the jobs we can. The latency is calculated to ensure that
  # an active worker is guaranteed to have attempted to fetch new jobs in the meantime
  forced_latency = Settings.sleep_delay + Settings.sleep_delay_stagger * 2 if all_workers_idle?
  timeout = Settings.sleep_delay + (rand * Settings.sleep_delay_stagger)
  readable, _, _ = IO.select(handles, nil, nil, timeout)
  if readable
    readable.each { |s| handle_read(s) }
  end
  Delayed::Worker.lifecycle.run_callbacks(:check_for_work, self) do
    check_for_work(forced_latency: forced_latency)
  end
  unlock_timed_out_prefetched_jobs
end
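
A concrete reading of the latency math above (the settings values are illustrative, not defaults):

sleep_delay         = 2.0
sleep_delay_stagger = 2.0

forced_latency = sleep_delay + sleep_delay_stagger * 2    # => 6.0 seconds
timeout        = sleep_delay + rand * sleep_delay_stagger # somewhere in 2.0..4.0

# With these numbers an idle queue skips jobs enqueued in the last 6 seconds,
# while a busier queue polls at least once every sleep_delay + sleep_delay_stagger
# seconds, so it is guaranteed a chance at those fresh jobs first.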

#unlock_all_prefetched_jobs ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 202

def unlock_all_prefetched_jobs
  @prefetched_jobs.each do |(_worker_config, jobs)|
    next if jobs.empty?
    Delayed::Job.unlock(jobs)
  end
  @prefetched_jobs = {}
end

#unlock_timed_out_prefetched_jobs ⇒ Object



# File 'lib/delayed/work_queue/parent_process/server.rb', line 192

def unlock_timed_out_prefetched_jobs
  @prefetched_jobs.each do |(worker_config, jobs)|
    next if jobs.empty?
    if jobs.first.locked_at < Time.now.utc - Settings.parent_process[:prefetched_jobs_timeout]
      Delayed::Job.unlock(jobs)
      @prefetched_jobs[worker_config] = []
    end
  end
end
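
For example (timeout value hypothetical): if prefetched_jobs_timeout is 1800 seconds and the first, oldest job in a config's batch was locked 45 minutes ago, the whole batch for that worker_config is unlocked and emptied; batches whose oldest lock is fresher are left untouched. Only the first job needs checking because jobs are appended to each batch in fetch order.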