Class: Delayed::WorkQueue::ParentProcess::Server

Inherits: Object
Includes:
Logging
Defined in:
lib/delayed/work_queue/parent_process/server.rb

Defined Under Namespace

Classes: ClientState

Constant Summary collapse

SIGNALS =
%i{INT TERM QUIT}

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

logger, #logger, #say

Constructor Details

#initialize(listen_socket, parent_pid: nil, config: Settings.parent_process) ⇒ Server

Returns a new instance of Server.



10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/delayed/work_queue/parent_process/server.rb', line 10

# Sets up server state for the work queue. The listen socket is created by
# the caller; we only accept on it.
def initialize(listen_socket, parent_pid: nil, config: Settings.parent_process)
  @listen_socket = listen_socket
  @parent_pid = parent_pid
  @clients = {}           # socket => ClientState for every connected worker
  @waiting_clients = {}   # worker_config => [ClientState] of idle workers awaiting jobs
  @prefetched_jobs = {}   # worker_config => [jobs] locked ahead of demand

  @config = config
  @client_timeout = config['server_socket_timeout'] || 10.0 # left for backwards compat

  @exit = false           # flipped by the signal traps installed in #run
  @self_pipe = IO.pipe    # self-pipe trick: traps write here to wake IO.select
end

Instance Attribute Details

#clientsObject (readonly)

Returns the value of attribute clients.



5
6
7
# File 'lib/delayed/work_queue/parent_process/server.rb', line 5

# Reader for the socket => ClientState map of connected workers.
def clients
  @clients
end

#listen_socketObject (readonly)

Returns the value of attribute listen_socket.



5
6
7
# File 'lib/delayed/work_queue/parent_process/server.rb', line 5

# Reader for the socket on which worker connections are accepted.
def listen_socket
  @listen_socket
end

Instance Method Details

#all_workers_idle?Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/delayed/work_queue/parent_process/server.rb', line 28

# True when no connected client is currently marked as working.
def all_workers_idle?
  @clients.none? { |_socket, state| state.working }
end

#check_for_work(forced_latency: nil) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/delayed/work_queue/parent_process/server.rb', line 122

# Hand out jobs to every waiting client: first drain any previously
# prefetched jobs for each worker_config, then fetch-and-lock a fresh batch
# from the database, over-fetching extras under the prefetch owner for
# next time.
#
# forced_latency: when set, passed through to get_and_lock_next_available
# so recently queued jobs are skipped (see #run_once for the rationale).
def check_for_work(forced_latency: nil)
  @waiting_clients.each do |(worker_config, workers)|
    prefetched_jobs = @prefetched_jobs[worker_config] ||= []
    logger.debug("I have #{prefetched_jobs.length} jobs for #{workers.length} waiting workers")
    # Pair up prefetched jobs with waiting workers until either runs out.
    while !prefetched_jobs.empty? && !workers.empty?
      job = prefetched_jobs.shift
      client = workers.shift
      logger.debug("Transferring prefetched job to #{client.name}")
      # Re-assign the DB lock from the prefetch owner to this worker. If we
      # couldn't re-lock it for some reason, put the worker back at the head
      # of the queue and drop the job (someone else must own it now).
      unless job.transfer_lock!(from: prefetch_owner, to: client.name)
        workers.unshift(client)
        next
      end
      client.working = true
      begin
        logger.debug("Sending prefetched job #{job.id} to #{client.name}")
        client_timeout { Marshal.dump(job, client.socket) }
      rescue SystemCallError, IOError, Timeout::Error => ex
        logger.error("Failed to send pre-fetched job to #{client.name}: #{ex.inspect}")
        # The worker never got the job: drop its dead socket and release the
        # lock so another process can pick the job up.
        drop_socket(client.socket)
        Delayed::Job.unlock([job])
      end
    end

    next if workers.empty?

    logger.debug("Fetching new work for #{workers.length} workers")
    jobs_to_send = []

    Delayed::Worker.lifecycle.run_callbacks(:work_queue_pop, self, worker_config) do
      recipients = workers.map(&:name)

      # Ask for one job per waiting worker, plus a prefetch batch locked to
      # prefetch_owner to satisfy future requests without another query.
      response = Delayed::Job.get_and_lock_next_available(
          recipients,
          worker_config[:queue],
          worker_config[:min_priority],
          worker_config[:max_priority],
          prefetch: Settings.fetch_batch_size * (worker_config[:workers] || 1) - recipients.length,
          prefetch_owner: prefetch_owner,
          forced_latency: forced_latency)
      logger.debug("Fetched and locked #{response.values.flatten.size} new jobs for workers (#{response.keys.join(', ')}).")
      response.each do |(worker_name, job)|
        if worker_name == prefetch_owner
          # it's actually an array of all the extra jobs
          logger.debug("Adding prefetched jobs #{job.length} to prefetched array (size: #{prefetched_jobs.count})")
          prefetched_jobs.concat(job)
          next
        end
        client = workers.find { |worker| worker.name == worker_name }
        client.working = true
        jobs_to_send << [client, job]
      end
    end

    # Send outside the lifecycle callback; remove each recipient from the
    # waiting list before attempting delivery.
    jobs_to_send.each do |(client, job)|
      @waiting_clients[worker_config].delete(client)
      begin
        logger.debug("Sending job #{job.id} to #{client.name}")
        client_timeout { Marshal.dump(job, client.socket) }
      rescue SystemCallError, IOError, Timeout::Error => ex
        logger.error("Failed to send job to #{client.name}: #{ex.inspect}")
        drop_socket(client.socket)
        Delayed::Job.unlock([job])
      end
    end
  end
