Class: InfluxDB::Writer::Async::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/influxdb/writer/async.rb

Constant Summary collapse

MAX_POST_POINTS =
1000
MAX_QUEUE_SIZE =
10_000
NUM_WORKER_THREADS =
3
SLEEP_INTERVAL =
5

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, config) ⇒ Worker

Returns a new instance of Worker.



41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/influxdb/writer/async.rb', line 41

# Builds the worker: stores the client, creates the bounded queue, starts the
# background threads and registers an at_exit hook that drains the queue.
#
# @param client [Object] stored as-is and exposed through #client
# @param config [Hash, Object] options; any non-Hash value is treated as an
#   empty Hash. The only key read here is :max_queue, which defaults to
#   MAX_QUEUE_SIZE.
def initialize(client, config)
  @client = client

  options = {}
  options = config if config.is_a?(Hash)
  capacity = options.fetch(:max_queue, MAX_QUEUE_SIZE)
  @queue = InfluxDB::MaxQueue.new(capacity)

  spawn_threads!

  # Keep draining at process exit until nothing is left in the queue.
  at_exit do
    log :debug, "Thread exiting, flushing queue."
    until queue.empty?
      check_background_queue
    end
  end
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# Read-only accessor for the client stored by #initialize.
#
# @return [Object] the value of @client
def client
  @client
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# Read-only accessor for the queue created by #initialize
# (an InfluxDB::MaxQueue instance).
#
# @return [Object] the value of @queue
def queue
  @queue
end

#threads ⇒ Object (readonly)

Returns the value of attribute threads.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# Read-only accessor for the list of threads filled in by #spawn_threads!.
#
# @return [Array<Thread>, nil] the value of @threads
def threads
  @threads
end

Instance Method Details

#check_background_queue(thread_num = 0) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/influxdb/writer/async.rb', line 88

# Drains the queue in batches of at most MAX_POST_POINTS entries and hands
# each batch to client.write.
#
# A failing write is logged at :error level and the batch is dropped, so an
# unreachable backend does not raise out of this method.
#
# @param thread_num [Integer] worker index; only used in log messages
# @return [nil]
def check_background_queue(thread_num = 0)
  log :debug,
      "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"

  loop do
    data = []

    while data.size < MAX_POST_POINTS && !queue.empty?
      begin
        # Non-blocking pop: raises ThreadError when the queue is empty.
        data.push queue.pop(true)
      rescue ThreadError
        # Another consumer emptied the queue between empty? and pop;
        # re-check the loop condition instead of failing.
        next
      end
    end

    return if data.empty?

    begin
      log :debug, "Found data in the queue! (#{data.length} points)"
      client.write(data)
    rescue StandardError => e
      # Report through the shared logger rather than writing to stdout.
      log :error, "Cannot write data: #{e.inspect}"
    end

    # NOTE(review): stops this pass while more than one batch is still
    # queued; presumably the caller is expected to come back for the rest.
    # Confirm the comparison is not inverted.
    break if queue.length > MAX_POST_POINTS
  end
end

#current_thread_count ⇒ Object



62
63
64
# File 'lib/influxdb/writer/async.rb', line 62

# Counts the threads in Thread.list whose :influxdb thread-local equals this
# object's object_id (the tag set by #spawn_threads!).
#
# @return [Integer]
def current_thread_count
  owner_id = object_id
  Thread.list.count { |thread| thread[:influxdb] == owner_id }
end

#current_threads ⇒ Object



58
59
60
# File 'lib/influxdb/writer/async.rb', line 58

# Lists the threads in Thread.list whose :influxdb thread-local equals this
# object's object_id (the tag set by #spawn_threads!).
#
# @return [Array<Thread>]
def current_threads
  owner_id = object_id
  Thread.list.select { |thread| thread[:influxdb] == owner_id }
end

#push(payload) ⇒ Object



54
55
56
# File 'lib/influxdb/writer/async.rb', line 54

# Enqueues +payload+ by delegating to the queue's #push.
#
# @param payload [Object] item to enqueue; its expected shape is not visible
#   from this method
# @return [Object] whatever queue.push returns
def push(payload)
  queue.push(payload)
end

#spawn_threads! ⇒ Object

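Spawns NUM_WORKER_THREADS background worker threads; each one repeatedly checks the queue until the client is stopped. (The source comment here consists only of linter directives: rubocop:disable Metrics/CyclomaticComplexity, rubocop:disable Metrics/MethodLength, rubocop:disable Metrics/AbcSize.)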



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/influxdb/writer/async.rb', line 70

# Starts NUM_WORKER_THREADS background threads and records them in @threads.
#
# Each thread tags itself with this object's object_id under the :influxdb
# thread-local key, then alternates between #check_background_queue and a
# random sleep of rand(SLEEP_INTERVAL) seconds until client.stopped? is true.
#
# @return [Integer] the result of NUM_WORKER_THREADS.times
def spawn_threads!
  @threads = []

  NUM_WORKER_THREADS.times do |index|
    log :debug, "Spawning background worker thread #{index}."

    worker = Thread.new(index) do |worker_id|
      # Tag the thread so current_threads / current_thread_count can find it.
      Thread.current[:influxdb] = object_id

      until client.stopped?
        check_background_queue(worker_id)
        sleep rand(SLEEP_INTERVAL)
      end

      log :debug, "Exit background worker thread #{worker_id}."
    end

    @threads << worker
  end
end