Class: InfluxDB::Writer::Async::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/influxdb/writer/async.rb

Constant Summary collapse

MAX_POST_POINTS =
1000
MAX_QUEUE_SIZE =
10_000
NUM_WORKER_THREADS =
3
SLEEP_INTERVAL =
5

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

log?

Constructor Details

#initialize(client, config) ⇒ Worker

Returns a new instance of Worker.



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/influxdb/writer/async.rb', line 47

# Builds a worker bound to +client+, creates its queue and immediately
# starts the background threads.
#
# @param client [Object] stored as-is; the worker later calls +write+ and
#   +stopped?+ on it
# @param config [Hash, Object] tuning options; anything that is not a Hash
#   is treated as an empty Hash, so every setting falls back to its
#   constant. Recognised keys: :max_post_points, :max_queue_size,
#   :num_worker_threads, :sleep_interval
def initialize(client, config)
  settings = config.is_a?(Hash) ? config : {}

  @client             = client
  @max_post_points    = settings.fetch(:max_post_points)    { MAX_POST_POINTS }
  @max_queue_size     = settings.fetch(:max_queue_size)     { MAX_QUEUE_SIZE }
  @num_worker_threads = settings.fetch(:num_worker_threads) { NUM_WORKER_THREADS }
  @sleep_interval     = settings.fetch(:sleep_interval)     { SLEEP_INTERVAL }

  # The queue has to exist before the threads start polling it.
  @queue = InfluxDB::MaxQueue.new(max_queue_size)

  spawn_threads!
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# Reader for the client object handed to #initialize. The worker calls
# +write+ and +stopped?+ on it.
#
# @return [Object] the value stored in @client
def client
  @client
end

#max_post_points ⇒ Object (readonly)

Returns the value of attribute max_post_points.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# Upper bound on how many queued entries #check_background_queue joins
# into a single +client.write+ call.
#
# @return the value of config[:max_post_points] given to #initialize, or
#   MAX_POST_POINTS (1000) when that key was absent
def max_post_points
  @max_post_points
end

#max_queue_size ⇒ Object (readonly)

Returns the value of attribute max_queue_size.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# Size argument passed to InfluxDB::MaxQueue.new when #initialize builds
# the queue.
#
# @return the value of config[:max_queue_size] given to #initialize, or
#   MAX_QUEUE_SIZE (10_000) when that key was absent
def max_queue_size
  @max_queue_size
end

#num_worker_threads ⇒ Object (readonly)

Returns the value of attribute num_worker_threads.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# Number of background threads that #spawn_threads! starts.
#
# @return the value of config[:num_worker_threads] given to #initialize,
#   or NUM_WORKER_THREADS (3) when that key was absent
def num_worker_threads
  @num_worker_threads
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# The queue created in #initialize. #push adds entries to it and
# #check_background_queue pops them off again.
#
# @return [InfluxDB::MaxQueue] the value stored in @queue
def queue
  @queue
end

#sleep_interval ⇒ Object (readonly)

Returns the value of attribute sleep_interval.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# Value each worker thread passes to +rand+ to pick how long it sleeps
# between two queue checks.
#
# @return the value of config[:sleep_interval] given to #initialize, or
#   SLEEP_INTERVAL (5) when that key was absent
def sleep_interval
  @sleep_interval
end

#threads ⇒ Object (readonly)

Returns the value of attribute threads.



32
33
34
# File 'lib/influxdb/writer/async.rb', line 32

# The Thread objects created by the most recent #spawn_threads! call.
#
# @return [Array<Thread>] the value stored in @threads
def threads
  @threads
end

Instance Method Details

#check_background_queue(thread_num = 0) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/influxdb/writer/async.rb', line 95

# Pulls entries off the queue in batches and hands each batch to the client.
#
# A batch holds at most +max_post_points+ entries. It is joined with
# newlines and sent with <tt>client.write(batch, nil)</tt>. A failing write
# is logged at :error level and its batch is not put back on the queue.
#
# @param thread_num [Integer] only used in the debug log line
# @return [nil]
def check_background_queue(thread_num = 0)
  log(:debug) do
    "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"
  end

  loop do
    batch = []

    until batch.size >= max_post_points || queue.empty?
      begin
        # Non-blocking pop: nothing is appended when it raises.
        batch << queue.pop(true)
      rescue ThreadError
        # Presumably another thread emptied the queue between the empty?
        # check and the pop; go back and re-evaluate the loop condition.
        next
      end
    end

    # Nothing was collected, so there is nothing left to send.
    return if batch.empty?

    begin
      log(:debug) { "Found data in the queue! (#{batch.length} points)" }
      client.write(batch.join("\n"), nil)
    rescue StandardError => e
      log :error, "Cannot write data: #{e.inspect}"
    end

    # NOTE(review): this stops after one write when MORE than
    # max_post_points entries remain, and loops again otherwise. Confirm
    # that direction is intended.
    break if queue.length > max_post_points
  end
end

#current_thread_count ⇒ Object



69
70
71
# File 'lib/influxdb/writer/async.rb', line 69

# Counts the threads in Thread.list whose :influxdb thread-local equals
# this object's object_id, which is the tag #spawn_threads! puts on each
# thread it starts.
#
# @return [Integer]
def current_thread_count
  Thread.list.map { |thread| thread[:influxdb] }.count(object_id)
end

#current_threads ⇒ Object



65
66
67
# File 'lib/influxdb/writer/async.rb', line 65

# Lists the threads in Thread.list whose :influxdb thread-local equals
# this object's object_id, which is the tag #spawn_threads! puts on each
# thread it starts.
#
# @return [Array<Thread>]
def current_threads
  owner_id = object_id
  Thread.list.select { |thread| thread[:influxdb] == owner_id }
end

#push(payload) ⇒ Object



61
62
63
# File 'lib/influxdb/writer/async.rb', line 61

# Adds +payload+ to the worker's queue so that a later
# #check_background_queue run can send it.
#
# @param payload [Object] entry to enqueue; entries are later joined with
#   newlines before being written
# @return whatever the queue's +push+ returns
def push(payload)
  pending = queue
  # Go through #push (not #<<) so a queue class that overrides push is honoured.
  pending.push(payload)
end

#spawn_threads! ⇒ Object

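Starts num_worker_threads background threads that repeatedly check the queue until the client is stopped. (The source comment above this method carries only RuboCop directives: rubocop:disable Metrics/CyclomaticComplexity, rubocop:disable Metrics/MethodLength, rubocop:disable Metrics/AbcSize.)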



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/influxdb/writer/async.rb', line 77

# Starts +num_worker_threads+ background threads and records them in
# @threads (replacing any previous list).
#
# Each thread tags itself with this object's object_id under the
# :influxdb thread-local, then alternates between
# #check_background_queue and a random pause until +client.stopped?+
# is true.
#
# @return [Integer] the result of +num_worker_threads.times+
def spawn_threads!
  @threads = []
  num_worker_threads.times do |index|
    log(:debug) { "Spawning background worker thread #{index}." }

    worker = Thread.new do
      # Tag read back by #current_threads / #current_thread_count.
      Thread.current[:influxdb] = object_id

      until client.stopped?
        check_background_queue(index)
        # Kernel#rand bounded by sleep_interval; with the default Integer 5
        # this is a whole number of seconds from 0 to 4.
        sleep(rand(sleep_interval))
      end

      log(:debug) { "Exit background worker thread #{index}." }
    end

    @threads.push(worker)
  end
end

#stop! ⇒ Object



125
126
127
128
# File 'lib/influxdb/writer/async.rb', line 125

# Flushes whatever is still queued by calling #check_background_queue
# (with its default thread number) until the queue reports empty.
#
# @return [nil]
def stop!
  log(:debug) { "Thread exiting, flushing queue." }

  until queue.empty?
    check_background_queue
  end
end