Class: Fluent::DummyInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_dummy.rb

Constant Summary collapse

BIN_NUM =
10

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes inherited from Input

#router

Attributes included from PluginLoggerMixin

#log

Instance Method Summary collapse

Methods inherited from Input

#initialize

Methods included from PluginLoggerMixin

included, #initialize

Methods included from PluginId

#plugin_id

Methods included from Configurable

#config, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Input

Instance Method Details

#configure(conf) ⇒ Object



51
52
53
54
55
56
# File 'lib/fluent/plugin/in_dummy.rb', line 51

def configure(conf)
  super

  @increment_value = 0
  @dummy_index = 0
end

#emit(num) ⇒ Object



86
87
88
# File 'lib/fluent/plugin/in_dummy.rb', line 86

def emit(num)
  num.times { router.emit(@tag, Fluent::Engine.now, generate()) }
end

#generateObject



90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/in_dummy.rb', line 90

def generate
  d = @dummy[@dummy_index]
  unless d
    @dummy_index = 0
    d = @dummy[0]
  end
  @dummy_index += 1
  if @auto_increment_key
    d = d.dup
    d[@auto_increment_key] = @increment_value
    @increment_value += 1
  end
  d
end

#runObject



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/in_dummy.rb', line 69

def run
  batch_num    = (@rate / BIN_NUM).to_i
  residual_num = (@rate % BIN_NUM)
  while @running
    current_time = Time.now.to_i
    BIN_NUM.times do
      break unless (@running && Time.now.to_i <= current_time)
      wait(0.1) { emit(batch_num) }
    end
    emit(residual_num)
    # wait for next second
    while @running && Time.now.to_i <= current_time
      sleep 0.01
    end
  end
end

#shutdownObject



64
65
66
67
# File 'lib/fluent/plugin/in_dummy.rb', line 64

def shutdown
  @running = false
  @thread.join
end

#startObject



58
59
60
61
62
# File 'lib/fluent/plugin/in_dummy.rb', line 58

def start
  super
  @running = true
  @thread = Thread.new(&method(:run))
end

#wait(time) ⇒ Object



105
106
107
108
109
110
# File 'lib/fluent/plugin/in_dummy.rb', line 105

def wait(time)
  start_time = Time.now
  yield
  sleep_time = time - (Time.now - start_time)
  sleep sleep_time if sleep_time > 0
end