Class: Opener::Daemons::Daemon

Inherits:
Object
  • Object
show all
Defined in:
lib/opener/daemons/daemon.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(klass, options = {}) ⇒ Daemon

Returns a new instance of Daemon.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/opener/daemons/daemon.rb', line 21

# Sets up thread bookkeeping, the SQS queues, the in-memory buffers and the
# logger for a daemon wrapping +klass+.
#
# @param klass [Class] component class; each worker thread instantiates it
#   with +klass.new+ and calls +run(input)+ on the instance.
# @param options [Hash]
# @option options [Integer] :readers (1) number of reader threads
# @option options [Integer] :workers (5) number of worker threads
# @option options [Integer] :writers (1) number of writer threads
# @option options [Boolean] :relentless (false) value returned by #relentless?
# @option options [Numeric] :sleep_interval (5) seconds to sleep when the
#   input queue yields nothing
# @option options [Object] :input_queue required; handed to
#   Opener::Daemons::SQS.find
# @option options [Object] :output_queue optional; handed to
#   Opener::Daemons::SQS.find when present
# @option options [Integer] :batch_size (10) messages requested per fetch
# @option options [Integer] :buffer_size buffer threshold; there is no
#   default, so #buffer_size is nil when the option is omitted
# @option options [Object] :log (STDOUT) log device passed to Logger.new
# @option options [Boolean] :debug (false) log at DEBUG instead of INFO
# @raise [KeyError] when :input_queue is missing from +options+
def initialize(klass, options={})
  @threads = {:readers=>[], :workers=>[], :writers=>[], :reporters=>[]}
  @thread_counts = {:readers => options.fetch(:readers, 1),
                    :workers => options.fetch(:workers, 5),
                    :writers => options.fetch(:writers, 1)}

  @relentless = options.fetch(:relentless, false)
  @sleep_interval = options.fetch(:sleep_interval, 5)

  # Initialize queues; the output queue stays nil unless one is configured.
  @input_queue = Opener::Daemons::SQS.find(options.fetch(:input_queue))
  if options[:output_queue]
    @output_queue = Opener::Daemons::SQS.find(options[:output_queue])
  end

  # Initialize buffers shared between reader, worker and writer threads.
  @input_buffer  = Queue.new
  @output_buffer = Queue.new

  # Batch and buffer size for a smooth flow.
  @batch_size  = options.fetch(:batch_size, 10)
  @buffer_size = options[:buffer_size]

  # Working component
  @klass = klass

  @logger = Logger.new(options.fetch(:log, STDOUT))
  @logger.level = options.fetch(:debug, false) ? Logger::DEBUG : Logger::INFO

  logger.debug(options.to_json)
end

Instance Attribute Details

#batch_size ⇒ Object (readonly)

Returns the value of attribute batch_size.



13
14
15
# File 'lib/opener/daemons/daemon.rb', line 13

# Batch size taken from options[:batch_size] in the constructor (10 when
# omitted); it is the argument given to input_queue.receive_messages.
#
# @return [Object] the value stored in @batch_size
def batch_size; @batch_size; end

#buffer_size ⇒ Object (readonly)

Returns the value of attribute buffer_size.



13
14
15
# File 'lib/opener/daemons/daemon.rb', line 13

# Buffer threshold taken from options[:buffer_size] in the constructor.
# No default is applied there, so this is nil when the option was omitted.
#
# @return [Object, nil] the value stored in @buffer_size
def buffer_size; @buffer_size; end

#input_buffer ⇒ Object (readonly)

Returns the value of attribute input_buffer.



13
14
15
# File 'lib/opener/daemons/daemon.rb', line 13

# In-memory Queue created by the constructor; readers push fetched messages
# onto it and workers pop from it.
#
# @return [Queue] the value stored in @input_buffer
def input_buffer; @input_buffer; end

#input_queue ⇒ Object (readonly)

Returns the value of attribute input_queue.



13
14
15
# File 'lib/opener/daemons/daemon.rb', line 13

# Queue object obtained in the constructor from
# Opener::Daemons::SQS.find(options.fetch(:input_queue)).
#
# @return [Object] the value stored in @input_queue
def input_queue; @input_queue; end

#klass ⇒ Object (readonly)

Returns the value of attribute klass.



13
14
15
# File 'lib/opener/daemons/daemon.rb', line 13

# The component class handed to the constructor; worker threads call
# +klass.new+ to build their processing instance.
#
# @return [Object] the value stored in @klass
def klass; @klass; end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



13
14
15
# File 'lib/opener/daemons/daemon.rb', line 13

