Class: LogStash::Util::WrappedSynchronousQueue::ReadClient
- Inherits:
- 
      Object
      
        - Object
- LogStash::Util::WrappedSynchronousQueue::ReadClient
 
- Defined in:
- lib/logstash/util/wrapped_synchronous_queue.rb
Instance Method Summary collapse
- #add_filtered_metrics(filtered_size) ⇒ Object
- #add_output_metrics(filtered_size) ⇒ Object
- #close ⇒ Object
- #close_batch(batch) ⇒ Object
- #define_initial_metrics_values(namespaced_metric) ⇒ Object
- #empty? ⇒ Boolean
- #inflight_batches {|@inflight_batches| ... } ⇒ Object
- 
  
    
      #initialize(queue, batch_size = 125, wait_for = 250)  ⇒ ReadClient 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    We generally only want one thread at a time able to access pop/take/poll operations from this queue. 
- 
  
    
      #new_batch  ⇒ ReadBatch 
    
    
  
  
  
  
  
  
  
  
  
    Creates a new, empty batch. 
- #read_batch ⇒ Object
- #set_batch_dimensions(batch_size, wait_for) ⇒ Object
- #set_events_metric(metric) ⇒ Object
- #set_pipeline_metric(metric) ⇒ Object
- #start_metrics(batch) ⇒ Object
Constructor Details
#initialize(queue, batch_size = 125, wait_for = 250) ⇒ ReadClient
We generally only want one thread at a time able to access pop/take/poll operations from this queue. We also depend on this to be able to block consumers while we snapshot in-flight buffers.
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 32
#
# Builds a read client around +queue+.
#
# @param queue the underlying queue; stored as-is and handed to every
#   ReadBatch this client creates
# @param batch_size [Integer] batch size passed to ReadBatch by #read_batch
# @param wait_for [Integer] wait time in milliseconds; stored internally in
#   nanoseconds
def initialize(queue, batch_size = 125, wait_for = 250)
  @queue = queue
  # Note that @inflight_batches as a central mechanism for tracking inflight
  # batches will fail if we have multiple read clients in the pipeline.
  @inflight_batches = Concurrent::Map.new

  # Allow the worker thread to report the execution time of the
  # filter + output stages.
  @inflight_clocks = Concurrent::Map.new
  @batch_size = batch_size
  # Callers supply milliseconds; the value is kept in nanoseconds.
  @wait_for = TimeUnit::NANOSECONDS.convert(wait_for, TimeUnit::MILLISECONDS)
end
Instance Method Details
#add_filtered_metrics(filtered_size) ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 117
#
# Adds +filtered_size+ to both the events-level and the pipeline-level
# +:filtered+ counters.
#
# @param filtered_size amount passed to each counter's +increment+
#   (presumably an Integer count of events -- confirm against callers)
def add_filtered_metrics(filtered_size)
  @event_metric_filtered.increment(filtered_size)
  @pipeline_metric_filtered.increment(filtered_size)
end
#add_output_metrics(filtered_size) ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 122
#
# Adds +filtered_size+ to both the events-level and the pipeline-level
# +:out+ counters.
#
# @param filtered_size amount passed to each counter's +increment+
#   (presumably an Integer count of events -- confirm against callers)
def add_output_metrics(filtered_size)
  @event_metric_out.increment(filtered_size)
  @pipeline_metric_out.increment(filtered_size)
end
#close ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 44
#
# Intentionally does nothing; present for interface compatibility with the
# acked queue read client.
#
# @return [nil]
def close
  # noop, compat with acked queue read client
end
#close_batch(batch) ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 101
#
# Finishes the bookkeeping for the batch handled by the calling thread:
# removes the thread's entry from @inflight_batches, swaps its start time in
# @inflight_clocks for nil and, when a start time was recorded and the batch
# is non-empty, adds the elapsed whole milliseconds to the events-level and
# pipeline-level duration counters.
#
# @param batch the batch being closed; only its +size+ is consulted
def close_batch(batch)
  thread = Thread.current
  @inflight_batches.delete(thread)
  start_time = @inflight_clocks.get_and_set(thread, nil)
  return if start_time.nil?

  # Only record the duration when the batch is non-empty: the clock is
  # started when the batch is read, and a batch that was read empty may stay
  # empty all the way down to this call.
  return unless batch.size > 0

  # nano_time difference is in nanoseconds; integer-divide down to millis.
  time_taken = (java.lang.System.nano_time - start_time) / 1_000_000
  @event_metric_time.increment(time_taken)
  @pipeline_metric_time.increment(time_taken)
