Class: InfluxDB::Writer::Async::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/influxdb/writer/async.rb

Constant Summary collapse

MAX_POST_POINTS =
1000
MAX_QUEUE_SIZE =
10_000
NUM_WORKER_THREADS =
3
SLEEP_INTERVAL =
5
BLOCK_ON_FULL_QUEUE =
false

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

log?

Constructor Details

#initialize(client, config) ⇒ Worker

Returns a new instance of Worker.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/influxdb/writer/async.rb', line 50

def initialize(client, config)
  @client = client
  config = config.is_a?(Hash) ? config : {}

  @max_post_points     = config.fetch(:max_post_points,     MAX_POST_POINTS)
  @max_queue_size      = config.fetch(:max_queue_size,      MAX_QUEUE_SIZE)
  @num_worker_threads  = config.fetch(:num_worker_threads,  NUM_WORKER_THREADS)
  @sleep_interval      = config.fetch(:sleep_interval,      SLEEP_INTERVAL)
  @block_on_full_queue = config.fetch(:block_on_full_queue, BLOCK_ON_FULL_QUEUE)

  queue_class = @block_on_full_queue ? SizedQueue : InfluxDB::MaxQueue
  @queue = queue_class.new max_queue_size

  spawn_threads!
end

Instance Attribute Details

#block_on_full_queueObject (readonly)

Returns the value of attribute block_on_full_queue.



33
34
35
# File 'lib/influxdb/writer/async.rb', line 33

def block_on_full_queue
  @block_on_full_queue
end

#clientObject (readonly)

Returns the value of attribute client.



33
34
35
# File 'lib/influxdb/writer/async.rb', line 33

def client
  @client
end

#max_post_pointsObject (readonly)

Returns the value of attribute max_post_points.



33
34
35
# File 'lib/influxdb/writer/async.rb', line 33

def max_post_points
  @max_post_points
end

#max_queue_sizeObject (readonly)

Returns the value of attribute max_queue_size.



33
34
35
# File 'lib/influxdb/writer/async.rb', line 33

def max_queue_size
  @max_queue_size
end

#num_worker_threadsObject (readonly)

Returns the value of attribute num_worker_threads.



33
34
35
# File 'lib/influxdb/writer/async.rb', line 33

def num_worker_threads
  @num_worker_threads
end

#queueObject (readonly)

Returns the value of attribute queue.



33
34
35
# File 'lib/influxdb/writer/async.rb', line 33

def queue
  @queue
end

#sleep_intervalObject (readonly)

Returns the value of attribute sleep_interval.



33
34
35
# File 'lib/influxdb/writer/async.rb', line 33

def sleep_interval
  @sleep_interval
end

#threadsObject (readonly)

Returns the value of attribute threads.



33
34
35
# File 'lib/influxdb/writer/async.rb', line 33

def threads
  @threads
end

Instance Method Details

#check_background_queue(thread_num = 0) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/influxdb/writer/async.rb', line 100

def check_background_queue(thread_num = 0)
  log(:debug) do
    "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"
  end

  loop do
    data = {}

    while data.all? { |_, points| points.size < max_post_points } && !queue.empty?
      begin
        payload, precision, retention_policy, database = queue.pop(true)
        key = {
          db: database,
          pr: precision,
          rp: retention_policy,
        }
        data[key] ||= []
        data[key] << payload
      rescue ThreadError
        next
      end
    end

    return if data.values.flatten.empty?

    begin
      log(:debug) { "Found data in the queue! (#{sizes(data)})" }
      write(data)
    rescue StandardError => e
      log :error, "Cannot write data: #{e.inspect}"
    end

    break if queue.length > max_post_points
  end
end

#current_thread_countObject



74
75
76
# File 'lib/influxdb/writer/async.rb', line 74

def current_thread_count
  Thread.list.count { |t| t[:influxdb] == object_id }
end

#current_threadsObject



70
71
72
# File 'lib/influxdb/writer/async.rb', line 70

def current_threads
  Thread.list.select { |t| t[:influxdb] == object_id }
end

#push(payload, precision = nil, retention_policy = nil, database = nil) ⇒ Object



66
67
68
# File 'lib/influxdb/writer/async.rb', line 66

def push(payload, precision = nil, retention_policy = nil, database = nil)
  queue.push([payload, precision, retention_policy, database])
end

#spawn_threads!Object

rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength rubocop:disable Metrics/AbcSize



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/influxdb/writer/async.rb', line 82

def spawn_threads!
  @threads = []
  num_worker_threads.times do |thread_num|
    log(:debug) { "Spawning background worker thread #{thread_num}." }

    @threads << Thread.new do
      Thread.current[:influxdb] = object_id

      until client.stopped?
        check_background_queue(thread_num)
        sleep rand(sleep_interval)
      end

      log(:debug) { "Exit background worker thread #{thread_num}." }
    end
  end
end

#stop!Object

rubocop:enable Metrics/CyclomaticComplexity rubocop:enable Metrics/MethodLength rubocop:enable Metrics/AbcSize



140
141
142
143
# File 'lib/influxdb/writer/async.rb', line 140

def stop!
  log(:debug) { "Thread exiting, flushing queue." }
  check_background_queue until queue.empty?
end