Class: Fluent::DummyDataProducerInput

Inherits:
Input
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/in_dummydata_producer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#dummydata ⇒ Object (readonly)

X: number. Each configuration entry named dummydataX holds one JSON record, e.g. dummydataX {"field1":"data1","field2":"data2"}



18
19
20
# File 'lib/fluent/plugin/in_dummydata_producer.rb', line 18

# Reader for @dummydata, the array that #configure fills with the parsed JSON
# value of every "dummydataN" configuration entry, ordered by N.
def dummydata
  @dummydata
end

Instance Method Details

#configure(conf) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/in_dummydata_producer.rb', line 25

# Reads the "dummydataN" entries from the configuration and prepares the
# producer's state.
#
# Every key of the form dummydata<digits> is collected, ordered by its numeric
# suffix (so dummydata2 precedes dummydata10) and its value parsed as JSON.
# The parsed values are stored in @dummydata; the round-robin cursor
# (@dummydata_index) and the auto-increment counter (@increment_value) are
# reset to 0.
#
# @param conf the plugin configuration; must respond to #keys and #[]
# @raise [Fluent::ConfigError] if no dummydataN entry exists, or if one of
#   them is not valid JSON
def configure(conf)
  super

  @increment_value = 0

  # \A / \z anchor the whole key; ^ / $ would also accept a matching line
  # inside a multi-line key.
  key_pattern = /\Adummydata(\d+)\z/
  numbered_keys = conf.keys.select { |key| key =~ key_pattern }
  ordered_keys = numbered_keys.sort_by { |key| key[key_pattern, 1].to_i }

  @dummydata = ordered_keys.map do |key|
    begin
      JSON.parse(conf[key])
    rescue JSON::ParserError => e
      # Report malformed records as a configuration problem that names the
      # offending key, rather than leaking a bare parser error.
      raise Fluent::ConfigError, "#{key} is not valid JSON: #{e.message}"
    end
  end

  if @dummydata.empty?
    raise Fluent::ConfigError, "no one dummydata exists"
  end
  @dummydata_index = 0
end

#generate ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/in_dummydata_producer.rb', line 53

# Produces the next record to emit.
#
# Records are taken from @dummydata in round-robin order: when the cursor
# points at an empty (nil/false) slot it is rewound to the first entry. The
# chosen entry is shallow-copied so the stored template is not modified. When
# @auto_increment_key is set, the copy additionally receives the current
# @increment_value under that key and the counter is advanced by one.
#
# @return a copy of the selected @dummydata entry
def generate
  template = @dummydata[@dummydata_index]
  # Cursor ran off the end (or hit an empty slot): start over.
  @dummydata_index = 0 unless template
  template ||= @dummydata.first
  @dummydata_index += 1

  record = template.dup
  return record unless @auto_increment_key

  record[@auto_increment_key] = @increment_value
  @increment_value += 1
  record
end

#run ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/fluent/plugin/in_dummydata_producer.rb', line 68

# Producer loop: emits generated records under @tag until @running is cleared.
#
# For each distinct value of Fluent::Engine.now (presumably one-second
# resolution; confirm against the Fluentd version in use) at most @rate
# records are emitted. They are sent in batches of roughly a ninth of @rate,
# with a 0.1 s pause after each batch. Once the quota is reached the loop
# polls every 0.04 s until Fluent::Engine.now changes.
#
# The final batch is trimmed to the remaining quota, so the number of records
# per time value never exceeds @rate (an untrimmed batch would overshoot,
# e.g. 10 records for a rate of 9).
def run
  batch_num = (@rate / 9).to_i + 1
  while @running
    current_time = Fluent::Engine.now
    rate_count = 0

    while @running && rate_count < @rate && Fluent::Engine.now == current_time
      # ceil keeps the count an Integer even if @rate is fractional.
      emit_count = [batch_num, (@rate - rate_count).ceil].min
      emit_count.times do
        Fluent::Engine.emit(@tag, Fluent::Engine.now, generate())
      end
      rate_count += emit_count
      sleep 0.1
    end
    # wait for next second
    while @running && Fluent::Engine.now == current_time
      sleep 0.04
    end
  end
end

#shutdown ⇒ Object



48
49
50
51
# File 'lib/fluent/plugin/in_dummydata_producer.rb', line 48

# Stops the producer: clears @running so the loop in #run exits, then waits
# for the producer thread to finish.
#
# Safe to call when no producer thread exists (for example when #start never
# ran); in that case only the flag is cleared.
def shutdown
  @running = false
  @producer.join if @producer
end

#start ⇒ Object



42
43
44
45
46
# File 'lib/fluent/plugin/in_dummydata_producer.rb', line 42

# Starts the plugin: after the parent's #start, sets the @running flag and
# launches #run on a new thread, which is kept in @producer.
def start
  super
  # The flag must be set before the thread starts, since #run loops on it.
  @running = true
  @producer = Thread.new { run }
end