Class: InfluxDB::Worker
- Inherits:
-
Object
- Object
- InfluxDB::Worker
- Includes:
- Logging
- Defined in:
- lib/influxdb/worker.rb
Constant Summary collapse
- MAX_POST_POINTS = 1000
- NUM_WORKER_THREADS = 3
- SLEEP_INTERVAL = 5
Constants included from Logging
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#queue ⇒ Object
Returns the value of attribute queue.
Instance Method Summary collapse
- #check_background_queue(thread_num = 0) ⇒ Object
- #current_thread_count ⇒ Object
- #current_threads ⇒ Object
-
#initialize(client) ⇒ Worker
constructor
A new instance of Worker.
- #push(payload) ⇒ Object
- #spawn_threads! ⇒ Object
Methods included from Logging
Constructor Details
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
7 8 9 |
# File 'lib/influxdb/worker.rb', line 7
#
# Reader for the +@client+ instance variable.
#
# @return [Object] whatever is currently stored in +@client+
def client
  @client
end
#queue ⇒ Object
Returns the value of attribute queue.
8 9 10 |
# File 'lib/influxdb/worker.rb', line 8
#
# Reader for the +@queue+ instance variable.
#
# @return [Object] whatever is currently stored in +@queue+
def queue
  @queue
end
Instance Method Details
#check_background_queue(thread_num = 0) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/influxdb/worker.rb', line 54
#
# Drains points from +@queue+ and hands them to +@client._write+ in batches
# of at most +MAX_POST_POINTS+.
#
# After each batch the method keeps going only while more than
# +MAX_POST_POINTS+ entries are still queued; a smaller remainder is left
# for a later call.
#
# Write failures are reported through +log+ at +:error+ level (assumes the
# Logging mixin accepts that level the same way it accepts +:debug+) and are
# not re-raised, so one failed batch does not stop the caller.
#
# @param thread_num [Integer] identifier used only in the debug log line
# @return [nil]
def check_background_queue(thread_num = 0)
  log :debug, "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"

  loop do
    data = []

    while data.size < MAX_POST_POINTS && !@queue.empty?
      begin
        data.push(@queue.pop(true))
      rescue ThreadError
        # Non-blocking pop raises ThreadError when the queue was emptied
        # between the empty? check and the pop; re-check the loop condition.
        next
      end
    end

    return if data.empty?

    begin
      log :debug, "Found data in the queue! (#{data.length} points)"
      @client._write(data)
    rescue => e
      log :error, "Cannot write data: #{e.inspect}"
    end

    break unless @queue.length > MAX_POST_POINTS
  end
end
#current_thread_count ⇒ Object
26 27 28 |
# File 'lib/influxdb/worker.rb', line 26
#
# Counts the threads in +Thread.list+ whose +:influxdb+ thread-local equals
# this object's +object_id+.
#
# @return [Integer] number of matching threads
def current_thread_count
  owner_id = object_id
  Thread.list.count { |thread| thread[:influxdb] == owner_id }
end
#current_threads ⇒ Object
22 23 24 |
# File 'lib/influxdb/worker.rb', line 22
#
# Lists the threads in +Thread.list+ whose +:influxdb+ thread-local equals
# this object's +object_id+.
#
# @return [Array<Thread>] the matching threads
def current_threads
  owner_id = object_id
  Thread.list.select { |thread| thread[:influxdb] == owner_id }
end
#push(payload) ⇒ Object
30 31 32 |
# File 'lib/influxdb/worker.rb', line 30
#
# Appends +payload+ to the queue by delegating to +queue.push+.
#
# @param payload [Object] the item to enqueue
# @return [Object] whatever +queue.push+ returns
def push(payload)
  target = queue
  target.push(payload)
end
#spawn_threads! ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/influxdb/worker.rb', line 34
#
# Starts +NUM_WORKER_THREADS+ background threads. Each thread:
#
# * tags itself by storing this object's +object_id+ in its +:influxdb+
#   thread-local,
# * registers an +at_exit+ hook that keeps calling +check_background_queue+
#   until +@queue+ is empty,
# * then loops forever, calling +check_background_queue+ and sleeping for
#   +rand(SLEEP_INTERVAL)+ seconds between calls.
#
# @return [Integer] the result of +NUM_WORKER_THREADS.times+
def spawn_threads!
  NUM_WORKER_THREADS.times do |thread_num|
    log :debug, "Spawning background worker thread #{thread_num}."

    Thread.new do
      Thread.current[:influxdb] = object_id

      at_exit do
        log :debug, "Thread exiting, flushing queue."
        check_background_queue(thread_num) until @queue.empty?
      end

      loop do
        check_background_queue(thread_num)
        sleep rand(SLEEP_INTERVAL)
      end
    end
  end
end