Class: Delayed::WorkQueue::ParentProcess::Server
- Inherits:
-
Object
- Object
- Delayed::WorkQueue::ParentProcess::Server
- Defined in:
- lib/delayed/work_queue/parent_process.rb
Defined Under Namespace
Classes: ClientState
Instance Attribute Summary collapse
-
#listen_socket ⇒ Object
readonly
Returns the value of attribute listen_socket.
Instance Method Summary collapse
- #all_workers_idle? ⇒ Boolean
- #client_timeout ⇒ Object
- #connected_clients ⇒ Object
- #exit? ⇒ Boolean
-
#handle_accept ⇒ Object
Any error on the listen socket other than WaitReadable will bubble up and terminate the work queue process, to be restarted by the parent daemon.
- #handle_read(socket) ⇒ Object
- #handle_request(socket) ⇒ Object
-
#initialize(listen_socket, parent_pid: nil) ⇒ Server
constructor
A new instance of Server.
- #parent_exited? ⇒ Boolean
-
#run ⇒ Object
run the server queue worker this method does not return, only exits or raises an exception.
- #run_once ⇒ Object
- #say(msg, level = :debug) ⇒ Object
Constructor Details
#initialize(listen_socket, parent_pid: nil) ⇒ Server
Returns a new instance of Server.
80 81 82 83 84 |
# File 'lib/delayed/work_queue/parent_process.rb', line 80 def initialize(listen_socket, parent_pid: nil) @listen_socket = listen_socket @parent_pid = parent_pid @clients = {} end |
Instance Attribute Details
#listen_socket ⇒ Object (readonly)
Returns the value of attribute listen_socket.
78 79 80 |
# File 'lib/delayed/work_queue/parent_process.rb', line 78 def listen_socket @listen_socket end |
Instance Method Details
#all_workers_idle? ⇒ Boolean
90 91 92 |
# File 'lib/delayed/work_queue/parent_process.rb', line 90 def all_workers_idle? !@clients.any? { |_, c| c.working } end |
#client_timeout ⇒ Object
172 173 174 |
# File 'lib/delayed/work_queue/parent_process.rb', line 172 def client_timeout Timeout.timeout(Settings.parent_process_client_timeout) { yield } end |
#connected_clients ⇒ Object
86 87 88 |
# File 'lib/delayed/work_queue/parent_process.rb', line 86 def connected_clients @clients.size end |
#exit? ⇒ Boolean
164 165 166 |
# File 'lib/delayed/work_queue/parent_process.rb', line 164 def exit? parent_exited? end |
#handle_accept ⇒ Object
Any error on the listen socket other than WaitReadable will bubble up and terminate the work queue process, to be restarted by the parent daemon.
134 135 136 137 138 139 140 141 |
# File 'lib/delayed/work_queue/parent_process.rb', line 134 def handle_accept client, _addr = @listen_socket.accept_nonblock if client @clients[client] = ClientState.new(false) end rescue IO::WaitReadable # ignore and just try accepting again next time through the loop end |
#handle_read(socket) ⇒ Object
124 125 126 127 128 129 130 |
# File 'lib/delayed/work_queue/parent_process.rb', line 124 def handle_read(socket) if socket == @listen_socket handle_accept else handle_request(socket) end end |
#handle_request(socket) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/delayed/work_queue/parent_process.rb', line 143 def handle_request(socket) # There is an assumption here that the client will never send a partial # request and then leave the socket open. Doing so would leave us hanging # here forever. This is only a reasonable assumption because we control # the client. request = client_timeout { Marshal.load(socket) } response = nil Delayed::Worker.lifecycle.run_callbacks(:work_queue_pop, self) do response = Delayed::Job.get_and_lock_next_available(*request) @clients[socket].working = !response.nil? end client_timeout { Marshal.dump(response, socket) } rescue SystemCallError, IOError, Timeout::Error # this socket went away begin socket.close rescue IOError end @clients.delete(socket) end |
#parent_exited? ⇒ Boolean
168 169 170 |
# File 'lib/delayed/work_queue/parent_process.rb', line 168 def parent_exited? @parent_pid && @parent_pid != Process.ppid end |
#run ⇒ Object
run the server queue worker this method does not return, only exits or raises an exception
104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/delayed/work_queue/parent_process.rb', line 104 def run say "Starting work queue process" while !exit? run_once end rescue => e say "WorkQueue Server died: #{e.inspect}" raise end |
#run_once ⇒ Object
116 117 118 119 120 121 122 |
# File 'lib/delayed/work_queue/parent_process.rb', line 116 def run_once handles = @clients.keys + [@listen_socket] readable, _, _ = IO.select(handles, nil, nil, 1) if readable readable.each { |s| handle_read(s) } end end |
#say(msg, level = :debug) ⇒ Object
94 95 96 97 98 99 100 |
# File 'lib/delayed/work_queue/parent_process.rb', line 94 def say(msg, level = :debug) if defined?(Rails.logger) && Rails.logger Rails.logger.send(level, "[#{Process.pid}]Q #{msg}") else puts(msg) end end |