Class: InfluxDB::AsyncQueue::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/influxdb/async_queue/server.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = nil) ⇒ Server

Returns a new instance of Server.



11
12
13
14
# File 'lib/influxdb/async_queue/server.rb', line 11

def initialize(config = nil)
  @config = config || ::InfluxDB::AsyncQueue.config
  @queue = ::InfluxDB::AsyncQueue::Queue.new(@config.adapter)
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



15
16
17
# File 'lib/influxdb/async_queue/server.rb', line 15

def config
  @config
end

#queueObject (readonly)

Returns the value of attribute queue.



15
16
17
# File 'lib/influxdb/async_queue/server.rb', line 15

def queue
  @queue
end

Class Method Details

.run(*args) ⇒ Object



7
8
9
# File 'lib/influxdb/async_queue/server.rb', line 7

def self.run(*args)
  new(*args).run
end

Instance Method Details

#iterate(points) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/influxdb/async_queue/server.rb', line 31

def iterate(points)
  points = queue.pop(config.batch_size)
  if points.empty?
    config.logger.debug 'Queue is empty; sleep'.freeze
    Kernel.sleep(config.sleep_timeout)
    return false
  end

  config.logger.info("Going to write #{points.size} points")
  config.influxdb_client.write(
    points.join("\n"),
    config.influxdb_precision,
    config.influxdb_retention_policy.presence,
    config.influxdb_database
  )
  true
rescue InfluxDB::Error => e
  config.logger.error(e.inspect)
  if points
    queue.push(*points)
    config.logger.debug(points.inspect)
  end
  true
rescue => e
  config.logger.error(e.inspect)
  raise
end

#runObject



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/influxdb/async_queue/server.rb', line 17

def run
  config.logger.info("Start #{self.inspect}")
  points = nil

  loop do
    next unless iterate(points)
    points = nil
  end
rescue Interrupt
  config.logger.info('Interrupted; exit(0)'.freeze)
  queue.push(*points) if points
  exit(0)
end