Class: Fluent::Test::InputTestDriver
- Inherits: TestDriver
- Inheritance chain: Object → TestDriver → Fluent::Test::InputTestDriver
- Defined in:
- lib/fluent/test/input_test.rb
Direct Known Subclasses
BufferedOutputTestDriver, OutputTestDriver, TimeSlicedOutputTestDriver
Instance Attribute Summary collapse
-
#emit_streams ⇒ Object
readonly
Returns the value of attribute emit_streams.
-
#expected_emits_length ⇒ Object
Returns the value of attribute expected_emits_length.
-
#run_timeout ⇒ Object
Returns the value of attribute run_timeout.
Attributes inherited from TestDriver
Instance Method Summary collapse
- #emits ⇒ Object
- #events ⇒ Object
- #expect_emit(tag, time, record) ⇒ Object
- #expected_emits ⇒ Object
-
#initialize(klass, &block) ⇒ InputTestDriver
constructor
A new instance of InputTestDriver.
- #records ⇒ Object
- #register_run_breaking_condition(&block) ⇒ Object
- #register_run_post_condition(&block) ⇒ Object
- #run(num_waits = 10, &block) ⇒ Object
- #run_should_stop? ⇒ Boolean
Methods inherited from TestDriver
Constructor Details
#initialize(klass, &block) ⇒ InputTestDriver
Returns a new instance of InputTestDriver.
32 33 34 35 36 37 38 39 40 |
# File 'lib/fluent/test/input_test.rb', line 32
# Builds a new driver and resets the per-run bookkeeping.
#
# @param klass forwarded as-is to the parent constructor
# @param block optional block, forwarded as-is to the parent constructor
def initialize(klass, &block)
  # Buffer paths are cleared before the parent constructor runs.
  FileBuffer.clear_buffer_paths
  super(klass, &block)

  # Collected [tag, events] pairs.
  @emit_streams = []
  # No expectations yet. @expected_emits_length is for checking only the
  # number of emitted records during run.
  @expects = @expected_emits_length = nil
  # Used by #run as the limit (in seconds) for its waiting loop.
  @run_timeout = 5
end
Instance Attribute Details
#emit_streams ⇒ Object (readonly)
Returns the value of attribute emit_streams.
53 54 55 |
# File 'lib/fluent/test/input_test.rb', line 53
# Read-only accessor for the collected emit streams.
#
# @return the object stored in @emit_streams (an Array of [tag, events]
#   pairs, judging by how #emits, #events and #records walk it)
def emit_streams
  @emit_streams
end
#expected_emits_length ⇒ Object
Returns the value of attribute expected_emits_length.
51 52 53 |
# File 'lib/fluent/test/input_test.rb', line 51
# Accessor for the number of emitted records #run should wait for.
#
# @return the value stored in @expected_emits_length (nil when unset)
def expected_emits_length
  @expected_emits_length
end
#run_timeout ⇒ Object
Returns the value of attribute run_timeout.
52 53 54 |
# File 'lib/fluent/test/input_test.rb', line 52
# Accessor for the time limit #run applies to its waiting loop.
#
# @return the value stored in @run_timeout
def run_timeout
  @run_timeout
end
Instance Method Details
#emits ⇒ Object
55 56 57 58 59 60 61 62 63 |
# File 'lib/fluent/test/input_test.rb', line 55
# Flattens every collected stream into a single list of emitted events.
#
# @return [Array] one [tag, time, record] triple per event, in the order
#   the streams (and the events inside each stream) were collected
def emits
  @emit_streams.each_with_object([]) do |(tag, stream), collected|
    stream.each do |time, record|
      collected << [tag, time, record]
    end
  end
end
#events ⇒ Object
65 66 67 68 69 70 71 |
# File 'lib/fluent/test/input_test.rb', line 65
# Concatenates the events of every collected stream, dropping the tags.
#
# @return [Array] all stream entries joined in collection order
def events
  @emit_streams.each_with_object([]) do |(_tag, stream), collected|
    collected.concat(stream)
  end
end
#expect_emit(tag, time, record) ⇒ Object
42 43 44 45 |
# File 'lib/fluent/test/input_test.rb', line 42
# Appends one expected [tag, time, record] triple to the expectation list,
# creating the list on first use.
#
# @return [self] so calls can be chained
def expect_emit(tag, time, record)
  @expects = [] unless @expects
  @expects.push([tag, time, record])
  self
