Class: InfluxDB::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/influxdb/worker.rb

Constant Summary collapse

MAX_POST_POINTS =
1000
NUM_WORKER_THREADS =
3
SLEEP_INTERVAL =
5

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

logger, logger=

Constructor Details

#initialize(client) ⇒ Worker

Returns a new instance of Worker.



16
17
18
19
20
21
22
23
24
25
# File 'lib/influxdb/worker.rb', line 16

# Builds a worker bound to +client+ and starts its background threads.
#
# Also registers an at_exit hook that keeps calling
# #check_background_queue until the queue reports empty, so points still
# queued at process exit get one more chance to be written.
#
# @param client [Object] stored as-is; the worker threads poll its
#   +stopped?+ and #check_background_queue calls its +_write+
def initialize(client)
  @client = client
  @queue = InfluxDB::MaxQueue.new
  spawn_threads!

  at_exit do
    log :debug, "Thread exiting, flushing queue."

    until @queue.empty?
      check_background_queue
    end
  end
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



7
8
9
# File 'lib/influxdb/worker.rb', line 7

# Reader for the client that was handed to #initialize.
#
# @return [Object] the current value of @client
def client
  @client
end

#queue ⇒ Object

Returns the value of attribute queue.



8
9
10
# File 'lib/influxdb/worker.rb', line 8

# Reader for the queue of pending points.
#
# @return [Object] the current value of @queue (an InfluxDB::MaxQueue is
#   assigned in #initialize)
def queue
  @queue
end

Instance Method Details

#check_background_queue(thread_num = 0) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/influxdb/worker.rb', line 58

# Drains queued points and hands them to the client in batches.
#
# Each pass collects up to MAX_POST_POINTS points from @queue without
# blocking and passes them to +@client._write+. Another pass follows only
# while more than MAX_POST_POINTS points are still queued; a smaller
# remainder is left for the next call.
#
# A failed write is logged and the batch is dropped; the error is not
# re-raised, so the calling worker thread keeps running.
#
# @param thread_num [Integer] worker index, used only in the debug log line
# @return [nil]
def check_background_queue(thread_num = 0)
  log :debug, "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"

  loop do
    batch = []

    while batch.size < MAX_POST_POINTS && !@queue.empty?
      begin
        batch.push(@queue.pop(true))
      rescue ThreadError
        # Non-blocking pop on an empty queue: another worker took the last
        # point between the empty? check and the pop. The loop condition is
        # re-evaluated and ends the collection. Any other error propagates
        # instead of being retried forever.
        # NOTE(review): assumes @queue follows ::Queue's pop(true) contract.
      end
    end

    return if batch.empty?

    begin
      log :debug, "Found data in the queue! (#{batch.length} points)"
      @client._write(batch)
    rescue => e
      # Report through the same logger as every other message in this class.
      log :error, "Cannot write data: #{e.inspect}"
    end

    break unless @queue.length > MAX_POST_POINTS
  end
end

#current_thread_count ⇒ Object



31
32
33
# File 'lib/influxdb/worker.rb', line 31

# Counts the live threads tagged as belonging to this worker.
#
# A thread belongs to this worker when its :influxdb thread-local equals
# this object's object_id.
#
# @return [Integer] number of matching threads in Thread.list
def current_thread_count
  owner_id = object_id
  Thread.list.count { |thread| thread[:influxdb] == owner_id }
end

#current_threads ⇒ Object



27
28
29
# File 'lib/influxdb/worker.rb', line 27

# Lists the live threads tagged as belonging to this worker.
#
# A thread belongs to this worker when its :influxdb thread-local equals
# this object's object_id.
#
# @return [Array<Thread>] matching threads from Thread.list
def current_threads
  owner_id = object_id
  Thread.list.select { |thread| thread[:influxdb] == owner_id }
end

#push(payload) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/influxdb/worker.rb', line 35

# Enqueues one payload, or every element of an Array payload, onto #queue.
#
# @param payload [Array, Object] an Array is pushed element by element;
#   anything else is pushed as a single item
# @return [Object] the payload itself for an Array, otherwise whatever
#   queue.push returns
def push(payload)
  return queue.push(payload) unless payload.is_a?(Array)

  payload.each { |point| queue.push(point) }
end

#spawn_threads! ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/influxdb/worker.rb', line 43

# Starts NUM_WORKER_THREADS background threads for this worker.
#
# Each thread tags itself with this worker's object_id under the :influxdb
# thread-local key, then alternates between #check_background_queue and a
# sleep of rand(SLEEP_INTERVAL) seconds until client.stopped? is true.
#
# @return [Integer] NUM_WORKER_THREADS (the receiver of +times+)
def spawn_threads!
  NUM_WORKER_THREADS.times do |index|
    log :debug, "Spawning background worker thread #{index}."

    Thread.new do
      Thread.current[:influxdb] = object_id

      until client.stopped?
        check_background_queue(index)
        sleep rand(SLEEP_INTERVAL)
      end
    end
  end
end