Class: Refinery::Daemon
- Inherits:
- Object
- Includes:
- Configurable, Loggable, Utilities
- Defined in:
- lib/refinery/daemon.rb
Overview
A daemon provides a thread to run workers in.
Constant Summary
- RUNNING = 'running'
- STOPPED = 'stopped'
Instance Attribute Summary
- #done_queue ⇒ Object (readonly)
  The queue for outgoing messages once they’ve been processed.
- #error_queue ⇒ Object (readonly)
  The queue for error messages.
- #name ⇒ Object (readonly)
  The name of the daemon.
- #thread ⇒ Object (readonly)
  The daemon’s thread.
- #waiting_queue ⇒ Object (readonly)
  The queue for incoming messages to process.
Instance Method Summary
- #initialize(server, name, waiting_queue, error_queue, done_queue) ⇒ Daemon (constructor)
  Initialize the daemon.
- #running? ⇒ Boolean
  Return true if the daemon state is running.
- #state=(state) ⇒ Object
  Set the daemon state.
- #stop ⇒ Object
  Stop the daemon.
- #workers ⇒ Object
  A hash of worker classes.
Methods included from Utilities
#camelize, #decode_message, #encode_message, #host_info
Methods included from Configurable
Methods included from Loggable
Constructor Details
#initialize(server, name, waiting_queue, error_queue, done_queue) ⇒ Daemon
Initialize the daemon.
- server: The server instance.
- name: The processor name.
- waiting_queue: The waiting queue that provides messages to be processed.
- error_queue: The queue where errors are posted.
- done_queue: The queue for messages that have been processed.
    # File 'lib/refinery/daemon.rb', line 50
    def initialize(server, name, waiting_queue, error_queue, done_queue)
      Refinery::Server.logger.debug "Starting daemon"
      @server = server
      @name = name
      @waiting_queue = waiting_queue
      @error_queue = error_queue
      @done_queue = done_queue
      @thread = Thread.new(self) do |daemon|
        logger.debug "Running daemon thread"
        while(running?)
          begin
            while (message = waiting_queue.receive)
              worker = load_worker_class(name).new(self)
              begin
                result, run_time = worker.run(decode_message(message.body))
                if result
                  done_message = {
                    'host_info' => host_info,
                    'original' => message.body,
                    'run_time' => run_time
                  }
                  logger.debug "Sending 'done' message to #{done_queue.name}"
                  done_queue.send_message(encode_message(done_message))
                  logger.debug "Deleting message from queue"
                  message.delete()
                end
              rescue Exception => e
                error_message = {
                  'error' => {
                    'message' => e.message,
                    'class' => e.class.name
                  },
                  'host_info' => host_info,
                  'original' => message.body
                }
                error_queue.send_message(encode_message(error_message))
                message.delete()
              end
            end
            sleep(1)
          rescue Exception => e
            logger.error "An error occurred while receiving from the waiting queue: #{e.message}"
          end
        end
        logger.debug "Exiting daemon thread"
      end
    end
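The constructor's thread follows a receive, process, acknowledge pattern: a message is taken from the waiting queue, handed to a worker, and then either reported on the done queue or, if the worker raises, reported on the error queue. The sketch below mimics that flow with in-memory stand-ins so it can run without Refinery. `SketchMessage`, `SketchQueue`, `process_once`, and the queue method names are hypothetical names chosen for illustration, message encoding is skipped, and the sketch rescues `StandardError` rather than `Exception` so that interrupts still propagate.

```ruby
# Hypothetical in-memory stand-ins for illustration; not Refinery's queues.
class SketchMessage
  attr_reader :body

  def initialize(body, queue)
    @body = body
    @queue = queue
  end

  # Acknowledge the message so the queue stops tracking it.
  def delete
    @queue.acknowledge(self)
  end
end

class SketchQueue
  attr_reader :name, :pending, :in_flight

  def initialize(name)
    @name = name
    @pending = []
    @in_flight = []
  end

  def send_message(body)
    @pending << SketchMessage.new(body, self)
  end

  # Hand out the next message and track it until it is acknowledged.
  def receive
    message = @pending.shift
    @in_flight << message if message
    message
  end

  def acknowledge(message)
    @in_flight.delete(message)
  end
end

# Drain the waiting queue once. The block plays the worker's role:
# a truthy result goes to `done`, an exception goes to `error`.
def process_once(waiting, done, error, host_info)
  while (message = waiting.receive)
    begin
      started = Time.now
      result = yield(message.body)
      if result
        done_body = {
          'host_info' => host_info,
          'original' => message.body,
          'run_time' => Time.now - started
        }
        done.send_message(done_body)
        message.delete
      end
    rescue StandardError => e
      error_body = {
        'error' => { 'message' => e.message, 'class' => e.class.name },
        'host_info' => host_info,
        'original' => message.body
      }
      error.send_message(error_body)
      message.delete
    end
  end
end

waiting = SketchQueue.new('waiting')
done = SketchQueue.new('done')
error = SketchQueue.new('error')
%w[ok boom skip].each { |body| waiting.send_message(body) }

process_once(waiting, done, error, 'localhost') do |body|
  raise ArgumentError, 'bad input' if body == 'boom'
  body != 'skip' # a falsy result leaves the message unacknowledged
end
```

In this run one message ends up on each of the done and error queues, while the message that produced a falsy result stays in flight, since nothing acknowledges it.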
Instance Attribute Details
#done_queue ⇒ Object (readonly)
The queue for outgoing messages once they’ve been processed
    # File 'lib/refinery/daemon.rb', line 18
    def done_queue
      @done_queue
    end
#error_queue ⇒ Object (readonly)
The queue for error messages
    # File 'lib/refinery/daemon.rb', line 20
    def error_queue
      @error_queue
    end
#name ⇒ Object (readonly)
The name of the daemon
    # File 'lib/refinery/daemon.rb', line 14
    def name
      @name
    end
#thread ⇒ Object (readonly)
The daemon’s thread
    # File 'lib/refinery/daemon.rb', line 12
    def thread
      @thread
    end
#waiting_queue ⇒ Object (readonly)
The queue for incoming messages to process
    # File 'lib/refinery/daemon.rb', line 16
    def waiting_queue
      @waiting_queue
    end
Instance Method Details
#running? ⇒ Boolean
Return true if the daemon state is running.
    # File 'lib/refinery/daemon.rb', line 39
    def running?
      state == RUNNING
    end
#state=(state) ⇒ Object
Set the daemon state.
    # File 'lib/refinery/daemon.rb', line 33
    def state=(state)
      @state = state
    end
#stop ⇒ Object
Stop the daemon
    # File 'lib/refinery/daemon.rb', line 23
    def stop
      self.state = STOPPED
    end
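Taken together, `running?`, `state=`, and `stop` describe a cooperative shutdown: `stop` only changes the state, and the thread exits the next time its loop checks `running?`. The following is a minimal sketch of that pattern, not Refinery's implementation. `SketchDaemon` is a hypothetical class, and it assumes the state starts as RUNNING, since this page does not show where the initial state is set.

```ruby
# Hypothetical class illustrating a flag-based, cooperative stop.
class SketchDaemon
  RUNNING = 'running'
  STOPPED = 'stopped'

  attr_reader :thread

  def initialize
    @state = RUNNING # assumption: the initial state is not shown on this page
    @thread = Thread.new do
      while running?
        sleep(0.01) # stand-in for polling the waiting queue
      end
    end
  end

  def running?
    @state == RUNNING
  end

  # Request shutdown; the loop notices on its next check.
  def stop
    @state = STOPPED
  end
end

daemon = SketchDaemon.new
sleep(0.05)
daemon.stop
daemon.thread.join(1) # wait up to one second for the loop to exit
```

A flag like this never interrupts work in progress, so a caller that needs the thread to have finished should join it after calling `stop`.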
#workers ⇒ Object
A hash of worker classes
    # File 'lib/refinery/daemon.rb', line 102
    def workers
      @workers ||= {}
    end
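The `@workers ||= {}` idiom creates the hash on the first call and returns the same object on every later call, which makes it usable as a cache. The sketch below shows one way such a hash might be used to avoid loading a worker class twice. `WorkerRegistry`, `fetch_worker`, and the loader block are hypothetical and are not taken from Refinery.

```ruby
# Hypothetical registry built on a lazily created hash.
class WorkerRegistry
  def workers
    @workers ||= {}
  end

  # Look up a worker class by name, calling the block only on a cache miss.
  def fetch_worker(name)
    workers[name] ||= yield(name)
  end
end

registry = WorkerRegistry.new
loads = 0
first = registry.fetch_worker('sample') { |_name| loads += 1; Class.new }
second = registry.fetch_worker('sample') { |_name| loads += 1; Class.new }
```

Both lookups return the same class object, and the loader block runs only for the first one.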