Class: LogStash::TestPipeline
- Inherits:
-
JavaPipeline
- Object
- JavaPipeline
- LogStash::TestPipeline
- Defined in:
- lib/logstash/test_pipeline.rb
Defined Under Namespace
Classes: EventTrackingQueueReadClientDelegator, QueueBatchDelegator
Instance Method Summary collapse
Instance Method Details
#filter_queue_client ⇒ Object
Note:
Only works since Logstash 6.4 (on 6.3 the actual instance variable has to be used instead).
29 30 31 |
# File 'lib/logstash/test_pipeline.rb', line 29
# Reader for the queue client used by the filter stage.
#
# Prefers the value stored in the +@filter_queue_client+ instance variable
# and falls back to the inherited reader when that variable is nil or false.
#
# @return [Object] the stored client, or whatever the parent reader returns
def filter_queue_client
  return @filter_queue_client if @filter_queue_client

  super
end
#run_with(events) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/logstash/test_pipeline.rb', line 8
# Pushes the given events onto the pipeline queue and then runs the pipeline.
#
# The filter queue client is wrapped in an
# EventTrackingQueueReadClientDelegator on the first call; on later calls the
# already-installed delegator is reset and reused.
#
# @param events [#to_a] events to push as a single batch
# @return [Object] whatever +run+ returns
def run_with(events)
  # Pushing events manually while inputs are configured works, but it may
  # not be what the caller intended, so emit a warning.
  if inputs&.any?
    # The configuration is only appended to the warning in verbose mode.
    config = $VERBOSE ? "\n #{config_str}" : nil
    warn("#{self} pipeline is getting events pushed manually while having inputs: #{inputs.inspect} #{config}")
  end

  # TODO: could we handle a generator (Enumerator)?
  queue.write_client.push_batch(events.to_a)

  current_client = filter_queue_client
  if current_client.is_a?(EventTrackingQueueReadClientDelegator)
    # run_with may be called several times: reuse the same delegator instance.
    current_client.reset_events!
  else
    # On 6.3 start_worker reads @filter_queue_client directly, while 6.4+
    # goes through a reader method; assigning the ivar keeps both working.
    @filter_queue_client = EventTrackingQueueReadClientDelegator.new(current_client)
  end

  run
end
#start_and_wait ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/logstash/test_pipeline.rb', line 35
# Runs the pipeline on a new thread and waits for it to report as started.
#
# If +run+ raises a StandardError inside the new thread, the pipeline is
# closed and the error is raised on the thread that called this method.
#
# @return [Thread] the thread executing the pipeline
# @raise [RuntimeError] when +wait_until_started+ returns a falsy value
def start_and_wait
  calling_thread = Thread.current
  @finished_execution.make_false
  @finished_run&.make_false # only since 6.5

  @thread = Thread.new do
    begin
      LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
      ThreadContext.put("pipeline.id", pipeline_id)
      run
      @finished_run&.make_true
    rescue => error
      close
      # Surface the failure on the thread that started the pipeline.
      calling_thread.raise(error)
    ensure
      @finished_execution.make_true
    end
  end

  raise "failed to start pipeline: #{self}\n with config: #{config_str.inspect}" unless wait_until_started

  @thread
end