Class: Opener::Daemons::Daemon
- Inherits:
-
Object
- Object
- Opener::Daemons::Daemon
- Defined in:
- lib/opener/daemons/daemon.rb
Instance Attribute Summary collapse
-
#batch_size ⇒ Object
readonly
Returns the value of attribute batch_size.
-
#buffer_size ⇒ Object
readonly
Returns the value of attribute buffer_size.
-
#input_buffer ⇒ Object
readonly
Returns the value of attribute input_buffer.
-
#input_queue ⇒ Object
readonly
Returns the value of attribute input_queue.
-
#klass ⇒ Object
readonly
Returns the value of attribute klass.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#output_buffer ⇒ Object
readonly
Returns the value of attribute output_buffer.
-
#output_queue ⇒ Object
readonly
Returns the value of attribute output_queue.
-
#sleep_interval ⇒ Object
readonly
Returns the value of attribute sleep_interval.
-
#thread_counts ⇒ Object
Returns the value of attribute thread_counts.
-
#threads ⇒ Object
Returns the value of attribute threads.
Instance Method Summary collapse
- #buffer_new_messages ⇒ Object
-
#extract_callbacks(input) ⇒ Array
Returns an Array containing the callback URLs, ignoring empty values.
-
#initialize(klass, options = {}) ⇒ Daemon
constructor
A new instance of Daemon.
- #relentless? ⇒ Boolean
- #start ⇒ Object
- #start_readers ⇒ Object
- #start_reporters ⇒ Object
- #start_workers ⇒ Object
- #start_writers ⇒ Object
Constructor Details
#initialize(klass, options = {}) ⇒ Daemon
Returns a new instance of Daemon.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/opener/daemons/daemon.rb', line 21 def initialize(klass, ={}) @threads = {:readers=>[], :workers=>[], :writers=>[], :reporters=>[]} @thread_counts = {:readers => .fetch(:readers, 1), :workers => .fetch(:workers, 5), :writers => .fetch(:writers, 1)} @relentless = .fetch(:relentless, false) @sleep_interval = .fetch(:sleep_interval, 5) # Initialize queues @input_queue = Opener::Daemons::SQS.find(.fetch(:input_queue)) if [:output_queue] @output_queue = Opener::Daemons::SQS.find([:output_queue]) end # Initialize Buffers @input_buffer = Queue.new @output_buffer = Queue.new # Batch and Buffer size for a smooth flow. @batch_size = .fetch(:batch_size, 10) @buffer_size = [:buffer_size] # Working component @klass = klass script_name = File.basename($0, ".rb") @logger = Logger.new(.fetch(:log, STDOUT)) @logger.level = if .fetch(:debug, false) Logger::DEBUG else Logger::INFO end logger.debug(.to_json) end |
Instance Attribute Details
#batch_size ⇒ Object (readonly)
Returns the value of attribute batch_size.
13 14 15 |
# File 'lib/opener/daemons/daemon.rb', line 13 def batch_size @batch_size end |
#buffer_size ⇒ Object (readonly)
Returns the value of attribute buffer_size.
13 14 15 |
# File 'lib/opener/daemons/daemon.rb', line 13 def buffer_size @buffer_size end |
#input_buffer ⇒ Object (readonly)
Returns the value of attribute input_buffer.
13 14 15 |
# File 'lib/opener/daemons/daemon.rb', line 13 def input_buffer @input_buffer end |
#input_queue ⇒ Object (readonly)
Returns the value of attribute input_queue.
13 14 15 |
# File 'lib/opener/daemons/daemon.rb', line 13 def input_queue @input_queue end |
#klass ⇒ Object (readonly)
Returns the value of attribute klass.
13 14 15 |
# File 'lib/opener/daemons/daemon.rb', line 13 def klass @klass end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
13 14 15 |
# File 'lib/opener/daemons/daemon.rb', line 13 def logger @logger end |
#output_buffer ⇒ Object (readonly)
Returns the value of attribute output_buffer.
13 14 15 |
# File 'lib/opener/daemons/daemon.rb', line 13 def output_buffer @output_buffer end |
#output_queue ⇒ Object (readonly)
Returns the value of attribute output_queue.
13 14 15 |
# File 'lib/opener/daemons/daemon.rb', line 13 def output_queue @output_queue end |
#sleep_interval ⇒ Object (readonly)
Returns the value of attribute sleep_interval.
13 14 15 |
# File 'lib/opener/daemons/daemon.rb', line 13 def sleep_interval @sleep_interval end |
#thread_counts ⇒ Object
Returns the value of attribute thread_counts.
19 20 21 |
# File 'lib/opener/daemons/daemon.rb', line 19 def thread_counts @thread_counts end |
#threads ⇒ Object
Returns the value of attribute threads.
19 20 21 |
# File 'lib/opener/daemons/daemon.rb', line 19 def threads @threads end |
Instance Method Details
#buffer_new_messages ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/opener/daemons/daemon.rb', line 60 def return if input_buffer.size > buffer_size return if output_buffer.size > buffer_size = input_queue.(batch_size) if .nil? sleep(sleep_interval) return end .each do || input_buffer << end end |
#extract_callbacks(input) ⇒ Array
Returns an Array containing the callback URLs, ignoring empty values.
199 200 201 202 203 204 205 |
# File 'lib/opener/daemons/daemon.rb', line 199 def extract_callbacks(input) return [] if input.nil? || input.empty? callbacks = input.compact.reject(&:empty?) return callbacks end |
#relentless? ⇒ Boolean
189 190 191 |
# File 'lib/opener/daemons/daemon.rb', line 189 def relentless? @relentless end |
#start ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/opener/daemons/daemon.rb', line 74 def start Thread.abort_on_exception = true start_readers start_workers start_writers start_reporters threads[:readers].each(&:join) threads[:workers].each(&:join) threads[:writers].each(&:join) threads[:reporters].each(&:join) end |
#start_readers ⇒ Object
88 89 90 91 92 93 94 95 96 97 |
# File 'lib/opener/daemons/daemon.rb', line 88 def start_readers thread_counts[:readers].times do |t| threads[:readers] << Thread.new do logger.info "Reader #{t+1} ready for action..." loop do end end end end |
#start_reporters ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/opener/daemons/daemon.rb', line 160 def start_reporters threads[:reporters] << Thread.new do loop do log = {:buffers=>{:input=>input_buffer.size}} log[:buffers][:output] = output_buffer.size if output_buffer logger.debug log.to_json sleep(2) end end threads[:reporters] << Thread.new do loop do thread_types = threads.keys - [:reporters] thread_counts = thread_types.map do |type| threads[type].select{|thread| thread.status}.count end zip = thread_types.zip(thread_counts) logger.debug "active thread counts: #{zip}" sleep(10) end end end |
#start_workers ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/opener/daemons/daemon.rb', line 99 def start_workers thread_counts[:workers].times do |t| threads[:workers] << Thread.new do logger.info "Worker #{t+1} launching..." identifier = klass.new loop do = input_buffer.pop input = [:body]["input"] input,* = input if input.kind_of?(Array) begin output, * = identifier.run(input) if output.empty? raise "The component returned an empty response." end rescue Exception => e if relentless? raise else logger.error(e) output = input end end [:body].delete("input") output_buffer.push({ :output=>output, :body => [:body], :handle=>[:receipt_handle] }) end end end end |
#start_writers ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/opener/daemons/daemon.rb', line 133 def start_writers thread_counts[:writers].times do |t| threads[:writers] << Thread.new do logger.info "Pusher #{t+1} ready for action..." loop do = output_buffer.pop callbacks = extract_callbacks([:body]["callbacks[]"]) handler = Opener::CallbackHandler.new [:body][:input] = [:output].force_encoding("UTF-8") unless callbacks.empty? callback_url = callbacks.shift [:body][:'callbacks[]'] = callbacks payload = {:body => [:body]} handler.post(callback_url, payload) else payload = {:body => [:body]} handler.post(output_queue.queue_url, payload) end input_queue.([:handle]) end end end end |