Class: Oni::Daemon
Overview
The Daemon class takes care of retrieving work to be processed, scheduling it and dispatching it to a mapper and worker. In essence a Daemon instance can be seen as a controller when compared with typical MVC frameworks.
This daemon starts a number of threads (5 by default) that will each perform work on their own using the corresponding mapper and worker class.
Constant Summary
- DEFAULT_WORKER_AMOUNT = 1
  The default number of workers to start.
- DEFAULT_THREAD_AMOUNT = 5
  The default number of threads to start.
- DEFAULT_WORKER_TIMEOUT = nil
  The default worker timeout in seconds; nil disables the timeout.
Instance Attribute Summary
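- #daemon_workers ⇒ Object (readonly)
  Returns the value of attribute daemon_workers, a Hash that maps a process ID to the threads spawned in that process.
- #workers ⇒ Fixnum (readonly)
  Returns the number of workers to use.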
Instance Method Summary
- #complete(message, output) ⇒ Object
  Called when a job has been completed. By default this method is a no-op.
- #create_mapper ⇒ Oni::Mapper
  Creates a new instance of the mapper class set in the :mapper option.
- #error(error) ⇒ Object
  Called whenever an error is raised in the daemon, mapper or worker.
- #initialize ⇒ Daemon (constructor)
  Creates a new instance of the class and calls #after_initialize if it is defined.
- #process(message) ⇒ Object
  Processes the given message.
- #receive ⇒ Object
  Receives a message. By default this method raises an error.
- #run_thread ⇒ Object
  The main code to execute in individual threads.
- #run_worker(message) ⇒ Mixed
  Maps the input, runs the worker and then maps the output into something that the daemon can understand.
- #spawn_thread ⇒ Thread
  Spawns a new thread that waits for daemon input.
- #spawn_worker(i = nil, &block) ⇒ Thread
  Spawns a new thread that forks a worker process and restarts it whenever it exits.
- #standard_worker ⇒ Object
  Spawns the configured number of threads in the current process and waits for them to finish.
- #start ⇒ Object
  Starts the daemon and waits for all threads to finish execution.
- #stop ⇒ Object
  Terminates all the threads and clears up the list.
- #threads ⇒ Fixnum
  Returns the number of threads to use.
- #worker_timeout ⇒ Fixnum
  Returns the worker timeout in seconds, or nil if no timeout is set.
Methods included from Configurable
included, #option, #require_option!
Constructor Details
#initialize ⇒ Daemon
Creates a new instance of the class and calls #after_initialize if it is defined.
    # File 'lib/oni/daemon.rb', line 43
    def initialize
      @daemon_workers = Hash.new{ |h, k| h[k] = [] }

      after_initialize if respond_to?(:after_initialize)
    end
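The hook can be illustrated with a small sketch. SketchDaemon below is a stand-in that copies only the constructor shown above so that the snippet runs without the gem; QueueDaemon and its queue attribute are hypothetical names chosen for the illustration and are not part of Oni.

```ruby
# Stand-in for Oni::Daemon that copies only the constructor shown above,
# so the sketch runs without the gem installed (assumption: nothing else
# from the real class is needed to show the hook).
class SketchDaemon
  attr_reader :daemon_workers

  def initialize
    @daemon_workers = Hash.new { |hash, key| hash[key] = [] }

    after_initialize if respond_to?(:after_initialize)
  end
end

# Hypothetical subclass: the hook prepares state the daemon needs later.
class QueueDaemon < SketchDaemon
  attr_reader :queue

  def after_initialize
    @queue = Queue.new
  end
end

daemon = QueueDaemon.new
```

Because the constructor checks respond_to?, a subclass that does not define the hook is still constructed normally.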
Instance Attribute Details
#daemon_workers ⇒ Object (readonly)
Returns the value of attribute daemon_workers.
    # File 'lib/oni/daemon.rb', line 16
    def daemon_workers
      @daemon_workers
    end
#workers ⇒ Fixnum (readonly)
Returns the number of workers to use.
    # File 'lib/oni/daemon.rb'
    def workers
      option :workers, DEFAULT_WORKER_AMOUNT
    end
Instance Method Details
#complete(message, output) ⇒ Object
Called when a job has been completed. By default this method is a no-op. This method is passed 2 arguments:
1. The raw input message (e.g. an AWS SQS message).
2. The output of the worker (remapped by the mapper).
    # File 'lib/oni/daemon.rb', line 155
    def complete(message, output)
    end
#create_mapper ⇒ Oni::Mapper
Creates a new instance of the mapper class set in the :mapper option. Raises an ArgumentError if no mapper has been set.
    # File 'lib/oni/daemon.rb', line 174
    def create_mapper
      unless option(:mapper)
        raise ArgumentError, 'No mapper has been set in the `:mapper` option'
      end

      return option(:mapper).new
    end
#error(error) ⇒ Object
Called whenever an error is raised in the daemon, mapper or worker. By default this method just re-raises the error.
    # File 'lib/oni/daemon.rb', line 164
    def error(error)
      raise error
    end
#process(message) ⇒ Object
Processes the given message. Upon completion the #complete method is called and passed the resulting output.
    # File 'lib/oni/daemon.rb', line 112
    def process(message)
      output = run_worker(message)

      complete(message, output)
    end
#receive ⇒ Object
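Receives a message. By default this method raises a NotImplementedError, so subclasses must implement it. Since #run_thread passes a block to this method, an implementation is expected to yield each message it receives.
    # File 'lib/oni/daemon.rb', line 141
    def receive
      raise NotImplementedError, 'You must manually implement #receive'
    end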
#run_thread ⇒ Object
The main code to execute in individual threads.
If an error occurs in the receive method or while processing a job, the error handler is executed and the process is retried. It is the responsibility of the error method to determine whether the process should fail only once (and fail hard) or continue running.
    # File 'lib/oni/daemon.rb', line 225
    def run_thread
      receive do |message|
        process message
      end
    rescue => error
      error(error)

      retry
    end
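The interplay between receive, error and retry can be sketched without the gem. RetrySketch below is a hypothetical stand-in, not part of Oni: its run_thread has the same shape as the method above, its receive takes messages from an in-memory list (an assumption made to keep the example finite), and its error handler records errors instead of re-raising them.

```ruby
# Hypothetical stand-in that mirrors the receive/process/error/retry flow.
class RetrySketch
  attr_reader :errors, :done

  def initialize(messages)
    @messages = messages.dup
    @errors   = []
    @done     = []
  end

  # Yields each pending message. A message is removed before it is yielded,
  # so a failing message is not seen again after the retry.
  def receive
    while (message = @messages.shift)
      yield message
    end
  end

  def process(message)
    raise ArgumentError, "bad message: #{message}" if message == :bad

    @done << message
  end

  # Records the error instead of re-raising it, so the loop keeps running.
  def error(error)
    @errors << error
  end

  # Same shape as Oni::Daemon#run_thread.
  def run_thread
    receive do |message|
      process message
    end
  rescue => error
    error(error)

    retry
  end
end

sketch = RetrySketch.new([:a, :bad, :b])
sketch.run_thread
```

With the default handler, which re-raises, the error would propagate out of run_thread and the retry would never be reached. A handler that swallows the error, as in this sketch, lets the thread continue with the next message.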
#run_worker(message) ⇒ Mixed
Maps the input, runs the worker and then maps the output into something that the daemon can understand.
    # File 'lib/oni/daemon.rb', line 125
    def run_worker(message)
      mapper = create_mapper
      input  = mapper.map_input(message)
      worker = option(:worker).new(*input)
      output = Timeout.timeout worker_timeout do
        worker.process
      end

      mapper.map_output output
    end
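A minimal sketch of this map, work, map sequence follows. SketchMapper, SketchWorker and run_worker_sketch are hypothetical names used only for illustration; the only interface assumed from the source above is that a mapper responds to map_input and map_output and that a worker is built from the mapped input and responds to process.

```ruby
require 'timeout'

# Hypothetical mapper: converts the raw message to worker arguments and
# wraps the worker output in a Hash.
class SketchMapper
  def map_input(message)
    [message.fetch('number')]
  end

  def map_output(output)
    { 'result' => output }
  end
end

# Hypothetical worker: receives the mapped input through its constructor.
class SketchWorker
  def initialize(number)
    @number = number
  end

  def process
    @number * 2
  end
end

# Mirrors the steps of run_worker, with the classes hard-coded instead of
# being read from the :mapper and :worker options.
def run_worker_sketch(message, timeout = nil)
  mapper = SketchMapper.new
  input  = mapper.map_input(message)
  worker = SketchWorker.new(*input)
  output = Timeout.timeout(timeout) { worker.process }

  mapper.map_output(output)
end

result = run_worker_sketch({ 'number' => 21 })
```

Because the mapped input is splatted into the worker constructor, map_input should return an Array whose elements match the constructor's parameters.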
#spawn_thread ⇒ Thread
Spawns a new thread that waits for daemon input.
    # File 'lib/oni/daemon.rb', line 211
    def spawn_thread
      Thread.new{ run_thread }.tap do |t|
        t.abort_on_exception = true
      end
    end
#spawn_worker(i = nil, &block) ⇒ Thread
Spawns a new thread that forks a worker process and waits for it to exit. The fork is repeated in a loop, so a worker that dies (for example because it ran out of memory) is restarted. If a block is given it is run in the forked process; otherwise #standard_worker is run.
    # File 'lib/oni/daemon.rb', line 187
    def spawn_worker i = nil, &block
      Thread.new do
        loop do # keep restarting for OOM and other cases
          pid = fork do
            Process.setproctitle "#{$0}: worker #{i}" if i
            if block then yield
            else standard_worker end
          end
          Process.wait pid
        end
      end
    end
#standard_worker ⇒ Object
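Spawns the configured number of threads in the current process, registers them in #daemon_workers under the current process ID and waits for them to finish.
    # File 'lib/oni/daemon.rb', line 200
    def standard_worker
      Array.new(threads).map do
        spawn_thread.tap{ |t| daemon_workers[Process.pid] << t }
      end.each(&:join)
    end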
#start ⇒ Object
Starts the daemon and waits for all threads to finish execution. This method is blocking since it will wait for all threads to finish.
If the current class has a before_start method defined it's called before starting the daemon.
    # File 'lib/oni/daemon.rb', line 56
    def start
      before_start if respond_to? :before_start

      wthreads = if threads <= 1 then [run_thread]
      elsif workers <= 1 then standard_worker
      else wthreads = Array.new(workers).map{ |i| spawn_worker i }
      end

      after_start if respond_to? :after_start
      %i[INT TERM].each{ |sig| trap(sig){ stop } }

      wthreads.each(&:join) if workers > 1
    rescue => error
      error(error)
    end
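Which branch #start takes depends on the threads and workers settings. The decision can be sketched as below; start_mode is a hypothetical helper, not part of Oni, and the symbols it returns are labels invented for this illustration.

```ruby
# Hypothetical helper that mirrors the branch order in #start and returns
# a label for the mode that would be chosen.
def start_mode(threads:, workers:)
  if threads <= 1
    :single_thread                # run_thread is called directly
  elsif workers <= 1
    :threads_in_current_process   # standard_worker spawns the threads
  else
    :forked_workers               # spawn_worker is called once per worker
  end
end

default_mode = start_mode(threads: 5, workers: 1)
```

Note that the threads check comes first, so a single-threaded configuration never forks, whatever the workers setting is.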
#stop ⇒ Object
Terminates all the threads and clears up the list. Note that calling this method acts much like sending a SIGKILL signal to a process: threads will be shut down immediately.
    # File 'lib/oni/daemon.rb', line 77
    def stop
      daemon_workers.each do |pid, worker_threads|
        worker_threads.each(&:kill)
        worker_threads.clear
      end
    end
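The effect of stopping can be sketched with plain threads. The snippet below rebuilds the daemon_workers Hash from the constructor, registers two sleeping threads (stand-ins for worker threads, an assumption of this example) and then applies the same kill-and-clear loop.

```ruby
# Same structure as @daemon_workers: a process ID mapped to its threads.
daemon_workers = Hash.new { |hash, pid| hash[pid] = [] }

# Two threads that would otherwise sleep forever.
2.times do
  daemon_workers[Process.pid] << Thread.new { sleep }
end

# Keep references so the threads can be inspected after the list is cleared.
threads = daemon_workers[Process.pid].dup

# Same loop as in #stop: kill every thread, then empty the list.
daemon_workers.each do |_pid, worker_threads|
  worker_threads.each(&:kill)
  worker_threads.clear
end

# Wait until the killed threads have actually terminated.
threads.each(&:join)
```

The threads are killed wherever they happen to be in their work, which is why the description compares this to SIGKILL: a job that is halfway through processing gets no chance to finish.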
#threads ⇒ Fixnum
Returns the amount of threads to use.
    # File 'lib/oni/daemon.rb', line 93
    def threads
      option :threads, DEFAULT_THREAD_AMOUNT
    end
#worker_timeout ⇒ Fixnum
Returns the timeout, in seconds, applied to each worker run, or nil if no timeout is set.
    # File 'lib/oni/daemon.rb', line 102
    def worker_timeout
      option :worker_timeout, DEFAULT_WORKER_TIMEOUT
    end
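Since the value is handed to Timeout.timeout in #run_worker, its behaviour follows Ruby's standard library: nil runs the block without a limit, and a number raises Timeout::Error once that many seconds have passed. A small sketch, with run_with_timeout as a hypothetical helper that is not part of Oni:

```ruby
require 'timeout'

# Hypothetical helper: runs the block under the given timeout and returns
# :timed_out instead of raising when the limit is exceeded.
def run_with_timeout(seconds)
  Timeout.timeout(seconds) { yield }
rescue Timeout::Error
  :timed_out
end

# nil means no limit, matching DEFAULT_WORKER_TIMEOUT.
fast = run_with_timeout(nil) { :finished }

# A 50 ms limit interrupts a block that would sleep for a full second.
slow = run_with_timeout(0.05) { sleep 1; :finished }
```

In the daemon itself the Timeout::Error is not rescued in #run_worker, so it reaches the rescue in #run_thread and is passed to #error like any other failure.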