Class: InfluxDB::Writer::Async::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/influxdb/writer/async.rb

Constant Summary collapse

MAX_POST_POINTS =
1000
MAX_QUEUE_SIZE =
10_000
NUM_WORKER_THREADS =
3
SLEEP_INTERVAL =
5
BLOCK_ON_FULL_QUEUE =
false

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

log?

Constructor Details

#initialize(client, config) ⇒ Worker

Returns a new instance of Worker.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/influxdb/writer/async.rb', line 48

# Stores the client, resolves the tuning options, builds the queue and starts
# the background worker threads.
#
# @param client [Object] kept in @client; the worker threads poll
#   client.stopped? to decide when to exit
# @param config [Hash, Object] option overrides keyed by symbol; any value that
#   is not a Hash is ignored, so every option falls back to its constant
def initialize(client, config)
  @client = client

  settings = {}
  settings = config if config.is_a?(Hash)

  @max_post_points     = settings.fetch(:max_post_points,     MAX_POST_POINTS)
  @max_queue_size      = settings.fetch(:max_queue_size,      MAX_QUEUE_SIZE)
  @num_worker_threads  = settings.fetch(:num_worker_threads,  NUM_WORKER_THREADS)
  @sleep_interval      = settings.fetch(:sleep_interval,      SLEEP_INTERVAL)
  @block_on_full_queue = settings.fetch(:block_on_full_queue, BLOCK_ON_FULL_QUEUE)

  # The queue implementation depends on the block_on_full_queue option; both
  # variants are constructed with the configured maximum size.
  @queue =
    if @block_on_full_queue
      SizedQueue.new(max_queue_size)
    else
      InfluxDB::MaxQueue.new(max_queue_size)
    end

  spawn_threads!
end

Instance Attribute Details

#block_on_full_queue ⇒ Object (readonly)

Returns the value of attribute block_on_full_queue.



31
32
33
# File 'lib/influxdb/writer/async.rb', line 31

# @return [Object] the :block_on_full_queue option resolved in #initialize
#   (BLOCK_ON_FULL_QUEUE when not configured); a truthy value makes
#   #initialize build a SizedQueue instead of an InfluxDB::MaxQueue
def block_on_full_queue
  @block_on_full_queue
end

#client ⇒ Object (readonly)

Returns the value of attribute client.



31
32
33
# File 'lib/influxdb/writer/async.rb', line 31

# @return [Object] the client passed to #initialize; the worker threads call
#   stopped? on it to decide when to exit
def client
  @client
end

#max_post_points ⇒ Object (readonly)

Returns the value of attribute max_post_points.



31
32
33
# File 'lib/influxdb/writer/async.rb', line 31

# @return [Object] the :max_post_points option resolved in #initialize
#   (MAX_POST_POINTS when not configured); #check_background_queue uses it as
#   the per-group batch limit
def max_post_points
  @max_post_points
end

#max_queue_size ⇒ Object (readonly)

Returns the value of attribute max_queue_size.



31
32
33
# File 'lib/influxdb/writer/async.rb', line 31

# @return [Object] the :max_queue_size option resolved in #initialize
#   (MAX_QUEUE_SIZE when not configured); passed to the queue constructor
def max_queue_size
  @max_queue_size
end

#num_worker_threads ⇒ Object (readonly)

Returns the value of attribute num_worker_threads.



31
32
33
# File 'lib/influxdb/writer/async.rb', line 31

# @return [Object] the :num_worker_threads option resolved in #initialize
#   (NUM_WORKER_THREADS when not configured); #spawn_threads! starts this many
#   threads
def num_worker_threads
  @num_worker_threads
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



31
32
33
# File 'lib/influxdb/writer/async.rb', line 31

# @return [SizedQueue, InfluxDB::MaxQueue] the queue built in #initialize:
#   a SizedQueue when block_on_full_queue is truthy, otherwise an
#   InfluxDB::MaxQueue
def queue
  @queue
end

#sleep_interval ⇒ Object (readonly)

Returns the value of attribute sleep_interval.



31
32
33
# File 'lib/influxdb/writer/async.rb', line 31

# @return [Object] the :sleep_interval option resolved in #initialize
#   (SLEEP_INTERVAL when not configured); worker threads sleep for
#   rand(sleep_interval) between queue checks
def sleep_interval
  @sleep_interval
end

#threads ⇒ Object (readonly)

Returns the value of attribute threads.



31
32
33
# File 'lib/influxdb/writer/async.rb', line 31

# @return [Array<Thread>] the threads created by the most recent call to
#   #spawn_threads!
def threads
  @threads
