Class: InfluxDB::Worker
- Inherits:
-
Object
- Object
- InfluxDB::Worker
- Includes:
- Logging
- Defined in:
- lib/influxdb/worker.rb
Constant Summary collapse
- MAX_POST_POINTS =
1000
- NUM_WORKER_THREADS =
3
- SLEEP_INTERVAL =
5
Constants included from Logging
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#queue ⇒ Object
Returns the value of attribute queue.
Instance Method Summary collapse
- #check_background_queue(thread_num = 0) ⇒ Object
- #current_thread_count ⇒ Object
- #current_threads ⇒ Object
-
#initialize(client) ⇒ Worker
constructor
A new instance of Worker.
- #push(payload) ⇒ Object
- #spawn_threads! ⇒ Object
Methods included from Logging
Constructor Details
#initialize(client) ⇒ Worker
Returns a new instance of Worker.
16 17 18 19 20 21 22 23 24 25 |
# File 'lib/influxdb/worker.rb', line 16 def initialize(client) @queue = InfluxDB::MaxQueue.new @client = client spawn_threads! at_exit do log :debug, "Thread exiting, flushing queue." check_background_queue until @queue.empty? end end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
7 8 9 |
# File 'lib/influxdb/worker.rb', line 7 def client @client end |
#queue ⇒ Object
Returns the value of attribute queue.
8 9 10 |
# File 'lib/influxdb/worker.rb', line 8 def queue @queue end |
Instance Method Details
#check_background_queue(thread_num = 0) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/influxdb/worker.rb', line 58 def check_background_queue(thread_num = 0) log :debug, "Checking background queue on thread #{thread_num} (#{self.current_thread_count} active)" begin data = [] while data.size < MAX_POST_POINTS && !@queue.empty? p = @queue.pop(true) rescue next; data.push p end return if data.empty? begin log :debug, "Found data in the queue! (#{data.length} points)" @client._write(data) rescue => e puts "Cannot write data: #{e.inspect}" end end while @queue.length > MAX_POST_POINTS end |
#current_thread_count ⇒ Object
31 32 33 |
# File 'lib/influxdb/worker.rb', line 31 def current_thread_count Thread.list.count {|t| t[:influxdb] == self.object_id} end |
#current_threads ⇒ Object
27 28 29 |
# File 'lib/influxdb/worker.rb', line 27 def current_threads Thread.list.select {|t| t[:influxdb] == self.object_id} end |
#push(payload) ⇒ Object
35 36 37 38 39 40 41 |
# File 'lib/influxdb/worker.rb', line 35 def push(payload) if payload.is_a? Array payload.each {|p| queue.push(p) } else queue.push(payload) end end |
#spawn_threads! ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/influxdb/worker.rb', line 43 def spawn_threads! NUM_WORKER_THREADS.times do |thread_num| log :debug, "Spawning background worker thread #{thread_num}." Thread.new do Thread.current[:influxdb] = self.object_id while !client.stopped? self.check_background_queue(thread_num) sleep rand(SLEEP_INTERVAL) end end end end |