Class: Fluent::Test::InputTestDriver

Inherits:
TestDriver show all
Defined in:
lib/fluent/test/input_test.rb

Instance Attribute Summary collapse

Attributes inherited from TestDriver

#config, #instance

Instance Method Summary collapse

Methods inherited from TestDriver

#configure

Constructor Details

#initialize(klass, &block) ⇒ InputTestDriver

Returns a new instance of InputTestDriver.



32
33
34
35
36
37
38
39
40
# File 'lib/fluent/test/input_test.rb', line 32

def initialize(klass, &block)
  FileBuffer.clear_buffer_paths
  super(klass, &block)
  @emit_streams = []
  @expects = nil
  # for checking only the number of emitted records during run
  @expected_emits_length = nil
  @run_timeout = 5
end

Instance Attribute Details

#emit_streamsObject (readonly)

Returns the value of attribute emit_streams.



53
54
55
# File 'lib/fluent/test/input_test.rb', line 53

def emit_streams
  @emit_streams
end

#expected_emits_lengthObject

Returns the value of attribute expected_emits_length.



51
52
53
# File 'lib/fluent/test/input_test.rb', line 51

def expected_emits_length
  @expected_emits_length
end

#run_timeoutObject

Returns the value of attribute run_timeout.



52
53
54
# File 'lib/fluent/test/input_test.rb', line 52

def run_timeout
  @run_timeout
end

Instance Method Details

#emitsObject



55
56
57
58
59
60
61
62
63
# File 'lib/fluent/test/input_test.rb', line 55

def emits
  all = []
  @emit_streams.each {|tag,events|
    events.each {|time,record|
      all << [tag, time, record]
    }
  }
  all
end

#eventsObject



65
66
67
68
69
70
71
# File 'lib/fluent/test/input_test.rb', line 65

def events
  all = []
  @emit_streams.each {|tag,events|
    all.concat events
  }
  all
end

#expect_emit(tag, time, record) ⇒ Object



42
43
44
45
# File 'lib/fluent/test/input_test.rb', line 42

def expect_emit(tag, time, record)
  (@expects ||= []) << [tag, time, record]
  self
end

#expected_emitsObject



47
48
49
# File 'lib/fluent/test/input_test.rb', line 47

def expected_emits
  @expects ||= []
end

#recordsObject



73
74
75
76
77
78
79
80
81
# File 'lib/fluent/test/input_test.rb', line 73

def records
  all = []
  @emit_streams.each {|tag,events|
    events.each {|time,record|
      all << record
    }
  }
  all
end

#register_run_breaking_condition(&block) ⇒ Object



90
91
92
93
94
95
# File 'lib/fluent/test/input_test.rb', line 90

def register_run_breaking_condition(&block)
  if block
    @run_breaking_conditions ||= []
    @run_breaking_conditions << block
  end
end

#register_run_post_condition(&block) ⇒ Object



83
84
85
86
87
88
# File 'lib/fluent/test/input_test.rb', line 83

def register_run_post_condition(&block)
  if block
    @run_post_conditions ||= []
    @run_post_conditions << block
  end
end

#run(num_waits = 10, &block) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/fluent/test/input_test.rb', line 111

def run(num_waits = 10, &block)
  m = method(:emit_stream)
  Engine.define_singleton_method(:emit_stream) {|tag,es|
    m.call(tag, es)
  }
  instance.router.define_singleton_method(:emit_stream) {|tag,es|
    m.call(tag, es)
  }
  super(num_waits) {
    block.call if block

    if @expected_emits_length || @expects || @run_post_conditions
      # counters for emits and emit_streams
      i, j = 0, 0

      # Events of expected length will be emitted at the end.
      max_length = @expected_emits_length
      max_length ||= @expects.length if @expects
      if max_length
        register_run_post_condition do
          i == max_length
        end
      end

      # Set runnning timeout to avoid infinite loop caused by some errors.
      started_at = Time.now
      register_run_breaking_condition do
        Time.now >= started_at + @run_timeout
      end

      until run_should_stop?
        if j >= @emit_streams.length
          sleep 0.01
          next
        end

        tag, events = @emit_streams[j]
        events.each do |time, record|
          assert_equal(@expects[i], [tag, time, record]) if @expects
          i += 1
        end
        j += 1
      end
      assert_equal(@expects.length, i) if @expects
    end
  }
  self
end

#run_should_stop?Boolean

Returns:

  • (Boolean)


97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/test/input_test.rb', line 97

def run_should_stop?
  # Should stop running if post conditions are not registered.
  return true unless @run_post_conditions

  # Should stop running if all of the post conditions are true.
  return true if @run_post_conditions.all? {|proc| proc.call }

  # Should stop running if any of the breaking conditions is true.
  # In this case, some post conditions may be not true.
  return true if @run_breaking_conditions && @run_breaking_conditions.any? {|proc| proc.call }

  false
end