Class: InfluxDB::Writer::Async::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/influxdb/writer/async.rb

Overview

rubocop:disable Metrics/ClassLength

Constant Summary collapse

MAX_POST_POINTS =
1000
MAX_QUEUE_SIZE =
10_000
NUM_WORKER_THREADS =
3
SLEEP_INTERVAL =
5
BLOCK_ON_FULL_QUEUE =
false

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

log?

Constructor Details

#initialize(client, config) ⇒ Worker

rubocop:disable Metrics/MethodLength



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/influxdb/writer/async.rb', line 61

def initialize(client, config) # rubocop:disable Metrics/MethodLength
  @client = client
  config = config.is_a?(Hash) ? config : {}

  @max_post_points     = config.fetch(:max_post_points,     MAX_POST_POINTS)
  @max_queue_size      = config.fetch(:max_queue_size,      MAX_QUEUE_SIZE)
  @num_worker_threads  = config.fetch(:num_worker_threads,  NUM_WORKER_THREADS)
  @sleep_interval      = config.fetch(:sleep_interval,      SLEEP_INTERVAL)
  @block_on_full_queue = config.fetch(:block_on_full_queue, BLOCK_ON_FULL_QUEUE)
  @shutdown_timeout    = config.fetch(:shutdown_timeout,    2 * @sleep_interval)

  queue_class = @block_on_full_queue ? SizedQueue : InfluxDB::MaxQueue
  @queue = queue_class.new max_queue_size
  @should_stop = false
  spawn_threads!
end

Instance Attribute Details

#block_on_full_queueObject (readonly)

Returns the value of attribute block_on_full_queue.



43
44
45
# File 'lib/influxdb/writer/async.rb', line 43

def block_on_full_queue
  @block_on_full_queue
end

#clientObject (readonly)

Returns the value of attribute client.



43
44
45
# File 'lib/influxdb/writer/async.rb', line 43

def client
  @client
end

#max_post_pointsObject (readonly)

Returns the value of attribute max_post_points.



43
44
45
# File 'lib/influxdb/writer/async.rb', line 43

def max_post_points
  @max_post_points
end

#max_queue_sizeObject (readonly)

Returns the value of attribute max_queue_size.



43
44
45
# File 'lib/influxdb/writer/async.rb', line 43

def max_queue_size
  @max_queue_size
end

#num_worker_threadsObject (readonly)

Returns the value of attribute num_worker_threads.



43
44
45
# File 'lib/influxdb/writer/async.rb', line 43

def num_worker_threads
  @num_worker_threads
end

#queueObject (readonly)

Returns the value of attribute queue.



43
44
45
# File 'lib/influxdb/writer/async.rb', line 43

def queue
  @queue
end

#shutdown_timeoutObject (readonly)

Returns the value of attribute shutdown_timeout.



43
44
45
# File 'lib/influxdb/writer/async.rb', line 43

def shutdown_timeout
  @shutdown_timeout
end

#sleep_intervalObject (readonly)

Returns the value of attribute sleep_interval.



43
44
45
# File 'lib/influxdb/writer/async.rb', line 43

def sleep_interval
  @sleep_interval
end

#threadsObject (readonly)

Returns the value of attribute threads.



43
44
45
# File 'lib/influxdb/writer/async.rb', line 43

def threads
  @threads
end

Instance Method Details

#check_background_queue(thread_num = -1)) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/influxdb/writer/async.rb', line 112

def check_background_queue(thread_num = -1)
  log(:debug) do
    "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"
  end

  loop do
    data = {}

    while data.all? { |_, points| points.size < max_post_points } && !queue.empty?
      begin
        payload, precision, retention_policy, database = queue.pop(true)
        key = {
          db: database,
          pr: precision,
          rp: retention_policy,
        }
        data[key] ||= []
        data[key] << payload
      rescue ThreadError
        next
      end
    end

    return if data.values.flatten.empty?

    begin
      log(:debug) { "Found data in the queue! (#{sizes(data)}) on thread #{thread_num}" }
      write(data)
    rescue StandardError => e
      log :error, "Cannot write data: #{e.inspect} on thread #{thread_num}"
    end

    break if queue.length > max_post_points
  end
end

#current_thread_countObject



86
87
88
# File 'lib/influxdb/writer/async.rb', line 86

def current_thread_count
  @threads.count
end

#current_threadsObject



82
83
84
# File 'lib/influxdb/writer/async.rb', line 82

def current_threads
  @threads
end

#push(payload, precision = nil, retention_policy = nil, database = nil) ⇒ Object



78
79
80
# File 'lib/influxdb/writer/async.rb', line 78

def push(payload, precision = nil, retention_policy = nil, database = nil)
  queue.push([payload, precision, retention_policy, database])
end

#spawn_threads!Object

rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength rubocop:disable Metrics/AbcSize



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/influxdb/writer/async.rb', line 94

def spawn_threads!
  @threads = []
  num_worker_threads.times do |thread_num|
    log(:debug) { "Spawning background worker thread #{thread_num}." }

    @threads << Thread.new do
      Thread.current[:influxdb] = object_id

      until @should_stop
        check_background_queue(thread_num)
        sleep rand(sleep_interval)
      end

      log(:debug) { "Exit background worker thread #{thread_num}." }
    end
  end
end

#stop!Object

rubocop:enable Metrics/CyclomaticComplexity rubocop:enable Metrics/MethodLength rubocop:enable Metrics/AbcSize



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/influxdb/writer/async.rb', line 152

def stop!
  log(:debug) { "Worker is being stopped, flushing queue." }

  # If retry was infinite (-1), set it to zero to give the threads one
  # last chance to write their data
  client.config.retry = 0 if client.config.retry < 0

  # Signal the background threads that they should exit.
  @should_stop = true

  # Wait for the threads to exit and then kill them
  @threads.each do |t|
    r = t.join(shutdown_timeout)
    t.kill if r.nil?
  end

  # Flush any remaining items in the queue on the main thread
  check_background_queue until queue.empty?
end