Class: Refinery::Daemon

Inherits:
Object
  • Object
show all
Includes:
Configurable, Loggable, Utilities
Defined in:
lib/refinery/daemon.rb

Overview

A daemon provides a thread to run workers in.

Constant Summary collapse

RUNNING =
'running'
STOPPED =
'stopped'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#camelize, #decode_message, #encode_message, #host_info

Methods included from Configurable

#config, #config=

Methods included from Loggable

#logger

Constructor Details

#initialize(server, name, waiting_queue, error_queue, done_queue) ⇒ Daemon

Initialize the daemon.

  • server: The server instance

  • name: The processor name

  • waiting_queue: The waiting queue that provides messages to be processed

  • error_queue: The queue where errors are posted.

  • done_queue: The queue for messages that have been processed.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/refinery/daemon.rb', line 50

# Initialize the daemon and start its processing thread.
#
# * <tt>server</tt>: The server instance
# * <tt>name</tt>: The processor name; passed to load_worker_class to find
#   the worker class to instantiate for each message
# * <tt>waiting_queue</tt>: The waiting queue that provides messages to be
#   processed
# * <tt>error_queue</tt>: The queue where errors are posted
# * <tt>done_queue</tt>: The queue for messages that have been processed
#
# The spawned thread keeps polling the waiting queue for as long as
# running? returns true. For every message received it builds a new worker
# and runs it on the decoded message body:
#
# * a truthy result posts a 'done' message (host info, original body and
#   run time) to the done queue and deletes the message;
# * a falsy result leaves the message alone (neither deleted nor reported);
# * an exception raised while running the worker or posting the 'done'
#   message posts an error message to the error queue and deletes the
#   message.
def initialize(server, name, waiting_queue, error_queue, done_queue)
  Refinery::Server.logger.debug "Starting daemon"

  @server = server
  @name = name
  @waiting_queue = waiting_queue
  @error_queue = error_queue
  @done_queue = done_queue

  @thread = Thread.new do
    logger.debug "Running daemon thread"
    while running?
      begin
        # Drain the queue: keep receiving until receive returns nil/false.
        while (message = waiting_queue.receive)
          worker = load_worker_class(name).new(self)
          begin
            result, run_time = worker.run(decode_message(message.body))
            if result
              done_message = {
                'host_info' => host_info,
                'original' => message.body,
                'run_time' => run_time
              }
              logger.debug "Sending 'done' message to #{done_queue.name}"
              done_queue.send_message(encode_message(done_message))

              logger.debug "Deleting message from queue"
              message.delete()
            end
          # NOTE(review): Exception (not just StandardError) is rescued, so
          # every failure is reported to the error queue; this also catches
          # non-StandardError exceptions such as SystemExit. Left as-is to
          # keep the daemon thread alive; confirm whether that is intended.
          rescue Exception => e
            error_message = {
              'error' => {
                'message' => e.message,
                'class' => e.class.name
              },
              'host_info' => host_info,
              'original' => message.body
            }
            error_queue.send_message(encode_message(error_message))
            message.delete()
          end
        end
      rescue Exception => e
        # Reached for anything that escapes the per-message handling above:
        # a failing receive, a failure to load/instantiate the worker, or a
        # failure while posting to the error queue.
        logger.error "An error occurred while receiving from the waiting queue: #{e.message}"
      end
      # Pause between polls. This sits after the rescue (rather than inside
      # the begin) so that a failing pass also backs off for a second
      # instead of retrying, and logging, in a tight loop.
      sleep(1)
    end
    logger.debug "Exiting daemon thread"
  end
end

Instance Attribute Details

#done_queue ⇒ Object (readonly)

The queue for outgoing messages once they’ve been processed



18
19
20
# File 'lib/refinery/daemon.rb', line 18

# Reader for @done_queue, the queue given to the constructor as done_queue;
# the daemon thread posts its 'done' messages there.
def done_queue; @done_queue; end

#error_queue ⇒ Object (readonly)

The queue for error messages



20
21
22
# File 'lib/refinery/daemon.rb', line 20

# Reader for @error_queue, the queue given to the constructor as
# error_queue; the daemon thread posts its error messages there.
def error_queue; @error_queue; end

#name ⇒ Object (readonly)

The name of the daemon



14
15
16
# File 'lib/refinery/daemon.rb', line 14

# Reader for @name, the name given to the constructor.
def name; @name; end

#thread ⇒ Object (readonly)

The daemon’s thread



12
13
14
# File 'lib/refinery/daemon.rb', line 12

# Reader for @thread, the Thread created by the constructor to run the
# daemon's processing loop.
def thread; @thread; end

#waiting_queue ⇒ Object (readonly)

The queue for incoming messages to process



16
17
18
# File 'lib/refinery/daemon.rb', line 16

# Reader for @waiting_queue, the queue given to the constructor as
# waiting_queue; the daemon thread receives its messages from it.
def waiting_queue; @waiting_queue; end

Instance Method Details

#running? ⇒ Boolean

Return true if the daemon state is running.

Returns:

  • (Boolean)


39
40
41
# File 'lib/refinery/daemon.rb', line 39

# True when the current state equals RUNNING, false otherwise.
def running?; state == RUNNING; end

#state=(state) ⇒ Object

Set the daemon state.



33
34
35
# File 'lib/refinery/daemon.rb', line 33

# Writer for the daemon state: stores the given value in @state.
def state=(state); @state = state; end

#stop ⇒ Object

Stop the daemon



23
24
25
# File 'lib/refinery/daemon.rb', line 23

# Stop the daemon by switching its state to STOPPED. The processing loop
# checks running? at the top of each pass, so it ends once the current pass
# finishes.
def stop; self.state = STOPPED; end

#workers ⇒ Object

A hash of worker classes



102
103
104
# File 'lib/refinery/daemon.rb', line 102

# Hash of worker classes, created empty on first access and reused on
# every later call.
def workers
  return @workers if @workers

  @workers = {}
end