# Logger built by the constructor from options[:log] (STDOUT by default).
#
# @return [Logger] the value stored in @logger
def logger; @logger; end

#output_buffer ⇒ Object (readonly)

Returns the value of attribute output_buffer.



13
14
15
# File 'lib/opener/daemons/daemon.rb', line 13

# In-memory Queue created by the constructor; workers push results onto it
# and writers pop from it.
#
# @return [Queue] the value stored in @output_buffer
def output_buffer; @output_buffer; end

#output_queue ⇒ Object (readonly)

Returns the value of attribute output_queue.



13
14
15
# File 'lib/opener/daemons/daemon.rb', line 13

# Queue object looked up in the constructor via Opener::Daemons::SQS.find,
# but only when options[:output_queue] was given; nil otherwise.
#
# @return [Object, nil] the value stored in @output_queue
def output_queue; @output_queue; end

#sleep_interval ⇒ Object (readonly)

Returns the value of attribute sleep_interval.



13
14
15
# File 'lib/opener/daemons/daemon.rb', line 13

# Value of options[:sleep_interval] (5 when omitted); passed to +sleep+ by
# #buffer_new_messages when the input queue returns nothing.
#
# @return [Object] the value stored in @sleep_interval
def sleep_interval; @sleep_interval; end

#thread_counts ⇒ Object

Returns the value of attribute thread_counts.



19
20
21
# File 'lib/opener/daemons/daemon.rb', line 19

# Hash built by the constructor with the keys :readers, :workers and
# :writers, each holding how many threads of that kind to start.
#
# @return [Hash] the value stored in @thread_counts
def thread_counts; @thread_counts; end

#threads ⇒ Object

Returns the value of attribute threads.



19
20
21
# File 'lib/opener/daemons/daemon.rb', line 19

# Hash built by the constructor mapping :readers, :workers, :writers and
# :reporters to the Array of threads started for that group.
#
# @return [Hash] the value stored in @threads
def threads; @threads; end

Instance Method Details

#buffer_new_messages ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/opener/daemons/daemon.rb', line 60

# Fetches one batch of up to +batch_size+ messages from the input queue and
# appends each of them to the input buffer.
#
# Nothing is fetched while either buffer holds more than +buffer_size+
# items. When the queue yields no messages the calling thread sleeps for
# +sleep_interval+ seconds before returning.
#
# @return [Object, nil] the fetched messages, or nil when nothing was buffered
def buffer_new_messages
  limit = buffer_size

  # buffer_size is read from options[:buffer_size] without a default, so it
  # can be nil; comparing an Integer with nil raises ArgumentError. Without a
  # limit the buffers are simply not throttled.
  if limit
    return if input_buffer.size > limit
    return if output_buffer.size > limit
  end

  messages = input_queue.receive_messages(batch_size)

  # Back off on an empty batch as well as on nil, so an idle queue is not
  # polled again immediately.
  if messages.nil? || messages.empty?
    sleep(sleep_interval)
    return
  end

  messages.each do |message|
    input_buffer << message
  end
end

#extract_callbacks(input) ⇒ Array

Returns an Array containing the callback URLs, ignoring empty values.

Parameters:

  • input (Array|String)

Returns:

  • (Array)


199
200
201
202
203
204
205
# File 'lib/opener/daemons/daemon.rb', line 199

# Normalizes the callbacks parameter into a list of usable URLs.
#
# A single String is treated as a one-element list; nil entries and empty
# entries are dropped. A new Array is always returned, so callers may mutate
# the result without touching +input+.
#
# @param input [Array, String, nil]
# @return [Array] the callback URLs, ignoring empty values
def extract_callbacks(input)
  # Kernel#Array maps nil to [], wraps a String, and passes an Array through.
  Array(input).compact.reject(&:empty?)
end

#relentless? ⇒ Boolean

Returns:

  • (Boolean)


189
190
191
# File 'lib/opener/daemons/daemon.rb', line 189

# Whether component errors are re-raised by worker threads instead of being
# logged. Set in the constructor from options[:relentless] (false when
# omitted).
#
# @return [Boolean] the value stored in @relentless
def relentless?; @relentless; end

#start ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/opener/daemons/daemon.rb', line 74

# Starts the reader, worker, writer and reporter threads, then blocks by
# joining every thread, group by group in that same order.
#
# @return [Array<Thread>] the reporter thread list, once all joins return
def start
  # An unhandled exception in any thread aborts the whole process.
  Thread.abort_on_exception = true

  start_readers
  start_workers
  start_writers
  start_reporters

  groups = [:readers, :workers, :writers, :reporters]
  joined = groups.map do |group|
    threads[group].each { |thread| thread.join }
  end
  joined.last
