Class: Fluent::Test::InputTestDriver

Inherits:
TestDriver
Defined in:
lib/fluent/test/input_test.rb

Defined Under Namespace

Modules: EmitStreamWrapper

Instance Attribute Summary

Attributes inherited from TestDriver

#config, #instance

Instance Method Summary

Methods inherited from TestDriver

#configure

Constructor Details

#initialize(klass, &block) ⇒ InputTestDriver

Returns a new instance of InputTestDriver.



24
25
26
27
28
29
30
31
32
# File 'lib/fluent/test/input_test.rb', line 24

# Builds a driver for +klass+.
#
# +klass+ and the optional block are handed unchanged to the parent
# constructor; everything after that only seeds this driver's own state.
def initialize(klass, &block)
  super(klass, &block)
  # Added to a Time in #run to bound its wait loop.
  @run_timeout = 5
  # Procs that must all return true before #run's wait loop finishes.
  @run_post_conditions = []
  # Captured streams; iterated elsewhere in this class as [tag, events] pairs.
  @emit_streams = []
  # Expected [tag, time, record] triples; stays nil until one is registered.
  @expects = nil
  # When set, only the number of emitted records is checked during run.
  @expected_emits_length = nil
end

Instance Attribute Details

#emit_streams ⇒ Object (readonly)

Returns the value of attribute emit_streams.



45
46
47
# File 'lib/fluent/test/input_test.rb', line 45

# Reader for @emit_streams.
#
# The initializer sets it to an empty array, and other methods in this class
# iterate it as [tag, events] pairs. The same object is returned (no copy).
def emit_streams
  @emit_streams
end

#expected_emits_length ⇒ Object

Returns the value of attribute expected_emits_length.



43
44
45
# File 'lib/fluent/test/input_test.rb', line 43

# Reader for @expected_emits_length.
#
# The initializer sets it to nil. When it is set, #run uses it as the number
# of records its wait loop waits for.
def expected_emits_length
  @expected_emits_length
end

#run_timeout ⇒ Object

Returns the value of attribute run_timeout.



44
45
46
# File 'lib/fluent/test/input_test.rb', line 44

# Reader for @run_timeout.
#
# The initializer sets it to 5. #run adds it to the Time at which waiting
# started to decide when to give up.
def run_timeout
  @run_timeout
end

Instance Method Details

#emits ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/fluent/test/input_test.rb', line 47

# Flattens every captured stream into a single list of
# [tag, time, record] triples, preserving capture order.
#
# A new array is built on each call.
def emits
  @emit_streams.each_with_object([]) do |(tag, stream), triples|
    stream.each do |time, record|
      triples << [tag, time, record]
    end
  end
end

#events ⇒ Object



57
58
59
60
61
62
63
# File 'lib/fluent/test/input_test.rb', line 57

# Concatenates the events part of every captured [tag, events] pair into one
# new array, dropping the tags.
#
# Array#concat is used, so each events value must be array-convertible.
def events
  @emit_streams.each_with_object([]) do |(_tag, stream), collected|
    collected.concat(stream)
  end
end

#expect_emit(tag, time, record) ⇒ Object



34
35
36
37
# File 'lib/fluent/test/input_test.rb', line 34

# Registers one expected emit as a [tag, time, record] triple.
#
# The expectation list is created on first use. Returns the receiver so
# calls can be chained.
def expect_emit(tag, time, record)
  @expects ||= []
  @expects.push([tag, time, record])
  self
end

#expected_emits ⇒ Object



39
40
41
# File 'lib/fluent/test/input_test.rb', line 39

# Returns the list of expected [tag, time, record] triples, creating and
# storing an empty list first if none has been set yet.
def expected_emits
  @expects = [] unless @expects
  @expects
end

#records ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/fluent/test/input_test.rb', line 65

# Collects only the record part of every captured event, across all streams,
# preserving capture order.
#
# A new array is built on each call.
def records
  @emit_streams.each_with_object([]) do |(_tag, stream), collected|
    stream.each do |_time, record|
      collected << record
    end
  end
end

#register_run_breaking_condition(&block) ⇒ Object



81
82
83
84
85
86
# File 'lib/fluent/test/input_test.rb', line 81