end

#client_timeoutObject



233
234
235
# File 'lib/delayed/work_queue/parent_process/server.rb', line 233

# Run the given block, raising Timeout::Error if it takes longer than the
# configured client socket timeout.
def client_timeout(&block)
  Timeout.timeout(@client_timeout, &block)
end

#connected_clientsObject



24
25
26
# File 'lib/delayed/work_queue/parent_process/server.rb', line 24

# Number of workers currently connected to this server.
def connected_clients
  @clients.count
end

#drop_socket(socket) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/delayed/work_queue/parent_process/server.rb', line 208

# Forget a client whose socket has gone away: close the socket (ignoring
# already-closed errors) and remove the client from both the connected map
# and every waiting queue.
def drop_socket(socket)
  begin
    socket.close
  rescue IOError
    # already closed; nothing to do
  end
  departed = @clients.delete(socket)
  @waiting_clients.each_value { |queue| queue.delete(departed) }
end

#exit?Boolean

Returns:

  • (Boolean)


221
222
223
# File 'lib/delayed/work_queue/parent_process/server.rb', line 221

# True once shutdown has been requested (signal trap sets @exit) or the
# parent process has gone away.
def exit?
  return true if @exit
  parent_exited?
end

#handle_acceptObject

Any error on the listen socket other than WaitReadable will bubble up and terminate the work queue process, to be restarted by the parent daemon.



93
94
95
96
97
98
99
100
101
# File 'lib/delayed/work_queue/parent_process/server.rb', line 93

# Accept a pending worker connection from the listen socket and start
# tracking it (not working yet; its name arrives with its first request).
#
# Any error on the listen socket other than WaitReadable will bubble up and
# terminate the work queue process, to be restarted by the parent daemon.
def handle_accept
  socket, _addr = @listen_socket.accept_nonblock
  @clients[socket] = ClientState.new(false, socket) if socket
rescue IO::WaitReadable
  # Spurious wakeup: IO.select said the listen socket was readable but the
  # connection was gone by the time we called accept. This is a normal race
  # with non-blocking accept, not an error — don't log it as one; just try
  # accepting again next time through the loop.
end

#handle_read(socket) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
# File 'lib/delayed/work_queue/parent_process/server.rb', line 79

# Dispatch a readable handle from IO.select: the listen socket means a new
# connection, the self-pipe means a signal woke us, anything else is a
# client request.
def handle_read(socket)
  case socket
  when @listen_socket
    handle_accept
  when @self_pipe[0]
    # We really don't care about the contents of the pipe, we just need to
    # wake up.
    @self_pipe[0].read_nonblock(11, exception: false)
  else
    handle_request(socket)
  end
end

#handle_request(socket) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/delayed/work_queue/parent_process/server.rb', line 103

# Process one message from a connected worker. A message is a
# Marshal-dumped [worker_name, worker_config] pair meaning "this worker is
# idle and wants a job"; EOF means the client disconnected. Marshal over
# the socket is safe only because both ends are our own code.
def handle_request(socket)
  # There is an assumption here that the client will never send a partial
  # request and then leave the socket open. Doing so would leave us hanging
  # in Marshal.load forever. This is only a reasonable assumption because we
  # control the client.
  client = @clients[socket]
  if socket.eof?
    logger.debug("Client #{client.name} closed connection")
    return drop_socket(socket)
  end
  worker_name, worker_config = Marshal.load(socket)
  client.name = worker_name
  client.working = false
  # Queue this client among the idle workers for its worker_config; jobs
  # are handed out from these queues in #check_for_work.
  (@waiting_clients[worker_config] ||= []) << client
rescue SystemCallError, IOError => ex
  logger.error("Receiving message from client (#{socket}) failed: #{ex.inspect}")
  drop_socket(socket)
end

#parent_exited?Boolean

Returns:

  • (Boolean)


229
230
231
# File 'lib/delayed/work_queue/parent_process/server.rb', line 229

# True when we were given a parent pid and our current parent differs —
# i.e. the original parent died and we were re-parented. Returns nil (still
# falsy) when no parent pid was configured.
def parent_exited?
  Process.ppid != @parent_pid if @parent_pid