end

#start_readers ⇒ Object



88
89
90
91
92
93
94
95
96
97
# File 'lib/opener/daemons/daemon.rb', line 88

# Spawns thread_counts[:readers] threads and records them under
# threads[:readers]. Each one logs that it is ready and then calls
# #buffer_new_messages forever.
#
# @return [Integer] the number of reader threads requested
def start_readers
  reader_total = thread_counts[:readers]

  reader_total.times do |index|
    number = index + 1
    reader = Thread.new do
      logger.info "Reader #{number} ready for action..."
      loop { buffer_new_messages }
    end
    threads[:readers] << reader
  end
end

#start_reporters ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/opener/daemons/daemon.rb', line 160

# Spawns two debug-logging threads and records both under
# threads[:reporters]:
#
# * one logs the buffer sizes as JSON every 2 seconds;
# * one logs, every 10 seconds, how many threads of each non-reporter group
#   still have a truthy Thread#status.
#
# @return [Array<Thread>] the reporter thread list
def start_reporters
  buffer_reporter = Thread.new do
    loop do
      sizes = {:input=>input_buffer.size}
      sizes[:output] = output_buffer.size if output_buffer

      logger.debug({:buffers=>sizes}.to_json)
      sleep(2)
    end
  end
  threads[:reporters] << buffer_reporter

  activity_reporter = Thread.new do
    loop do
      groups = threads.keys - [:reporters]
      # Thread#status is false or nil once a thread has finished.
      zip = groups.map do |group|
        [group, threads[group].count { |thread| thread.status }]
      end
      logger.debug "active thread counts: #{zip}"

      sleep(10)
    end
  end
  threads[:reporters] << activity_reporter
end

#start_workers ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/opener/daemons/daemon.rb', line 99

# Spawns thread_counts[:workers] threads and records them under
# threads[:workers]. Each worker builds its own +klass+ instance and then
# loops forever: pop a message from the input buffer, run the component on
# its "input" field, and push the result onto the output buffer.
#
# If the component raises or returns an empty result, the error is logged and
# the unprocessed input is forwarded as the output. When #relentless? is
# true the error is re-raised instead.
#
# @return [Integer] the number of worker threads requested
def start_workers
  thread_counts[:workers].times do |t|
    threads[:workers] << Thread.new do
      logger.info "Worker #{t+1} launching..."
      identifier = klass.new
      loop do
        message = input_buffer.pop

        # Only the first element is processed when the input is an Array.
        input = message[:body]["input"]
        input, * = input if input.kind_of?(Array)

        begin
          # Only the first element of the component's result is used.
          output, * = identifier.run(input)
          # Check nil explicitly so a missing result is reported as an empty
          # response rather than as a NoMethodError on nil.
          if output.nil? || output.empty?
            raise "The component returned an empty response."
          end
        # NOTE(review): Exception (not StandardError) is rescued on purpose
        # here as far as can be told, so a worker keeps going whatever the
        # component raises -- confirm before narrowing.
        rescue Exception => e
          raise if relentless?

          logger.error(e)
          output = input
        end

        message[:body].delete("input")
        output_buffer.push({ :output => output,
                             :body   => message[:body],
                             :handle => message[:receipt_handle] })
      end
    end
  end
end

#start_writers ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/opener/daemons/daemon.rb', line 133

# Spawns thread_counts[:writers] threads and records them under
# threads[:writers]. Each writer loops forever: pop a result from the output
# buffer, post it to the next callback URL (or to the output queue URL when
# no callbacks remain), then delete the original message from the input
# queue.
#
# @return [Integer] the number of writer threads requested
def start_writers
  thread_counts[:writers].times do |index|
    writer = Thread.new do
      logger.info "Pusher #{index+1} ready for action..."
      loop do
        message = output_buffer.pop
        body = message[:body]
        remaining = extract_callbacks(body["callbacks[]"])
        handler = Opener::CallbackHandler.new

        # The component output becomes the "input" of the next hop.
        # NOTE(review): callbacks are read under the String key "callbacks[]"
        # but written back under Symbol keys -- confirm the handler
        # serializes both the same way.
        body[:input] = message[:output].force_encoding("UTF-8")

        if remaining.empty?
          # NOTE(review): output_queue is nil unless options[:output_queue]
          # was supplied, in which case this call raises NoMethodError.
          handler.post(output_queue.queue_url, {:body => body})
        else
          next_url = remaining.shift
          body[:'callbacks[]'] = remaining
          handler.post(next_url, {:body => body})
        end

        input_queue.delete_message(message[:handle])
      end
    end
    threads[:writers] << writer
  end
end