end
#expected_emits ⇒ Object
47 48 49 |
# File 'lib/fluent/test/input_test.rb', line 47
# Returns the expectation list, initializing (and storing) an empty one
# when nothing has been registered yet.
#
# @return [Array] the list kept in @expects
def expected_emits
  return @expects if @expects

  @expects = []
end
#records ⇒ Object
73 74 75 76 77 78 79 80 81 |
# File 'lib/fluent/test/input_test.rb', line 73
# Collects only the record part of every event across all streams.
#
# @return [Array] the records, in collection order
def records
  @emit_streams.each_with_object([]) do |(_tag, stream), collected|
    stream.each do |_time, record|
      collected << record
    end
  end
end
#register_run_breaking_condition(&block) ⇒ Object
90 91 92 93 94 95 |
# File 'lib/fluent/test/input_test.rb', line 90
# Adds a breaking condition; #run_should_stop? reports true as soon as any
# registered breaking condition returns a truthy value.
#
# @return [Array, nil] the list of breaking conditions, or nil when no
#   block was given (in which case nothing is registered)
def register_run_breaking_condition(&block)
  return unless block

  (@run_breaking_conditions ||= []) << block
end
#register_run_post_condition(&block) ⇒ Object
83 84 85 86 87 88 |
# File 'lib/fluent/test/input_test.rb', line 83
# Adds a post condition; #run_should_stop? reports true once every
# registered post condition returns a truthy value.
#
# @return [Array, nil] the list of post conditions, or nil when no block
#   was given (in which case nothing is registered)
def register_run_post_condition(&block)
  return unless block

  (@run_post_conditions ||= []) << block
end
#run(num_waits = 10, &block) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/fluent/test/input_test.rb', line 111
# Runs the driver, routing emitted streams to this driver's emit_stream
# method, and optionally waits for / verifies the emitted events.
#
# After the caller's block has run, and only when an expected length,
# expectations or post conditions are set, the method polls @emit_streams
# until #run_should_stop? is true, comparing each event with @expects (when
# present) along the way.
#
# NOTE(review): the post and breaking conditions registered here are appended
# to the driver's lists and are not removed afterwards, so calling #run more
# than once on the same driver keeps the earlier conditions around.
#
# @param num_waits passed through to the parent implementation
# @param block optional block invoked inside the parent's run
# @return [self]
def run(num_waits = 10, &block)
  # Redirect both emit entry points to this driver's emit_stream.
  m = method(:emit_stream)
  Engine.define_singleton_method(:emit_stream) {|tag,es|
    m.call(tag, es)
  }
  instance.router.define_singleton_method(:emit_stream) {|tag,es|
    m.call(tag, es)
  }

  super(num_waits) {
    block.call if block

    if @expected_emits_length || @expects || @run_post_conditions
      # counters for emits and emit_streams
      i, j = 0, 0

      # Events of expected length will be emitted at the end.
      max_length = @expected_emits_length
      max_length ||= @expects.length if @expects
      if max_length
        register_run_post_condition do
          # ">=" rather than "==": a single stream may carry several events,
          # so the counter can jump past max_length between two checks. With
          # "==" the condition could then never hold and the loop would only
          # end through the timeout.
          i >= max_length
        end
      end

      # Set running timeout to avoid infinite loop caused by some errors.
      # The monotonic clock is used so wall-clock adjustments (or frozen
      # time in tests) cannot disable or prematurely trigger the timeout.
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      register_run_breaking_condition do
        Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at >= @run_timeout
      end

      until run_should_stop?
        # Nothing new collected yet: wait briefly and poll again.
        if j >= @emit_streams.length
          sleep 0.01
          next
        end

        tag, events = @emit_streams[j]
        events.each do |time, record|
          assert_equal(@expects[i], [tag, time, record]) if @expects
          i += 1
        end
        j += 1
      end
      # Every expectation must have been matched by exactly one event.
      assert_equal(@expects.length, i) if @expects
    end
  }
  self
end
#run_should_stop? ⇒ Boolean
97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/fluent/test/input_test.rb', line 97
# Decides whether the waiting loop in #run may end.
#
# @return [Boolean] true when no post conditions are registered, when all
#   post conditions hold, or when any breaking condition holds; false
#   otherwise
def run_should_stop?
  post_conditions = @run_post_conditions

  # Without post conditions there is nothing to wait for.
  return true unless post_conditions
  # Every post condition is satisfied.
  return true if post_conditions.all?(&:call)

  # Otherwise stop only if some breaking condition fires; in that case
  # some post conditions may still be unmet.
  breaking_conditions = @run_breaking_conditions
  breaking_conditions ? breaking_conditions.any?(&:call) : false
end