Class: Resqued::ListenerProxy

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/resqued/listener_proxy.rb

Overview

Controls a listener process from the master process.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(state) ⇒ ListenerProxy

Public.



13
14
15
# File 'lib/resqued/listener_proxy.rb', line 13

def initialize(state)
  @state = state
end

Instance Attribute Details

#stateObject (readonly)

Returns the value of attribute state.



17
18
19
# File 'lib/resqued/listener_proxy.rb', line 17

def state
  @state
end

Instance Method Details

#disposeObject

Public: wrap up all the things, this object is going home.



20
21
22
23
24
25
# File 'lib/resqued/listener_proxy.rb', line 20

def dispose
  if @state.master_socket
    @state.master_socket.close
    @state.master_socket = nil
  end
end

#kill(signal) ⇒ Object

Public: Stop the listener process.



58
59
60
61
# File 'lib/resqued/listener_proxy.rb', line 58

def kill(signal)
  log "kill -#{signal} #{pid}"
  Process.kill(signal.to_s, pid)
end

#pidObject

Public: The pid of the running listener process.



33
34
35
# File 'lib/resqued/listener_proxy.rb', line 33

def pid
  @state.pid
end

#read_pipeObject

Public: An IO to select on to check if there is incoming data available.



28
29
30
# File 'lib/resqued/listener_proxy.rb', line 28

def read_pipe
  @state.master_socket
end

#read_worker_status(options) ⇒ Object

Public: Check for updates on running worker information.



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/resqued/listener_proxy.rb', line 74

def read_worker_status(options)
  on_activity = options[:on_activity]
  until @state.master_socket.nil?
    IO.select([@state.master_socket], nil, nil, 0) or return
    case line = @state.master_socket.readline
    when /^\+(\d+),(.*)$/
      worker_pids[$1] = $2
      on_activity&.worker_started($1)
    when /^-(\d+)$/
      worker_pids.delete($1)
      on_activity&.worker_finished($1)
    when /^RUNNING/
      on_activity&.listener_running(self)
    when ""
      break
    else
      log "Malformed data from listener: #{line.inspect}"
    end
  end
rescue EOFError, Errno::ECONNRESET
  @state.master_socket.close
  @state.master_socket = nil
end

#runObject

Public: Start the listener process.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/resqued/listener_proxy.rb', line 38

def run
  return if pid

  listener_socket, master_socket = UNIXSocket.pair
  if @state.pid = fork
    # master
    listener_socket.close
    master_socket.close_on_exec = true
    log "Started listener #{@state.pid}"
    @state.master_socket = master_socket
  else
    # listener
    master_socket.close
    Master::TRAPS.each { |signal| trap(signal, "DEFAULT") rescue nil }
    Listener.new(@state.options.merge(socket: listener_socket)).exec
    exit
  end
end

#running_workersObject

Public: Get the list of workers running from this listener.



64
65
66
# File 'lib/resqued/listener_proxy.rb', line 64

def running_workers
  worker_pids.map { |pid, queue_key| { pid: pid, queue_key: queue_key } }
end

#worker_finished(pid) ⇒ Object

Public: Tell the listener process that a worker finished.



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/resqued/listener_proxy.rb', line 99

def worker_finished(pid)
  return if @state.master_socket.nil?

  @state.master_socket.write_nonblock("#{pid}\n")
rescue IO::WaitWritable
  log "Couldn't tell #{@state.pid} that #{pid} exited!"
  # Ignore it, maybe the next time it'll work.
rescue Errno::EPIPE
  @state.master_socket.close
  @state.master_socket = nil
end

#worker_pidsObject

Private: Map worker pids to queue names



69
70
71
# File 'lib/resqued/listener_proxy.rb', line 69

def worker_pids
  @state.worker_pids ||= {}
end