# Adds a breaking condition: a block that, when it returns true, makes
# #run_should_stop? report true even if post conditions are unmet.
#
# Does nothing and returns nil when no block is given; otherwise returns the
# list of breaking conditions, which is created on first use.
def register_run_breaking_condition(&block)
  return unless block

  (@run_breaking_conditions ||= []).push(block)
end

#register_run_post_condition(&block) ⇒ Object



75
76
77
78
79
# File 'lib/fluent/test/input_test.rb', line 75

# Adds a post condition: a block that must return true, together with every
# other post condition, for #run_should_stop? to report true.
#
# Does nothing and returns nil when no block is given; otherwise returns the
# list of post conditions.
def register_run_post_condition(&block)
  return unless block

  @run_post_conditions.push(block)
end

#run(num_waits = 10, &block) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fluent/test/input_test.rb', line 111

# Runs the driver with emit capturing installed, then waits for and checks
# the captured events inside the block handed to the parent #run.
#
# num_waits is forwarded to the parent #run as-is. The optional block is
# called first inside the parent's block, before any waiting or checking.
# Returns self.
#
# NOTE(review): each call registers another post condition (when a target
# count exists) and another breaking condition; nothing visible in this method
# removes earlier ones, so calling #run twice on one driver may evaluate stale
# conditions — confirm against callers.
def run(num_waits = 10, &block)
  # Bound method used as the capture callback; emit_stream itself is defined
  # outside this excerpt.
  m = method(:emit_stream)
  # Prepend the wrapper module only once per target, then point it at this
  # driver. emit_stream_callee= is presumably supplied by EmitStreamWrapper.
  unless Engine.singleton_class.ancestors.include?(EmitStreamWrapper)
    Engine.singleton_class.prepend EmitStreamWrapper
  end
  Engine.emit_stream_callee = m
  unless instance.router.singleton_class.ancestors.include?(EmitStreamWrapper)
    instance.router.singleton_class.prepend EmitStreamWrapper
  end
  instance.router.emit_stream_callee = m

  super(num_waits) {
    block.call if block

    # The initializer sets @run_post_conditions to [], which is truthy, so this
    # branch is taken unless that variable has been reset elsewhere.
    if @expected_emits_length || @expects || @run_post_conditions
      # counters for emits and emit_streams
      # i counts individual events consumed; j indexes into @emit_streams.
      i, j = 0, 0

      # Events of expected length will be emitted at the end.
      # An explicit @expected_emits_length wins over the number of expectations.
      max_length = @expected_emits_length
      max_length ||= @expects.length if @expects
      if max_length
        register_run_post_condition do
          i == max_length
        end
      end

      # Set running timeout to avoid infinite loop caused by some errors.
      started_at = Time.now
      register_run_breaking_condition do
        Time.now >= started_at + @run_timeout
      end

      until run_should_stop?
        # No unconsumed stream yet: poll again shortly.
        if j >= @emit_streams.length
          sleep 0.01
          next
        end

        tag, events = @emit_streams[j]
        events.each do |time, record|
          if @expects
            # Compare the i-th expectation with the i-th consumed event.
            # assert_equal / assert_equal_event_time are defined outside this excerpt.
            assert_equal(@expects[i], [tag, time, record])
            assert_equal_event_time(@expects[i][1], time) if @expects[i][1].is_a?(Fluent::EventTime)
          end
          i += 1
        end
        j += 1
      end
      # After the loop ends (possibly via the timeout), the consumed count must
      # match the number of expectations.
      assert_equal(@expects.length, i) if @expects
    end
  }
  self
end

#run_should_stop? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/fluent/test/input_test.rb', line 88

# Decides whether the wait loop in #run should end.
#
# Returns true when no post-condition list exists, when every post condition
# returns true (an empty list counts), or when any breaking condition returns
# true. Otherwise returns false. Breaking conditions are only consulted when
# some post condition is still unmet.
def run_should_stop?
  return true if !@run_post_conditions || @run_post_conditions.all?(&:call)

  # Reaching here means some post condition is unmet; a breaking condition
  # can still force a stop.
  (@run_breaking_conditions || []).any?(&:call)
end