Class: Fluent::Test::InputTestDriver

Inherits:
TestDriver show all
Defined in:
lib/fluent/test/input_test.rb

Defined Under Namespace

Modules: EmitStreamWrapper

Instance Attribute Summary collapse

Attributes inherited from TestDriver

#config, #instance

Instance Method Summary collapse

Methods inherited from TestDriver

#configure

Constructor Details

#initialize(klass, &block) ⇒ InputTestDriver

Returns a new instance of InputTestDriver.



24
25
26
27
28
29
30
31
32
33
# File 'lib/fluent/test/input_test.rb', line 24

def initialize(klass, &block)
  super(klass, &block)
  @emit_streams = []
  @event_streams = []
  @expects = nil
  # for checking only the number of emitted records during run
  @expected_emits_length = nil
  @run_timeout = 5
  @run_post_conditions = []
end

Instance Attribute Details

#emit_streamsObject (readonly)

Returns the value of attribute emit_streams.



46
47
48
# File 'lib/fluent/test/input_test.rb', line 46

def emit_streams
  @emit_streams
end

#event_streamsObject (readonly)

Returns the value of attribute event_streams.



46
47
48
# File 'lib/fluent/test/input_test.rb', line 46

def event_streams
  @event_streams
end

#expected_emits_lengthObject

Returns the value of attribute expected_emits_length.



44
45
46
# File 'lib/fluent/test/input_test.rb', line 44

def expected_emits_length
  @expected_emits_length
end

#run_timeoutObject

Returns the value of attribute run_timeout.



45
46
47
# File 'lib/fluent/test/input_test.rb', line 45

def run_timeout
  @run_timeout
end

Instance Method Details

#emitsObject



48
49
50
51
52
53
54
55
56
# File 'lib/fluent/test/input_test.rb', line 48

def emits
  all = []
  @emit_streams.each {|tag,events|
    events.each {|time,record|
      all << [tag, time, record]
    }
  }
  all
end

#eventsObject



58
59
60
61
62
63
64
# File 'lib/fluent/test/input_test.rb', line 58

def events
  all = []
  @emit_streams.each {|tag,events|
    all.concat events
  }
  all
end

#expect_emit(tag, time, record) ⇒ Object



35
36
37
38
# File 'lib/fluent/test/input_test.rb', line 35

def expect_emit(tag, time, record)
  (@expects ||= []) << [tag, time, record]
  self
end

#expected_emitsObject



40
41
42
# File 'lib/fluent/test/input_test.rb', line 40

def expected_emits
  @expects ||= []
end

#recordsObject



66
67
68
69
70
71
72
73
74
# File 'lib/fluent/test/input_test.rb', line 66

def records
  all = []
  @emit_streams.each {|tag,events|
    events.each {|time,record|
      all << record
    }
  }
  all
end

#register_run_breaking_condition(&block) ⇒ Object



82
83
84
85
86
87
# File 'lib/fluent/test/input_test.rb', line 82

def register_run_breaking_condition(&block)
  if block
    @run_breaking_conditions ||= []
    @run_breaking_conditions << block
  end
end

#register_run_post_condition(&block) ⇒ Object



76
77
78
79
80
# File 'lib/fluent/test/input_test.rb', line 76

def register_run_post_condition(&block)
  if block
    @run_post_conditions << block
  end
end

#run(num_waits = 10, &block) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fluent/test/input_test.rb', line 112

def run(num_waits = 10, &block)
  m = method(:emit_stream)
  unless Engine.singleton_class.ancestors.include?(EmitStreamWrapper)
    Engine.singleton_class.prepend EmitStreamWrapper
  end
  Engine.emit_stream_callee = m
  unless instance.router.singleton_class.ancestors.include?(EmitStreamWrapper)
    instance.router.singleton_class.prepend EmitStreamWrapper
  end
  instance.router.emit_stream_callee = m

  super(num_waits) {
    block.call if block

    if @expected_emits_length || @expects || @run_post_conditions
      # counters for emits and emit_streams
      i, j = 0, 0

      # Events of expected length will be emitted at the end.
      max_length = @expected_emits_length
      max_length ||= @expects.length if @expects
      if max_length
        register_run_post_condition do
          i == max_length
        end
      end

      # Set running timeout to avoid infinite loop caused by some errors.
      started_at = Time.now
      register_run_breaking_condition do
        Time.now >= started_at + @run_timeout
      end

      until run_should_stop?
        if j >= @emit_streams.length
          sleep 0.01
          next
        end

        tag, events = @emit_streams[j]
        events.each do |time, record|
          if @expects
            assert_equal(@expects[i], [tag, time, record])
            assert_equal_event_time(@expects[i][1], time) if @expects[i][1].is_a?(Fluent::EventTime)
          end
          i += 1
        end
        j += 1
      end
      assert_equal(@expects.length, i) if @expects
    end
  }
  self
end

#run_should_stop?Boolean

Returns:

  • (Boolean)


89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/test/input_test.rb', line 89

def run_should_stop?
  # Should stop running if post conditions are not registered.
  return true unless @run_post_conditions

  # Should stop running if all of the post conditions are true.
  return true if @run_post_conditions.all? {|proc| proc.call }

  # Should stop running if any of the breaking conditions is true.
  # In this case, some post conditions may be not true.
  return true if @run_breaking_conditions && @run_breaking_conditions.any? {|proc| proc.call }

  false
end