end

Instance Method Details

#check_background_queue(thread_num = 0) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/influxdb/writer/async.rb', line 98

# Pops queued entries, groups them by database / precision / retention policy
# and passes each collected batch to +write+.
#
# @param thread_num [Object] only interpolated into the debug log message
# @return [nil]
def check_background_queue(thread_num = 0)
  log(:debug) do
    "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"
  end

  loop do
    # Batch under construction: { {db:, pr:, rp:} => [payload, ...] }.
    data = {}

    # Keep collecting while every group gathered so far is still below
    # max_post_points and the queue reports more entries.
    while data.all? { |_, points| points.size < max_post_points } && !queue.empty?
      begin
        # Non-blocking pop. With the stdlib Queue API this raises ThreadError
        # when nothing is available, which can happen if the queue was emptied
        # between the empty? check above and this call.
        payload, precision, retention_policy, database = queue.pop(true)
        key = {
          db: database,
          pr: precision,
          rp: retention_policy,
        }
        data[key] ||= []
        data[key] << payload
      rescue ThreadError
        # Nothing popped; re-evaluate the while condition.
        next
      end
    end

    # Nothing was collected in this pass, so the method is done.
    return if data.values.flatten.empty?

    begin
      log(:debug) { "Found data in the queue! (#{sizes(data)})" }
      write(data)
    rescue StandardError => e
      # A failed write is only logged; the batch is not put back on the queue.
      log :error, "Cannot write data: #{e.inspect}"
    end

    # NOTE(review): this leaves the loop when MORE than max_post_points entries
    # are still queued and keeps looping otherwise — confirm the direction of
    # this comparison is intended.
    break if queue.length > max_post_points
  end
end

#current_thread_count ⇒ Object



72
73
74
# File 'lib/influxdb/writer/async.rb', line 72

# Counts the threads in Thread.list whose :influxdb thread-local equals this
# object's object_id (the tag assigned by #spawn_threads!).
#
# @return [Integer] number of matching threads
def current_thread_count
  owner_tag = object_id
  Thread.list.count { |thread| thread[:influxdb] == owner_tag }
end

#current_threads ⇒ Object



68
69
70
# File 'lib/influxdb/writer/async.rb', line 68

# Lists the threads in Thread.list whose :influxdb thread-local equals this
# object's object_id (the tag assigned by #spawn_threads!).
#
# @return [Array<Thread>] the matching threads
def current_threads
  owner_tag = object_id
  Thread.list.select do |thread|
    thread[:influxdb] == owner_tag
  end
end

#push(payload, precision = nil, retention_policy = nil, database = nil) ⇒ Object



64
65
66
# File 'lib/influxdb/writer/async.rb', line 64

# Enqueues one entry for the background workers.
#
# The entry is stored as a four-element array in the order
# [payload, precision, retention_policy, database], which is the order
# #check_background_queue destructures it in.
#
# @param payload [Object] the data to write
# @param precision [Object, nil] stored as given
# @param retention_policy [Object, nil] stored as given
# @param database [Object, nil] stored as given
# @return [Object] whatever the queue's push returns
def push(payload, precision = nil, retention_policy = nil, database = nil)
  entry = [payload, precision, retention_policy, database]
  queue.push(entry)
end

#spawn_threads! ⇒ Object

rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength rubocop:disable Metrics/AbcSize



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/influxdb/writer/async.rb', line 80

# Starts num_worker_threads background threads and records them in @threads.
#
# Each thread tags itself with this object's object_id under the :influxdb
# thread-local, then alternates between #check_background_queue and a sleep of
# rand(sleep_interval) until client.stopped? is true.
#
# @return [Object] the result of num_worker_threads.times
def spawn_threads!
  # Reset the list before any thread starts so it only ever holds this batch.
  @threads = []

  num_worker_threads.times do |index|
    log(:debug) { "Spawning background worker thread #{index}." }

    worker = Thread.new do
      # Tag used by #current_threads / #current_thread_count.
      Thread.current[:influxdb] = object_id

      until client.stopped?
        check_background_queue(index)
        sleep rand(sleep_interval)
      end

      log(:debug) { "Exit background worker thread #{index}." }
    end

    @threads << worker
  end
end

#stop! ⇒ Object



134
135
136
137
# File 'lib/influxdb/writer/async.rb', line 134

# Flushes the queue: logs a debug message, then calls #check_background_queue
# repeatedly until the queue reports empty.
#
# @return [nil]
def stop!
  log(:debug) { "Thread exiting, flushing queue." }

  until queue.empty?
    check_background_queue
  end
end