end
#define_initial_metrics_values(namespaced_metric) ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 73
#
# Seeds the given metric namespace with zero values for
# +:duration_in_millis+, +:filtered+ and +:out+.
#
# @param namespaced_metric metric object responding to +report_time+ and
#   +increment+
def define_initial_metrics_values(namespaced_metric)
  namespaced_metric.report_time(:duration_in_millis, 0)
  namespaced_metric.increment(:filtered, 0)
  namespaced_metric.increment(:out, 0)
end
#empty? ⇒ Boolean
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 48
#
# Reports whether the underlying queue is empty, as answered by the queue's
# own +isEmpty+.
#
# @return [Boolean]
def empty?
  @queue.isEmpty
end
#inflight_batches {|@inflight_batches| ... } ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 79
#
# Yields the map of in-flight batches (keyed by worker thread) to the given
# block and returns the block's result. A block is required.
#
# @yieldparam inflight_batches the @inflight_batches map itself, not a copy
def inflight_batches
  yield(@inflight_batches)
end
#new_batch ⇒ ReadBatch
Creates a new, empty batch.
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 85
#
# Creates a new, empty batch: a ReadBatch over this client's queue built with
# a size of 0 and a wait of 0. Unlike #read_batch, it does not start metrics
# tracking for the batch.
#
# @return [ReadBatch]
def new_batch
  ReadBatch.new(@queue, 0, 0)
end
#read_batch ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 89
#
# Builds a ReadBatch over this client's queue using the configured batch size
# and wait time, registers it as in-flight for the calling thread via
# #start_metrics, and returns it.
#
# @return [ReadBatch]
def read_batch
  batch = ReadBatch.new(@queue, @batch_size, @wait_for)
  start_metrics(batch)
  batch
end
#set_batch_dimensions(batch_size, wait_for) ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 52
#
# Replaces the batch size and wait time used for subsequently read batches.
#
# @param batch_size [Integer] batch size passed to ReadBatch by #read_batch
# @param wait_for [Integer] wait time in milliseconds; stored internally in
#   nanoseconds
def set_batch_dimensions(batch_size, wait_for)
  @batch_size = batch_size
  # Callers supply milliseconds; the value is kept in nanoseconds.
  @wait_for = TimeUnit::NANOSECONDS.convert(wait_for, TimeUnit::MILLISECONDS)
end
#set_events_metric(metric) ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 57
#
# Stores the events-level metric, caches its +:out+, +:filtered+ and
# +:duration_in_millis+ counters, and seeds the metric with initial zero
# values via #define_initial_metrics_values.
#
# @param metric metric object responding to +counter+, and to the calls made
#   by #define_initial_metrics_values
def set_events_metric(metric)
  @event_metric = metric
  @event_metric_out = @event_metric.counter(:out)
  @event_metric_filtered = @event_metric.counter(:filtered)
  @event_metric_time = @event_metric.counter(:duration_in_millis)
  define_initial_metrics_values(@event_metric)
end
#set_pipeline_metric(metric) ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 65
#
# Stores the pipeline-level metric, caches its +:out+, +:filtered+ and
# +:duration_in_millis+ counters, and seeds the metric with initial zero
# values via #define_initial_metrics_values.
#
# @param metric metric object responding to +counter+, and to the calls made
#   by #define_initial_metrics_values
def set_pipeline_metric(metric)
  @pipeline_metric = metric
  @pipeline_metric_out = @pipeline_metric.counter(:out)
  @pipeline_metric_filtered = @pipeline_metric.counter(:filtered)
  @pipeline_metric_time = @pipeline_metric.counter(:duration_in_millis)
  define_initial_metrics_values(@pipeline_metric)
end
#start_metrics(batch) ⇒ Object
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 95
#
# Registers +batch+ as the in-flight batch of the calling thread and records
# the current +System.nano_time+ as that thread's start time, which
# #close_batch later uses to compute the elapsed duration.
#
# @param batch the batch the calling thread is about to process
def start_metrics(batch)
  thread = Thread.current
  @inflight_batches[thread] = batch
  @inflight_clocks[thread] = java.lang.System.nano_time
end