Class: Fluent::Plugin::DummyInput
- Defined in:
- lib/fluent/plugin/in_dummy.rb
Constant Summary
- BIN_NUM = 10
- DEFAULT_STORAGE_TYPE = 'local'
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(num) ⇒ Object
- #generate ⇒ Object
- #initialize ⇒ DummyInput (constructor): A new instance of DummyInput.
- #run ⇒ Object
- #start ⇒ Object
- #wait(time) ⇒ Object
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
Methods included from Fluent::PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?
Methods inherited from Base
#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #has_router?, #inspect, #shutdown, #shutdown?, #started?, #stop, #stopped?, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, included, lookup_type, register_type
Constructor Details
#initialize ⇒ DummyInput
Returns a new instance of DummyInput.
58 59 60 61 |
# File 'lib/fluent/plugin/in_dummy.rb', line 58
#
# Builds the plugin instance. After delegating to the parent constructor the
# storage handle is reset to nil; #configure is what assigns the real one.
def initialize
  super
  @storage = nil
end
Instance Method Details
#configure(conf) ⇒ Object
63 64 65 66 67 68 |
# File 'lib/fluent/plugin/in_dummy.rb', line 63
#
# Applies the configuration: resets the round-robin cursor into the dummy
# records and creates the 'suspend' storage from the first <storage> child
# element of +conf+ (nil when there is none), using DEFAULT_STORAGE_TYPE as
# the default type.
def configure(conf)
  super

  @dummy_index = 0

  storage_section = conf.elements.find { |element| element.name == 'storage' }
  @storage = storage_create(
    usage: 'suspend',
    conf: storage_section,
    default_type: DEFAULT_STORAGE_TYPE
  )
end
#emit(num) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/fluent/plugin/in_dummy.rb', line 100
#
# Emits +num+ rounds of generated events under @tag. When @size is greater
# than 1 each round is an array of @size [time, record] pairs sent through
# router.emit_array; otherwise each round is a single router.emit call.
# Any StandardError raised while emitting is swallowed on purpose.
def emit(num)
  if @size > 1
    num.times do
      sink = router
      batch = Array.new(@size) { [Fluent::Engine.now, generate] }
      sink.emit_array(@tag, batch)
    end
  else
    num.times do
      router.emit(@tag, Fluent::Engine.now, generate)
    end
  end
rescue StandardError
  # ignore all errors not to stop emits by emit errors
  nil
end
#generate ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/fluent/plugin/in_dummy.rb', line 114
#
# Returns the next record to emit, cycling through @dummy in order and
# wrapping back to the first entry once the cursor runs past the end.
#
# The returned record is always a (shallow) copy of the configured template,
# so whatever receives the event may modify it in place without altering the
# entries of @dummy that later events are built from.
#
# When @auto_increment_key is set, the copy additionally receives that key,
# whose value is the result of incrementing :auto_increment_value in @storage.
def generate
  template = @dummy[@dummy_index]
  unless template
    # Cursor ran off the end of the list: wrap around.
    @dummy_index = 0
    template = @dummy[@dummy_index]
  end
  @dummy_index += 1

  # Never hand out the template object itself; a nil template (empty @dummy)
  # is passed through untouched.
  record = template
  record = template.dup if template

  if @auto_increment_key
    record[@auto_increment_key] = @storage.update(:auto_increment_value) { |v| v + 1 }
  end
  record
end
#run ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/fluent/plugin/in_dummy.rb', line 83
#
# Thread body. For as long as the thread is flagged as running, each pass
# covers one wall-clock second: up to BIN_NUM batches of @rate / BIN_NUM
# events are emitted, each batch padded to 0.1s via #wait, then the
# @rate % BIN_NUM remainder is emitted, and finally the loop idles in 10ms
# naps until the clock's second changes.
def run
  per_bin = (@rate / BIN_NUM).to_i
  remainder = (@rate % BIN_NUM)

  while thread_current_running?
    second = Time.now.to_i

    BIN_NUM.times do
      # Stop early on shutdown or once the second has already rolled over.
      break if !thread_current_running? || Time.now.to_i > second

      wait(0.1) { emit(per_bin) }
    end

    emit(remainder) if thread_current_running?

    # wait for next second
    sleep 0.01 while thread_current_running? && Time.now.to_i <= second
  end
end
#start ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fluent/plugin/in_dummy.rb', line 70
#
# Starts the plugin: seeds any storage keys that have no value yet
# (:increment_value and :dummy_index with 0, and, only when an auto-increment
# key is configured, :auto_increment_value with -1) and then launches the
# :dummy_input thread running #run.
def start
  super

  %i[increment_value dummy_index].each do |key|
    @storage.put(key, 0) unless @storage.get(key)
  end

  if @auto_increment_key
    @storage.put(:auto_increment_value, -1) unless @storage.get(:auto_increment_value)
  end

  thread_create(:dummy_input, &method(:run))
end
#wait(time) ⇒ Object
128 129 130 131 132 133 |
# File 'lib/fluent/plugin/in_dummy.rb', line 128
#
# Runs the given block, then sleeps for whatever is left of +time+ seconds so
# that block plus sleep together take at least +time+. If the block already
# took +time+ or longer, no sleep happens.
#
# Elapsed time is read from the monotonic clock rather than Time.now, so a
# wall-clock adjustment while the block runs cannot inflate or cancel the sleep.
#
# time - minimum duration in seconds (Numeric).
#
# Returns the result of Kernel#sleep when a sleep was needed, otherwise nil.
def wait(time)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  remaining = time - elapsed
  sleep remaining if remaining > 0
end