Class: InfluxDB::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/influxdb/worker.rb

Constant Summary collapse

MAX_POST_POINTS =
1000
NUM_WORKER_THREADS =
3
SLEEP_INTERVAL =
5

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

logger, logger=

Constructor Details

#initialize(client) ⇒ Worker

Returns a new instance of Worker.



16
17
18
19
20
# File 'lib/influxdb/worker.rb', line 16

def initialize(client)
  @queue = InfluxDB::MaxQueue.new
  @client = client
  spawn_threads!
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



7
8
9
# File 'lib/influxdb/worker.rb', line 7

def client
  @client
end

#queueObject

Returns the value of attribute queue.



8
9
10
# File 'lib/influxdb/worker.rb', line 8

def queue
  @queue
end

Instance Method Details

#check_background_queue(thread_num = 0) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/influxdb/worker.rb', line 54

def check_background_queue(thread_num = 0)
  log :debug, "Checking background queue on thread #{thread_num} (#{self.current_thread_count} active)"

  begin
    data = []

    while data.size < MAX_POST_POINTS && !@queue.empty?
      p = @queue.pop(true) rescue next;
      data.push p
    end

    return if data.empty?

    begin
      log :debug, "Found data in the queue! (#{data.length} points)"
      @client._write(data)
    rescue => e
      puts "Cannot write data: #{e.inspect}"
    end
  end while @queue.length > MAX_POST_POINTS
end

#current_thread_countObject



26
27
28
# File 'lib/influxdb/worker.rb', line 26

def current_thread_count
  Thread.list.count {|t| t[:influxdb] == self.object_id}
end

#current_threadsObject



22
23
24
# File 'lib/influxdb/worker.rb', line 22

def current_threads
  Thread.list.select {|t| t[:influxdb] == self.object_id}
end

#push(payload) ⇒ Object



30
31
32
# File 'lib/influxdb/worker.rb', line 30

def push(payload)
  queue.push(payload)
end

#spawn_threads!Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/influxdb/worker.rb', line 34

def spawn_threads!
  NUM_WORKER_THREADS.times do |thread_num|
    log :debug, "Spawning background worker thread #{thread_num}."

    Thread.new do
      Thread.current[:influxdb] = self.object_id

      at_exit do
        log :debug, "Thread exiting, flushing queue."
        check_background_queue(thread_num) until @queue.empty?
      end

      while true
        self.check_background_queue(thread_num)
        sleep rand(SLEEP_INTERVAL)
      end
    end
  end
end