- Defined in:
ParentProcess is a WorkQueue implementation that spawns a separate worker process for querying the queue. Each Worker child process sends requests to the ParentProcess via IPC and receives responses. Centralizing queue queries this way cuts down on database queries and lock contention, and leaves room for other centralized logic, such as notifications when all workers are idle.
The IPC implementation uses Unix stream sockets and Ruby's built-in Marshal functionality. The ParentProcess creates a Unix socket on the filesystem in the tmp directory, so that if a worker process dies and is restarted it can reconnect to the socket.
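As a rough illustration of the mechanism described above, the following sketch passes one marshaled request and one marshaled response over a Unix stream socket on the filesystem. It is not the library's actual wire protocol: the socket path, the message hashes, and the echo behavior are all assumptions made up for the demo.

```ruby
require 'socket'
require 'tmpdir'
require 'fileutils'

# Hypothetical demo path; the library derives its own path from config.
dir = Dir.mktmpdir('wq')
socket_path = File.join(dir, 'demo.sock')

# "Parent" side: listen on a Unix stream socket that lives on the filesystem.
listener = Socket.unix_server_socket(socket_path)

server_thread = Thread.new do
  conn, _addrinfo = listener.accept
  request = Marshal.load(conn)             # read one marshaled request
  Marshal.dump({ echoed: request }, conn)  # reply with one marshaled response
  conn.close
end

# "Worker" side: connect by path, send a request, wait for the reply.
client = UNIXSocket.new(socket_path)
Marshal.dump({ op: :get_job, worker: 'worker-1' }, client)
response = Marshal.load(client)

client.close
server_thread.join
listener.close
FileUtils.remove_entry(dir)
```

Because the socket is addressed by a filesystem path rather than an inherited file descriptor, a restarted worker only needs the path to connect again.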
While Unix and IP sockets are API-compatible, we take a lot of shortcuts because we know it's just a local Unix socket. If we ever wanted to swap this out for a TCP/IP socket and have the WorkQueue running on another host, we'd want to be a lot more robust about partial reads/writes and timeouts.
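To give a sense of what "more robust" might involve, here is a minimal sketch of length-prefixed framing with a read timeout. The helper names (`write_frame`, `read_exactly`, `read_frame`) and the 4-byte big-endian header are hypothetical choices for illustration; nothing here is part of the library, and write timeouts are left out for brevity.

```ruby
require 'socket'
require 'timeout'

# Hypothetical helper: prefix each marshaled payload with its byte length.
# On a blocking socket, IO#write keeps writing until every byte is sent.
def write_frame(io, object)
  payload = Marshal.dump(object)
  io.write([payload.bytesize].pack('N') + payload)
end

# Hypothetical helper: keep reading until exactly `length` bytes arrive.
# The timeout applies to each wait for data, not to the whole frame.
def read_exactly(io, length, timeout)
  buffer = String.new
  while buffer.bytesize < length
    ready = IO.select([io], nil, nil, timeout)
    raise Timeout::Error, 'read timed out' if ready.nil?

    chunk = io.read_nonblock(length - buffer.bytesize, exception: false)
    next if chunk == :wait_readable
    raise EOFError, 'connection closed mid-frame' if chunk.nil?

    buffer << chunk
  end
  buffer
end

# Hypothetical helper: read the length header, then the payload.
def read_frame(io, timeout: 5)
  length = read_exactly(io, 4, timeout).unpack1('N')
  Marshal.load(read_exactly(io, length, timeout))
end

# Demo over a connected socket pair standing in for a network link.
left, right = UNIXSocket.pair
write_frame(left, { op: :get_job, worker: 'worker-1' })
message = read_frame(right)
```

With an explicit length header, the reader knows how many bytes to wait for and can fail cleanly on a stalled or closed connection instead of blocking indefinitely inside `Marshal.load`.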
Defined Under Namespace
Instance Attribute Summary
#server_address ⇒ Object
Returns the value of attribute server_address.
Instance Method Summary
- #client ⇒ Object
- #initialize(config = Settings.parent_process) ⇒ ParentProcess
  A new instance of ParentProcess.
- #server(parent_pid: nil) ⇒ Object
#initialize(config = Settings.parent_process) ⇒ ParentProcess
Returns a new instance of ParentProcess.
    # File 'lib/delayed/work_queue/parent_process.rb', line 37

    def initialize(config = Settings.parent_process)
      @config = config
      @server_address = generate_socket_path(config['server_address'])
    end
Instance Attribute Details
#server_address ⇒ Object (readonly)
Returns the value of attribute server_address.

    # File 'lib/delayed/work_queue/parent_process.rb', line 32

    def server_address
      @server_address
    end
Instance Method Details
#client ⇒ Object
    # File 'lib/delayed/work_queue/parent_process.rb', line 49

    def client
      Client.new(Addrinfo.unix(@server_address), config: @config)
    end
#server(parent_pid: nil) ⇒ Object
    # File 'lib/delayed/work_queue/parent_process.rb', line 42

    def server(parent_pid: nil)
      # The unix_server_socket method takes care of cleaning up any existing
      # socket for us if the work queue process dies and is restarted.
      listen_socket = Socket.unix_server_socket(@server_address)
      Server.new(listen_socket, parent_pid: parent_pid, config: @config)
    end
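The cleanup behavior mentioned in the comment above can be seen in isolation with a small sketch. It uses only Ruby's standard `Socket.unix_server_socket`; the temporary path and variable names are assumptions for the demo and are unrelated to the library's own socket path.

```ruby
require 'socket'
require 'tmpdir'
require 'fileutils'

# Hypothetical demo path.
dir = Dir.mktmpdir('wq')
socket_path = File.join(dir, 'demo.sock')

# First "work queue process": bind, then go away without unlinking the file.
first = Socket.unix_server_socket(socket_path)
first.close
stale_file_left_behind = File.socket?(socket_path)

# Restarted process: binding the same path succeeds because the stale
# socket file (owned by the current user) is removed before binding.
second = Socket.unix_server_socket(socket_path)

# A worker can reconnect using the same path.
client = UNIXSocket.new(socket_path)
conn, _addrinfo = second.accept
reconnected = !conn.nil?

[client, conn, second].each(&:close)
FileUtils.remove_entry(dir)
```

Binding directly with a lower-level call would typically fail with `Errno::EADDRINUSE` while the stale file is present, which is why the convenience method is a good fit for a process that may be restarted.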