Class: InfluxDB::Writer::Async::Worker
- Inherits:
-
Object
- Object
- InfluxDB::Writer::Async::Worker
- Includes:
- Logging
- Defined in:
- lib/influxdb/writer/async.rb
Constant Summary collapse
- MAX_POST_POINTS =
1000
- MAX_QUEUE_SIZE =
10_000
- NUM_WORKER_THREADS =
3
- SLEEP_INTERVAL =
5
Constants included from Logging
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#max_post_points ⇒ Object
readonly
Returns the value of attribute max_post_points.
-
#max_queue_size ⇒ Object
readonly
Returns the value of attribute max_queue_size.
-
#num_worker_threads ⇒ Object
readonly
Returns the value of attribute num_worker_threads.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#sleep_interval ⇒ Object
readonly
Returns the value of attribute sleep_interval.
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
Instance Method Summary collapse
- #check_background_queue(thread_num = 0) ⇒ Object
- #current_thread_count ⇒ Object
- #current_threads ⇒ Object
-
#initialize(client, config) ⇒ Worker
constructor
A new instance of Worker.
- #push(payload) ⇒ Object
-
#spawn_threads! ⇒ Object
Starts num_worker_threads background threads; each one tags itself with this worker's object_id and polls the queue until the client is stopped.
- #stop! ⇒ Object
Constructor Details
#initialize(client, config) ⇒ Worker
Returns a new instance of Worker.
47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/influxdb/writer/async.rb', line 47
#
# Stores the client, resolves the tuning options, creates the bounded queue
# and immediately starts the background threads.
#
# @param client [Object] stored as-is and exposed through #client
# @param config [Hash, Object] tuning options; anything that is not a Hash
#   is ignored and every option falls back to its class constant
# @option config [Integer] :max_post_points    (MAX_POST_POINTS)
# @option config [Integer] :max_queue_size     (MAX_QUEUE_SIZE)
# @option config [Integer] :num_worker_threads (NUM_WORKER_THREADS)
# @option config [Numeric] :sleep_interval     (SLEEP_INTERVAL)
def initialize(client, config)
  @client = client

  # Non-Hash configs (true, nil, ...) simply mean "use the defaults".
  settings = {}
  settings = config if config.is_a?(Hash)

  @max_post_points    = settings.fetch(:max_post_points, MAX_POST_POINTS)
  @max_queue_size     = settings.fetch(:max_queue_size, MAX_QUEUE_SIZE)
  @num_worker_threads = settings.fetch(:num_worker_threads, NUM_WORKER_THREADS)
  @sleep_interval     = settings.fetch(:sleep_interval, SLEEP_INTERVAL)

  @queue = InfluxDB::MaxQueue.new(max_queue_size)

  spawn_threads!
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# @return [Object] the client passed to the constructor
def client
  @client
end
#max_post_points ⇒ Object (readonly)
Returns the value of attribute max_post_points.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# @return [Object] the :max_post_points config value, or MAX_POST_POINTS
#   when it was not configured; caps how many queue entries
#   #check_background_queue collects into one write
def max_post_points
  @max_post_points
end
#max_queue_size ⇒ Object (readonly)
Returns the value of attribute max_queue_size.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# @return [Object] the :max_queue_size config value, or MAX_QUEUE_SIZE
#   when it was not configured; handed to InfluxDB::MaxQueue.new by the
#   constructor
def max_queue_size
  @max_queue_size
end
#num_worker_threads ⇒ Object (readonly)
Returns the value of attribute num_worker_threads.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# @return [Object] the :num_worker_threads config value, or
#   NUM_WORKER_THREADS when it was not configured; the number of threads
#   #spawn_threads! starts
def num_worker_threads
  @num_worker_threads
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# @return [InfluxDB::MaxQueue] the queue created by the constructor with
#   #max_queue_size; #push adds to it and #check_background_queue drains it
def queue
  @queue
end
#sleep_interval ⇒ Object (readonly)
Returns the value of attribute sleep_interval.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# @return [Object] the :sleep_interval config value, or SLEEP_INTERVAL when
#   it was not configured; passed to +rand+ by the worker threads to pick
#   the pause between two queue checks
def sleep_interval
  @sleep_interval
end
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
32 33 34 |
# File 'lib/influxdb/writer/async.rb', line 32
#
# @return [Array<Thread>] the threads created by #spawn_threads!
def threads
  @threads
end
Instance Method Details
#check_background_queue(thread_num = 0) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/influxdb/writer/async.rb', line 95
#
# Drains the queue in batches of at most +max_post_points+ entries and hands
# each batch to the client as a single newline-joined payload.
#
# The method returns as soon as a pass collects nothing. After a write it
# also stops when more than +max_post_points+ entries are still queued;
# otherwise it loops again to pick up the remainder.
#
# A failed write is logged at :error level and its batch is dropped (there
# is no retry), so a failing request does not propagate to the caller.
#
# @param thread_num [Integer] worker index, used only in the log message
# @return [nil]
# @raise [StandardError] any error other than ThreadError raised by the
#   queue while popping
def check_background_queue(thread_num = 0)
  log :debug, "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"

  loop do
    data = []

    while data.size < max_post_points && !queue.empty?
      begin
        data.push queue.pop(true)
      rescue ThreadError
        # Non-blocking pop lost the race: another consumer emptied the queue
        # between the empty? check and the pop. Re-evaluate the condition.
        next
      end
    end

    return if data.empty?

    begin
      log :debug, "Found data in the queue! (#{data.length} points)"
      client.write(data.join("\n"), nil)
    rescue StandardError => e
      # Report through the logger instead of printing to stdout.
      log :error, "Cannot write data: #{e.inspect}"
    end

    break if queue.length > max_post_points
  end
end
#current_thread_count ⇒ Object
69 70 71 |
# File 'lib/influxdb/writer/async.rb', line 69
#
# Counts the live threads whose +:influxdb+ thread-local equals this
# object's +object_id+ (the tag set by the threads #spawn_threads! starts).
#
# @return [Integer]
def current_thread_count
  tags = Thread.list.map { |thread| thread[:influxdb] }
  tags.count(object_id)
end
#current_threads ⇒ Object
65 66 67 |
# File 'lib/influxdb/writer/async.rb', line 65
#
# Lists the live threads whose +:influxdb+ thread-local equals this object's
# +object_id+ (the tag set by the threads #spawn_threads! starts).
#
# @return [Array<Thread>]
def current_threads
  owner_id = object_id
  Thread.list.select { |thread| thread[:influxdb] == owner_id }
end
#push(payload) ⇒ Object
61 62 63 |
# File 'lib/influxdb/writer/async.rb', line 61
#
# Hands +payload+ to the queue for the background threads to write later.
#
# @param payload [Object] entry to enqueue; #check_background_queue joins
#   queued entries with newlines before writing them
# @return [Object] whatever the queue's +push+ returns
def push(payload)
  # Deliberately calls #push by name rather than an operator alias.
  queue.push(payload)
end
#spawn_threads! ⇒ Object
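Starts num_worker_threads background threads. Each thread tags itself with this worker's object_id, then alternates between checking the queue and sleeping for a random interval below sleep_interval until the client reports that it has stopped.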
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/influxdb/writer/async.rb', line 77
#
# Resets #threads and starts +num_worker_threads+ background threads.
#
# Every thread stores this object's +object_id+ in its +:influxdb+
# thread-local (which is what #current_threads matches on), then alternates
# between #check_background_queue and a pause of +rand(sleep_interval)+
# until +client.stopped?+ becomes true.
#
# @return [Object] the result of +num_worker_threads.times+
def spawn_threads!
  @threads = []

  num_worker_threads.times do |index|
    log :debug, "Spawning background worker thread #{index}."

    worker = Thread.new do
      # Tag the thread so it can be attributed to this worker instance.
      Thread.current[:influxdb] = object_id

      until client.stopped?
        check_background_queue(index)
        sleep rand(sleep_interval)
      end

      log :debug, "Exit background worker thread #{index}."
    end

    @threads << worker
  end
end
#stop! ⇒ Object
120 121 122 123 |
# File 'lib/influxdb/writer/async.rb', line 120
#
# Flushes whatever is still queued by calling #check_background_queue on the
# calling thread until the queue reports empty.
#
# @return [nil]
def stop!
  log :debug, "Thread exiting, flushing queue."

  until queue.empty?
    check_background_queue
  end
end