end

#prefetch_ownerObject



225
226
227
# File 'lib/delayed/work_queue/parent_process/server.rb', line 225

# Lock-owner string used for jobs fetched ahead of demand; includes the
# hostname so prefetched locks can be traced back to this machine ('X' if
# the hostname can't be determined).
def prefetch_owner
  hostname = begin
    Socket.gethostname
  rescue StandardError
    'X'
  end
  "prefetch:#{hostname}"
end

#runObject

run the server queue worker this method does not return, only exits or raises an exception



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/delayed/work_queue/parent_process/server.rb', line 34

# run the server queue worker
# this method does not return, only exits or raises an exception
def run
  logger.debug "Starting work queue process"

  SIGNALS.each do |sig|
    # We're not doing any aggressive exiting here since we really want
    # prefetched jobs to be unlocked and we're going to wake up the process
    # from the IO.select we're using to wait on clients.
    trap(sig) { @exit = true; @self_pipe[1].write_nonblock('.', exception: false) }
  end

  # Stagger the first purge randomly across a 15-minute window so multiple
  # server processes don't all hit the database at the same moment.
  last_orphaned_prefetched_jobs_purge = Job.db_time_now - rand(15 * 60)
  while !exit?
    run_once
    # Roughly every 15 minutes, run the orphaned-prefetch cleanup
    # (presumably releasing locks left by servers that died uncleanly —
    # see Job.unlock_orphaned_prefetched_jobs).
    if last_orphaned_prefetched_jobs_purge + 15 * 60 < Job.db_time_now
      Job.unlock_orphaned_prefetched_jobs
      last_orphaned_prefetched_jobs_purge = Job.db_time_now
    end
  end

rescue => e
  logger.error "WorkQueue Server died: #{e.inspect}"
  raise
ensure
  # Always release our own prefetched jobs on the way out, whether we exit
  # cleanly or crash.
  unlock_all_prefetched_jobs
end

#run_onceObject



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/delayed/work_queue/parent_process/server.rb', line 60

# One pass of the server loop: wait (with a staggered timeout) for socket
# activity, service whatever is readable, hand out work, then expire stale
# prefetched jobs.
def run_once
  # Watch every client socket plus the listener and the wakeup pipe.
  handles = @clients.keys + [@listen_socket, @self_pipe[0]]
  # if we're currently idle, then force a "latency" to job fetching - don't
  # fetch recently queued jobs, allowing busier workers to fetch them first.
  # if they're not keeping up, the jobs will slip back in time, and suddenly we'll become
  # active and quickly pick up all the jobs we can. The latency is calculated to ensure that
  # an active worker is guaranteed to have attempted to fetch new jobs in the meantime
  forced_latency = Settings.sleep_delay + Settings.sleep_delay_stagger * 2 if all_workers_idle?
  # Randomized timeout so multiple servers don't poll the DB in lockstep.
  timeout = Settings.sleep_delay + (rand * Settings.sleep_delay_stagger)
  readable, _, _ = IO.select(handles, nil, nil, timeout)
  if readable
    readable.each { |s| handle_read(s) }
  end
  Delayed::Worker.lifecycle.run_callbacks(:check_for_work, self) do
    check_for_work(forced_latency: forced_latency)
  end
  unlock_timed_out_prefetched_jobs
end

#unlock_all_prefetched_jobsObject



200
201
202
203
204
205
206
# File 'lib/delayed/work_queue/parent_process/server.rb', line 200

# Release the DB locks on every job we prefetched but never handed to a
# worker, then forget them all. Run from #run's ensure block so jobs don't
# stay locked to a dead prefetch owner.
def unlock_all_prefetched_jobs
  @prefetched_jobs.each_value do |jobs|
    Delayed::Job.unlock(jobs) unless jobs.empty?
  end
  @prefetched_jobs = {}
end

#unlock_timed_out_prefetched_jobsObject



190
191
192
193
194
195
196
197
198
# File 'lib/delayed/work_queue/parent_process/server.rb', line 190

# Release any batch of prefetched jobs that has sat unclaimed past the
# configured timeout, so other processes can pick those jobs up. Only the
# first job's locked_at is inspected — presumably all jobs in a batch were
# locked at the same time (they arrive together in #check_for_work); verify
# before relying on per-job precision.
# NOTE(review): this reads Settings.parent_process directly rather than the
# @config injected in initialize — confirm whether that is intentional.
def unlock_timed_out_prefetched_jobs
  @prefetched_jobs.each do |(worker_config, jobs)|
    next if jobs.empty?
    if jobs.first.locked_at < Time.now.utc - Settings.parent_process[:prefetched_jobs_timeout]
      Delayed::Job.unlock(jobs)
      @prefetched_jobs[worker_config] = []
    end
  end
end