Module: Angael::Worker

Includes:
ProcessHelper
Defined in:
lib/angael/worker.rb

Overview

Usage

include Angael::Worker
def work
  # Do something interesting, without raising an exception.
end

You can also add some optional behavior by defining the following methods:

#after_fork  - This is run once, immediately after the child process is forked
#before_exit - This is run once, immediately before the child process exits
#fork_child  - This actually does the forking. You can override this method
               to wrap the child process in a block. This is useful for
               exception handling. Be sure to actually fork or you may break
               something important.
#log         - If defined, this will be called at various points of interest
               with 1 String as the argument. Log levels are not supported.
#timeout     - Number of seconds to wait for the child process to exit after
               it is sent SIGINT. If you don't define this method, it waits
               60 seconds.
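
As a rough illustration of the optional methods listed above, a worker might look like the sketch below. The class name HeartbeatWorker, the log format, the 10-second timeout, and the rescue-and-exit policy in fork_child are all assumptions made for this example, not part of the library. The `defined?` guard only exists so the sketch loads without the gem; real code would include Angael::Worker unconditionally.

```ruby
# Hypothetical worker class; every name and value here is illustrative.
class HeartbeatWorker
  # Guarded so this sketch can be loaded without the angael gem present.
  include Angael::Worker if defined?(Angael::Worker)

  # Called repeatedly in the child process. Should not raise.
  def work
    sleep 1
  end

  # Runs once in the child, right after the fork.
  def after_fork
    log("child #{Process.pid} ready")
  end

  # Runs once in the child, right before it exits.
  def before_exit
    log("child #{Process.pid} cleaning up")
  end

  # start! assigns the return value of fork_child to @pid, so an override
  # has to really fork and return the child's pid. This version (an
  # assumption, not the library's implementation) wraps the child block in
  # exception handling.
  def fork_child(&block)
    Process.fork do
      begin
        block.call
      rescue StandardError => e
        # SystemExit is not a StandardError, so `exit 0` passes through.
        log("worker crashed: #{e.class}: #{e.message}")
        exit 1
      end
    end
  end

  # Receives one String; log levels are not supported.
  def log(message)
    $stderr.puts("[HeartbeatWorker] #{message}")
  end

  # Seconds to wait after SIGINT before the child is sent SIGKILL.
  def timeout
    10
  end
end
```

Rescuing only StandardError is a deliberate choice in this sketch: the graceful shutdown path calls exit 0, which raises SystemExit, and that should not be reported as a crash.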

Defined Under Namespace

Classes: ChildProcessNotStoppedError

Methods included from ProcessHelper

#exit_status, #send_signal

Instance Attribute Details

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



# File 'lib/angael/worker.rb', line 25

def pid
  @pid
end

Instance Method Details

#inspect ⇒ Object



# File 'lib/angael/worker.rb', line 138

def inspect
  "#<JobWorker:#{object_id} @pid=#{pid}>"
end

#restart! ⇒ Object

This only exists for the sake of testing. I need a way to stub the restart! but not the original start! Note: this method is not tested directly. Users of this library should not call this method or depend on its existence or behavior.



# File 'lib/angael/worker.rb', line 74

def restart!
  start!
end

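#start! ⇒ Object

Loops forever in a forked child process, calling #work (for example, to take jobs off a queue). SIGINT or SIGTERM stops the child once the #work call in progress has returned, so any job already taken from the queue is processed first. If #before_exit is defined, it runs just before the child exits.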



# File 'lib/angael/worker.rb', line 29

def start!
  @stopping = false

  @pid = fork_child do
    __info("Started")

    @pid = $$

    if respond_to?(:after_fork)
      __debug("Running after fork callback")
      after_fork
      __debug("Finished running after fork callback")
    end

    @interrupted = false
    trap("INT") do
      __info("SIGINT Received")
      @interrupted = true
    end
    trap("TERM") do
      __info("SIGTERM Received")
      @interrupted = true
    end

    loop do
      if @interrupted
        if respond_to?(:before_exit)
          __debug("Running before exit callback")
          before_exit
          __debug("Finished running before exit callback")
        end

        __info("Child process exiting gracefully")
        exit 0
      end
      work
    end
  end
end
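
The shutdown approach used by start! (signal handlers that only set a flag, with the loop checking the flag between units of work) can be sketched in isolation as follows. This is a minimal standalone sketch, not the library's code: the method name run_until_interrupted is hypothetical, and it uses a local variable where start! uses @interrupted.

```ruby
# Hypothetical helper: forks a child that runs the given block repeatedly
# until SIGINT or SIGTERM arrives, then exits with status 0.
# Returns the child's pid to the parent.
def run_until_interrupted
  Process.fork do
    interrupted = false

    # The handlers do as little as possible: they only record the request.
    %w[INT TERM].each do |signal|
      trap(signal) { interrupted = true }
    end

    # The flag is checked between iterations, so a unit of work that has
    # already started is allowed to finish.
    until interrupted
      yield
    end

    exit 0
  end
end
```

Because the handler never exits the process itself, the child only stops at a point where no work is in progress.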

#started? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/angael/worker.rb', line 127

def started?
  !!(pid && pid_running?)
end

#stop_with_wait ⇒ Object

Keeps sending SIGINT, once per second, until the child process exits. If #timeout seconds pass, it sends one SIGKILL. If that also fails, it raises ChildProcessNotStoppedError. Returns false without waiting if #stop_without_wait returns false.



# File 'lib/angael/worker.rb', line 99

def stop_with_wait
  return false unless stop_without_wait

  __debug("Waiting for child process with pid #{pid} to stop.")

  counter = 0

  while pid_running? && counter < timeout
    sleep 1
    counter += 1
    __info("Sending SIGINT to child process with pid #{pid}. Attempt Count: #{counter}.")
    send_signal('INT', pid)
  end

  if pid_running?
    __warn("Child process with pid #{pid} did not stop within #{timeout} seconds of SIGINT. Sending SIGKILL to child process.")
    send_signal('KILL', pid)
    sleep 1
  end

  if pid_running?
    # SIGKILL didn't work.
    msg = "Unable to kill child process with PID: #{pid}"
    __error(msg)
    raise ChildProcessNotStoppedError, msg
  end
end
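
The escalation above relies on pid_running? and send_signal, whose implementations are not shown on this page. The sketch below illustrates the same SIGINT-then-SIGKILL pattern with plain Process calls. The helper names (signal_child, child_running?, stop_with_escalation) and the :stopped / :killed return values are assumptions for illustration and differ from the library, which raises ChildProcessNotStoppedError on failure.

```ruby
# Hypothetical helper: sends a signal, ignoring a child that is already gone.
def signal_child(signal, pid)
  Process.kill(signal, pid)
rescue Errno::ESRCH
  nil
end

# Hypothetical helper: true while the child has not exited. A non-blocking
# waitpid returns nil if the child is still running and reaps it otherwise;
# once reaped, later calls raise ECHILD, which also means "not running".
def child_running?(pid)
  Process.waitpid(pid, Process::WNOHANG).nil?
rescue Errno::ECHILD
  false
end

# Sends SIGINT once per second for up to `timeout` seconds, then one SIGKILL.
def stop_with_escalation(pid, timeout)
  signal_child('INT', pid)

  attempts = 0
  while child_running?(pid) && attempts < timeout
    sleep 1
    attempts += 1
    signal_child('INT', pid)
  end
  return :stopped unless child_running?(pid)

  signal_child('KILL', pid)
  sleep 1
  raise "Unable to kill child process with PID: #{pid}" if child_running?(pid)

  :killed
end
```

Resending SIGINT is harmless for a child that only sets a flag in its handler, and the single SIGKILL acts as a last resort for a child that ignores SIGINT or is stuck in a long unit of work.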

#stop_without_wait ⇒ Object

Returns true if SIGINT was sent to the child process, even if the child process does not exist. Returns false, without sending a signal, if started? is false. Sets stopping? to true.



# File 'lib/angael/worker.rb', line 82

def stop_without_wait
  unless started?
    __warn("Tried to stop worker with PID #{pid} but it is not started")
    return false
  end

  # This informs the Manager (through #stopping?) that we intentionally
  # stopped the child process.
  @stopping = true

  __debug("Sending SIGINT to child process with pid #{pid}.")
  send_signal('INT', pid)
  true
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/angael/worker.rb', line 130

def stopped?
  !started?
end

#stopping? ⇒ Boolean

TODO: test this

Returns:

  • (Boolean)


# File 'lib/angael/worker.rb', line 134

def stopping?
  @stopping
end