Class: Oni::Daemon
Overview
The Daemon class takes care of retrieving work to be processed, scheduling it and dispatching it to a mapper and worker. In essence a Daemon instance can be seen as a controller when compared with typical MVC frameworks.
This daemon starts a number of threads (5 by default) that will each perform work on their own using the corresponding mapper and worker class.
Direct Known Subclasses
Constant Summary collapse
- DEFAULT_WORKER_AMOUNT =
The default amount of worker to start.
1- DEFAULT_THREAD_AMOUNT =
The default amount of threads to start.
5- DEFAULT_WORKER_TIMEOUT =
The default amount of threads to start.
nil
Instance Attribute Summary collapse
-
#daemon_workers ⇒ Object
readonly
Returns the value of attribute daemon_workers.
- #workers ⇒ Array<Thread> readonly
Instance Method Summary collapse
-
#complete(message, output) ⇒ Object
Called when a job has been completed, by default this method is a noop.
-
#create_mapper ⇒ Oni::Mapper
Creates a new mapper and passes it a set of arguments as defined in #mapper_arguments.
-
#error(error) ⇒ Object
Called whenever an error is raised in the daemon, mapper or worker.
-
#initialize ⇒ Daemon
constructor
Creates a new instance of the class and calls
#after_initializeif it is defined. -
#process(message) ⇒ Object
Processes the given message.
-
#receive ⇒ Object
Receives a message, by default this method raises an error.
-
#run_thread ⇒ Object
The main code to execute in individual threads.
-
#run_worker(message) ⇒ Mixed
Maps the input, runs the worker and then maps the output into something that the daemon can understand.
-
#spawn_thread ⇒ Thread
Spawns a new thread that waits for daemon input.
-
#spawn_worker(i = nil) ⇒ Thread
Spawns a new thread that waits for daemon input.
-
#start ⇒ Object
Starts the daemon and waits for all threads to finish execution.
-
#stop ⇒ Object
Terminates all the threads and clears up the list.
-
#threads ⇒ Fixnum
Returns the amount of threads to use.
-
#worker_timeout ⇒ Fixnum
Returns the amount of threads to use.
Methods included from Configurable
included, #option, #require_option!
Constructor Details
#initialize ⇒ Daemon
Creates a new instance of the class and calls #after_initialize if it
is defined.
43 44 45 46 47 |
# File 'lib/oni/daemon.rb', line 43 def initialize @daemon_workers = {} after_initialize if respond_to?(:after_initialize) end |
Instance Attribute Details
#daemon_workers ⇒ Object (readonly)
Returns the value of attribute daemon_workers.
16 17 18 |
# File 'lib/oni/daemon.rb', line 16 def daemon_workers @daemon_workers end |
#workers ⇒ Array<Thread> (readonly)
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/oni/daemon.rb', line 13 class Daemon include Configurable attr_reader :daemon_workers ## # The default amount of worker to start. # # @return [Fixnum] # DEFAULT_WORKER_AMOUNT = 1 ## # The default amount of threads to start. # # @return [Fixnum] # DEFAULT_THREAD_AMOUNT = 5 ## # The default amount of threads to start. # # @return [Fixnum] # DEFAULT_WORKER_TIMEOUT = nil ## # Creates a new instance of the class and calls `#after_initialize` if it # is defined. # def initialize @daemon_workers = {} after_initialize if respond_to?(:after_initialize) end ## # Starts the daemon and waits for all threads to finish execution. This # method is blocking since it will wait for all threads to finish. # # If the current class has a `before_start` method defined it's called # before starting the daemon. # def start before_start if respond_to? :before_start return run_thread if threads <= 1 return spawn_worker if workers <= 1 threads = Array.new workers do |i| Thread.new do Process.wait fork{ spawn_worker i+1 } while true end end sleep 3 after_start if respond_to? :after_start threads.each(&:join) rescue => error error(error) end ## # Terminates all the threads and clears up the list. Note that calling this # method acts much like sending a SIGKILL signal to a process: threads will # be shut down *immediately*. # def stop daemon_workers.each do |pid, worker_threads| worker_threads.each(&:kill) worker_threads.clear end end def workers option :workers, DEFAULT_WORKER_AMOUNT end ## # Returns the amount of threads to use. # # @return [Fixnum] # def threads option :threads, DEFAULT_THREAD_AMOUNT end ## # Returns the amount of threads to use. # # @return [Fixnum] # def worker_timeout option :worker_timeout, DEFAULT_WORKER_TIMEOUT end ## # Processes the given message. Upon completion the `#complete` method is # called and passed the resulting output. # # @param [Mixed] message # def process() output = run_worker() complete(, output) end ## # Maps the input, runs the worker and then maps the output into something # that the daemon can understand. # # @param [Mixed] message # @return [Mixed] # def run_worker() mapper = create_mapper input = mapper.map_input() worker = option(:worker).new(*input) output = Timeout.timeout worker_timeout do worker.process end mapper.map_output output end ## # Receives a message, by default this method raises an error. # # @raise [NotImplementedError] # def receive raise NotImplementedError, 'You must manually implement #receive' end ## # Called when a job has been completed, by default this method is a noop. # This method is passed 2 arguments: # # 1. The raw input message. # 2. The output of the worker (remapped by the mapper). # # @param [Mixed] message The raw input message (e.g. an AWS SQS message) # @param [Mixed] output The output of the worker. # def complete(, output) end ## # Called whenever an error is raised in the daemon, mapper or worker. By # default this method just re-raises the error. # # @param [StandardError] error # def error(error) raise error end ## # Creates a new mapper and passes it a set of arguments as defined in # {Oni::Daemon#mapper_arguments}. # # @return [Oni::Mapper] # def create_mapper unless option(:mapper) raise ArgumentError, 'No mapper has been set in the `:mapper` option' end return option(:mapper).new end ## # Spawns a new thread that waits for daemon input. # # @return [Thread] # def spawn_worker i = nil Process.setproctitle "#{$0}: worker #{i}" if i daemon_workers[Process.pid] = Array.new threads do spawn_thread end.each(&:join) end ## # Spawns a new thread that waits for daemon input. # # @return [Thread] # def spawn_thread thread = Thread.new { run_thread } thread.abort_on_exception = true return thread end ## # The main code to execute in individual threads. # # If an error occurs in the receive method or processing a job the error # handler is executed and the process is retried. It's the responsibility # of the `error` method to determine if the process should fail only once # (and fail hard) or if it should continue running. # def run_thread receive do || process end rescue => error error(error) retry end end |
Instance Method Details
#complete(message, output) ⇒ Object
Called when a job has been completed, by default this method is a noop. This method is passed 2 arguments:
- The raw input message.
- The output of the worker (remapped by the mapper).
159 160 |
# File 'lib/oni/daemon.rb', line 159 def complete(, output) end |
#create_mapper ⇒ Oni::Mapper
Creates a new mapper and passes it a set of arguments as defined in #mapper_arguments.
178 179 180 181 182 183 184 |
# File 'lib/oni/daemon.rb', line 178 def create_mapper unless option(:mapper) raise ArgumentError, 'No mapper has been set in the `:mapper` option' end return option(:mapper).new end |
#error(error) ⇒ Object
Called whenever an error is raised in the daemon, mapper or worker. By default this method just re-raises the error.
168 169 170 |
# File 'lib/oni/daemon.rb', line 168 def error(error) raise error end |
#process(message) ⇒ Object
Processes the given message. Upon completion the #complete method is
called and passed the resulting output.
116 117 118 119 120 |
# File 'lib/oni/daemon.rb', line 116 def process() output = run_worker() complete(, output) end |
#receive ⇒ Object
Receives a message, by default this method raises an error.
145 146 147 |
# File 'lib/oni/daemon.rb', line 145 def receive raise NotImplementedError, 'You must manually implement #receive' end |
#run_thread ⇒ Object
The main code to execute in individual threads.
If an error occurs in the receive method or processing a job the error
handler is executed and the process is retried. It's the responsibility
of the error method to determine if the process should fail only once
(and fail hard) or if it should continue running.
220 221 222 223 224 225 226 227 228 |
# File 'lib/oni/daemon.rb', line 220 def run_thread receive do || process end rescue => error error(error) retry end |
#run_worker(message) ⇒ Mixed
Maps the input, runs the worker and then maps the output into something that the daemon can understand.
129 130 131 132 133 134 135 136 137 138 |
# File 'lib/oni/daemon.rb', line 129 def run_worker() mapper = create_mapper input = mapper.map_input() worker = option(:worker).new(*input) output = Timeout.timeout worker_timeout do worker.process end mapper.map_output output end |
#spawn_thread ⇒ Thread
Spawns a new thread that waits for daemon input.
204 205 206 207 208 209 210 |
# File 'lib/oni/daemon.rb', line 204 def spawn_thread thread = Thread.new { run_thread } thread.abort_on_exception = true return thread end |
#spawn_worker(i = nil) ⇒ Thread
Spawns a new thread that waits for daemon input.
191 192 193 194 195 196 197 |
# File 'lib/oni/daemon.rb', line 191 def spawn_worker i = nil Process.setproctitle "#{$0}: worker #{i}" if i daemon_workers[Process.pid] = Array.new threads do spawn_thread end.each(&:join) end |
#start ⇒ Object
Starts the daemon and waits for all threads to finish execution. This method is blocking since it will wait for all threads to finish.
If the current class has a before_start method defined it's called
before starting the daemon.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/oni/daemon.rb', line 56 def start before_start if respond_to? :before_start return run_thread if threads <= 1 return spawn_worker if workers <= 1 threads = Array.new workers do |i| Thread.new do Process.wait fork{ spawn_worker i+1 } while true end end sleep 3 after_start if respond_to? :after_start threads.each(&:join) rescue => error error(error) end |
#stop ⇒ Object
Terminates all the threads and clears up the list. Note that calling this method acts much like sending a SIGKILL signal to a process: threads will be shut down immediately.
81 82 83 84 85 86 |
# File 'lib/oni/daemon.rb', line 81 def stop daemon_workers.each do |pid, worker_threads| worker_threads.each(&:kill) worker_threads.clear end end |
#threads ⇒ Fixnum
Returns the amount of threads to use.
97 98 99 |
# File 'lib/oni/daemon.rb', line 97 def threads option :threads, DEFAULT_THREAD_AMOUNT end |
#worker_timeout ⇒ Fixnum
Returns the amount of threads to use.
106 107 108 |
# File 'lib/oni/daemon.rb', line 106 def worker_timeout option :worker_timeout, DEFAULT_WORKER_TIMEOUT end |