Class: Delayed::WorkQueue::ParentProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/work_queue/parent_process.rb,
lib/delayed/work_queue/parent_process/client.rb,
lib/delayed/work_queue/parent_process/server.rb

Overview

ParentProcess is a WorkQueue implementation that spawns a separate worker process for querying the queue. Each Worker child process sends requests to the ParentProcess via IPC, and receives responses. This centralized queue querying cuts down on db queries and lock contention, and allows the possibility for other centralized logic such as notifications when all workers are idle.

The IPC implementation uses Unix stream sockets and Ruby’s built-in Marshal functionality. The ParentProcess creates a Unix socket on the filesystem in the tmp directory, so that if a worker process dies and is restarted it can reconnect to the socket.

While Unix and IP sockets are API compatible, we take a lot of shortcuts because we know it’s just a local Unix socket. If we ever wanted to swap this out for a TCP/IP socket and have the WorkQueue running on another host, we’d want to be a lot more robust about partial reads/writes and timeouts.

Defined Under Namespace

Classes: Client, ProtocolError, Server

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = Settings.parent_process) ⇒ ParentProcess

Returns a new instance of ParentProcess.



37
38
39
40
# File 'lib/delayed/work_queue/parent_process.rb', line 37

def initialize(config = Settings.parent_process)
  @config = config
  @server_address = generate_socket_path(config["server_address"])
end

Instance Attribute Details

#server_addressObject (readonly)

Returns the value of attribute server_address.



32
33
34
# File 'lib/delayed/work_queue/parent_process.rb', line 32

def server_address
  @server_address
end

Instance Method Details

#clientObject



49
50
51
# File 'lib/delayed/work_queue/parent_process.rb', line 49

def client
  Client.new(Addrinfo.unix(@server_address), config: @config)
end

#server(parent_pid: nil) ⇒ Object



42
43
44
45
46
47
# File 'lib/delayed/work_queue/parent_process.rb', line 42

def server(parent_pid: nil)
  # The unix_server_socket method takes care of cleaning up any existing
  # socket for us if the work queue process dies and is restarted.
  listen_socket = Socket.unix_server_socket(@server_address)
  Server.new(listen_socket, parent_pid: parent_pid, config: @config)
end