Class: InfluxDB::Writer::Async::Worker
- Inherits:
-
Object
- Object
- InfluxDB::Writer::Async::Worker
- Includes:
- Logging
- Defined in:
- lib/influxdb/writer/async.rb
Constant Summary collapse
- MAX_POST_POINTS =
1000
- MAX_QUEUE_SIZE =
10_000
- NUM_WORKER_THREADS =
3
- SLEEP_INTERVAL =
5
Constants included from Logging
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
Instance Method Summary collapse
- #check_background_queue(thread_num = 0) ⇒ Object
- #current_thread_count ⇒ Object
- #current_threads ⇒ Object
-
#initialize(client, config) ⇒ Worker
constructor
A new instance of Worker.
- #push(payload) ⇒ Object
-
#spawn_threads! ⇒ Object
rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength rubocop:disable Metrics/AbcSize.
Constructor Details
#initialize(client, config) ⇒ Worker
Returns a new instance of Worker.
41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/influxdb/writer/async.rb', line 41
#
# Sets up the worker: stores the client, creates the bounded queue, starts
# the background threads and registers an exit hook that flushes the queue.
#
# @param client [Object] stored as-is; this class calls +stopped?+ and
#   +write+ on it.
# @param config [Hash, Object] options; anything that is not a Hash is
#   treated as an empty Hash. Only +:max_queue+ is read here (it defaults
#   to MAX_QUEUE_SIZE).
def initialize(client, config)
  @client = client

  # Non-Hash configuration values fall back to the defaults.
  config = {} unless config.is_a?(Hash)
  max_queue = config.fetch(:max_queue, MAX_QUEUE_SIZE)
  @queue = InfluxDB::MaxQueue.new(max_queue)

  spawn_threads!

  # At process exit, keep draining until no queued payload is left.
  at_exit do
    log :debug, "Thread exiting, flushing queue."
    until queue.empty?
      check_background_queue
    end
  end
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# Read-only accessor for the +@client+ instance variable.
#
# @return [Object, nil] the stored client, or nil when it was never set
def client
  @client
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# Read-only accessor for the +@queue+ instance variable.
#
# @return [Object, nil] the stored queue, or nil when it was never set
def queue
  @queue
end
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# Read-only accessor for the +@threads+ instance variable.
#
# @return [Array, nil] the stored thread list, or nil when it was never set
def threads
  @threads
end
Instance Method Details
#check_background_queue(thread_num = 0) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/influxdb/writer/async.rb', line 88
#
# Drains the queue in batches of at most MAX_POST_POINTS entries and hands
# each batch to +client.write+.
#
# A failing write is logged and the batch is dropped; the error is not
# re-raised, so the calling thread keeps running.
#
# @param thread_num [Integer] only used in the debug log message
# @return [nil]
def check_background_queue(thread_num = 0)
  log :debug, "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"

  loop do
    data = []

    while data.size < MAX_POST_POINTS && !queue.empty?
      begin
        # Non-blocking pop: another thread may empty the queue between the
        # empty? check and the pop. Assuming the queue behaves like Ruby's
        # Queue, that case raises ThreadError. Only that error is tolerated;
        # anything else propagates instead of being retried forever.
        data.push queue.pop(true)
      rescue ThreadError
        next
      end
    end

    return if data.empty?

    begin
      log :debug, "Found data in the queue! (#{data.length} points)"
      client.write(data)
    rescue StandardError => e
      # Report through the same logging facility as the rest of the class
      # rather than printing to stdout.
      log :error, "Cannot write data: #{e.inspect}"
    end

    # Ends this pass while more than MAX_POST_POINTS entries are still
    # queued; otherwise the loop runs again to pick up the remainder.
    # NOTE(review): presumably this hands a large backlog over to the other
    # worker threads -- confirm the intended direction of the comparison.
    break if queue.length > MAX_POST_POINTS
  end
end
#current_thread_count ⇒ Object
62 63 64 |
# File 'lib/influxdb/writer/async.rb', line 62
#
# Counts the live threads whose +:influxdb+ thread variable equals this
# object's +object_id+.
#
# @return [Integer]
def current_thread_count
  owner_id = object_id
  Thread.list.count do |thread|
    thread[:influxdb] == owner_id
  end
end
#current_threads ⇒ Object
58 59 60 |
# File 'lib/influxdb/writer/async.rb', line 58
#
# Lists the live threads whose +:influxdb+ thread variable equals this
# object's +object_id+.
#
# @return [Array<Thread>]
def current_threads
  owner_id = object_id
  Thread.list.select do |thread|
    thread[:influxdb] == owner_id
  end
end
#push(payload) ⇒ Object
54 55 56 |
# File 'lib/influxdb/writer/async.rb', line 54
#
# Hands a payload to the queue via its +push+ method.
#
# @param payload [Object] the value to enqueue
# @return [Object] whatever the queue's +push+ returns
def push(payload)
  target = queue
  target.push(payload)
end
#spawn_threads! ⇒ Object
rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength rubocop:disable Metrics/AbcSize
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/influxdb/writer/async.rb', line 70
#
# Resets +@threads+ and starts NUM_WORKER_THREADS background threads.
#
# Each thread tags itself with this object's +object_id+ under the
# +:influxdb+ thread variable, then alternates between
# +check_background_queue+ and a sleep of +rand(SLEEP_INTERVAL)+ seconds
# until +client.stopped?+ is true.
#
# @return [Integer] NUM_WORKER_THREADS (the result of Integer#times)
def spawn_threads!
  @threads = []

  NUM_WORKER_THREADS.times do |thread_num|
    log :debug, "Spawning background worker thread #{thread_num}."

    worker = Thread.new do
      # Mark the thread as belonging to this worker instance.
      Thread.current[:influxdb] = object_id

      until client.stopped?
        check_background_queue(thread_num)
        # Random pause between polls.
        sleep rand(SLEEP_INTERVAL)
      end

      log :debug, "Exit background worker thread #{thread_num}."
    end

    @threads << worker
  end
end