Class: Delayed::WorkQueue::ParentProcess::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/work_queue/parent_process.rb

Defined Under Namespace

Classes: ClientState

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(listen_socket, parent_pid: nil) ⇒ Server

Returns a new instance of Server.



80
81
82
83
84
# File 'lib/delayed/work_queue/parent_process.rb', line 80

def initialize(listen_socket, parent_pid: nil)
  @listen_socket = listen_socket
  @parent_pid = parent_pid
  @clients = {}
end

Instance Attribute Details

#listen_socketObject (readonly)

Returns the value of attribute listen_socket.



78
79
80
# File 'lib/delayed/work_queue/parent_process.rb', line 78

def listen_socket
  @listen_socket
end

Instance Method Details

#all_workers_idle?Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/delayed/work_queue/parent_process.rb', line 90

def all_workers_idle?
  !@clients.any? { |_, c| c.working }
end

#client_timeoutObject



172
173
174
# File 'lib/delayed/work_queue/parent_process.rb', line 172

def client_timeout
  Timeout.timeout(Settings.parent_process_client_timeout) { yield }
end

#connected_clientsObject



86
87
88
# File 'lib/delayed/work_queue/parent_process.rb', line 86

def connected_clients
  @clients.size
end

#exit?Boolean

Returns:

  • (Boolean)


164
165
166
# File 'lib/delayed/work_queue/parent_process.rb', line 164

def exit?
  parent_exited?
end

#handle_acceptObject

Any error on the listen socket other than WaitReadable will bubble up and terminate the work queue process, to be restarted by the parent daemon.



134
135
136
137
138
139
140
141
# File 'lib/delayed/work_queue/parent_process.rb', line 134

def handle_accept
  client, _addr = @listen_socket.accept_nonblock
  if client
    @clients[client] = ClientState.new(false)
  end
rescue IO::WaitReadable
  # ignore and just try accepting again next time through the loop
end

#handle_read(socket) ⇒ Object



124
125
126
127
128
129
130
# File 'lib/delayed/work_queue/parent_process.rb', line 124

def handle_read(socket)
  if socket == @listen_socket
    handle_accept
  else
    handle_request(socket)
  end
end

#handle_request(socket) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/delayed/work_queue/parent_process.rb', line 143

def handle_request(socket)
  # There is an assumption here that the client will never send a partial
  # request and then leave the socket open. Doing so would leave us hanging
  # here forever. This is only a reasonable assumption because we control
  # the client.
  request = client_timeout { Marshal.load(socket) }
  response = nil
  Delayed::Worker.lifecycle.run_callbacks(:work_queue_pop, self) do
    response = Delayed::Job.get_and_lock_next_available(*request)
    @clients[socket].working = !response.nil?
  end
  client_timeout { Marshal.dump(response, socket) }
rescue SystemCallError, IOError, Timeout::Error
  # this socket went away
  begin
    socket.close
  rescue IOError
  end
  @clients.delete(socket)
end

#parent_exited?Boolean

Returns:

  • (Boolean)


168
169
170
# File 'lib/delayed/work_queue/parent_process.rb', line 168

def parent_exited?
  @parent_pid && @parent_pid != Process.ppid
end

#runObject

run the server queue worker this method does not return, only exits or raises an exception



104
105
106
107
108
109
110
111
112
113
114
# File 'lib/delayed/work_queue/parent_process.rb', line 104

def run
  say "Starting work queue process"

  while !exit?
    run_once
  end

rescue => e
  say "WorkQueue Server died: #{e.inspect}"
  raise
end

#run_onceObject



116
117
118
119
120
121
122
# File 'lib/delayed/work_queue/parent_process.rb', line 116

def run_once
  handles = @clients.keys + [@listen_socket]
  readable, _, _ = IO.select(handles, nil, nil, 1)
  if readable
    readable.each { |s| handle_read(s) }
  end
end

#say(msg, level = :debug) ⇒ Object



94
95
96
97
98
99
100
# File 'lib/delayed/work_queue/parent_process.rb', line 94

def say(msg, level = :debug)
  if defined?(Rails.logger) && Rails.logger
    Rails.logger.send(level, "[#{Process.pid}]Q #{msg}")
  else
    puts(msg)
  end
end