Class: InfluxDB::Writer::Async::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/influxdb/writer/async.rb

Constant Summary collapse

MAX_POST_POINTS =
1000
MAX_QUEUE_SIZE =
10_000
NUM_WORKER_THREADS =
3
SLEEP_INTERVAL =
5

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, config) ⇒ Worker

Returns a new instance of Worker.



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/influxdb/writer/async.rb', line 47

def initialize(client, config)
  @client = client
  config = config.is_a?(Hash) ? config : {}

  @max_post_points    = config.fetch(:max_post_points,    MAX_POST_POINTS)
  @max_queue_size     = config.fetch(:max_queue_size,     MAX_QUEUE_SIZE)
  @num_worker_threads = config.fetch(:num_worker_threads, NUM_WORKER_THREADS)
  @sleep_interval     = config.fetch(:sleep_interval,     SLEEP_INTERVAL)

  @queue = InfluxDB::MaxQueue.new max_queue_size

  spawn_threads!
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

def client
  @client
end

#max_post_pointsObject (readonly)

Returns the value of attribute max_post_points.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

def max_post_points
  @max_post_points
end

#max_queue_sizeObject (readonly)

Returns the value of attribute max_queue_size.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

def max_queue_size
  @max_queue_size
end

#num_worker_threadsObject (readonly)

Returns the value of attribute num_worker_threads.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

def num_worker_threads
  @num_worker_threads
end

#queueObject (readonly)

Returns the value of attribute queue.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

def queue
  @queue
end

#sleep_intervalObject (readonly)

Returns the value of attribute sleep_interval.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

def sleep_interval
  @sleep_interval
end

#threadsObject (readonly)

Returns the value of attribute threads.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

def threads
  @threads
end

Instance Method Details

#check_background_queue(thread_num = 0) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/influxdb/writer/async.rb', line 95

def check_background_queue(thread_num = 0)
  log :debug,
      "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"

  loop do
    data = []

    while data.size < max_post_points && !queue.empty?
      p = queue.pop(true) rescue next
      data.push p
    end

    return if data.empty?

    begin
      log :debug, "Found data in the queue! (#{data.length} points)"
      client.write(data.join("\n"), nil)
    rescue => e
      puts "Cannot write data: #{e.inspect}"
    end

    break if queue.length > max_post_points
  end
end

#current_thread_countObject



69
70
71
# File 'lib/influxdb/writer/async.rb', line 69

def current_thread_count
  Thread.list.count { |t| t[:influxdb] == object_id }
end

#current_threadsObject



65
66
67
# File 'lib/influxdb/writer/async.rb', line 65

def current_threads
  Thread.list.select { |t| t[:influxdb] == object_id }
end

#push(payload) ⇒ Object



61
62
63
# File 'lib/influxdb/writer/async.rb', line 61

def push(payload)
  queue.push(payload)
end

#spawn_threads!Object

rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength rubocop:disable Metrics/AbcSize



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/influxdb/writer/async.rb', line 77

def spawn_threads!
  @threads = []
  num_worker_threads.times do |thread_num|
    log :debug, "Spawning background worker thread #{thread_num}."

    @threads << Thread.new do
      Thread.current[:influxdb] = object_id

      until client.stopped?
        check_background_queue(thread_num)
        sleep rand(sleep_interval)
      end

      log :debug, "Exit background worker thread #{thread_num}."
    end
  end
end

#stop!Object



120
121
122
123
# File 'lib/influxdb/writer/async.rb', line 120

def stop!
  log :debug, "Thread exiting, flushing queue."
  check_background_queue until queue.empty?
end