Class: LogStash::Util::WrappedAckedQueue::ReadClient
- Inherits:
-
Object
- Object
- LogStash::Util::WrappedAckedQueue::ReadClient
- Defined in:
- lib/logstash/util/wrapped_acked_queue.rb
Instance Method Summary collapse
- #add_filtered_metrics(filtered_size) ⇒ Object
- #add_output_metrics(filtered_size) ⇒ Object
- #close ⇒ Object
- #close_batch(batch) ⇒ Object
- #define_initial_metrics_values(namespaced_metric) ⇒ Object
- #empty? ⇒ Boolean
- #inflight_batches ⇒ Object
-
#initialize(queue, batch_size = 125, wait_for = 50) ⇒ ReadClient
constructor
We generally only want one thread at a time able to access pop/take/poll operations from this queue.
-
#new_batch ⇒ ReadBatch
create a new empty batch.
- #read_batch ⇒ Object
- #set_batch_dimensions(batch_size, wait_for) ⇒ Object
- #set_events_metric(metric) ⇒ Object
- #set_pipeline_metric(metric) ⇒ Object
- #start_metrics(batch) ⇒ Object
Constructor Details
#initialize(queue, batch_size = 125, wait_for = 50) ⇒ ReadClient
We generally only want one thread at a time able to access pop/take/poll operations from this queue. We also depend on this to be able to block consumers while we snapshot in-flight buffers.
97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 97

# Builds a read client around +queue+.
#
# We generally only want one thread at a time able to access pop/take/poll
# operations from this queue. We also depend on this to be able to block
# consumers while we snapshot in-flight buffers.
#
# @param queue the wrapped queue that batches are read from
# @param batch_size [Integer] size handed to every batch created by #new_batch
# @param wait_for [Integer] wait value handed to every batch created by
#   #new_batch (units are not visible here; presumably milliseconds, TODO confirm)
def initialize(queue, batch_size = 125, wait_for = 50)
  # Serializes access to @inflight_batches and to the queue emptiness check.
  @mutex = Mutex.new
  # Note that @inflight_batches as a central mechanism for tracking inflight
  # batches will fail if we have multiple read clients in the pipeline.
  @inflight_batches = {}
  # Per-thread start timestamps: allow the worker thread to report the
  # execution time of the filter + output.
  @inflight_clocks = Concurrent::Map.new

  @queue = queue
  @batch_size = batch_size
  @wait_for = wait_for
end
Instance Method Details
#add_filtered_metrics(filtered_size) ⇒ Object
202 203 204 205 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 202

# Adds +filtered_size+ to the :filtered counter of both the events metric and
# the pipeline metric.
#
# @param filtered_size [Integer] amount to add to each :filtered counter
# @return [Object] whatever the pipeline metric's #increment returns
def add_filtered_metrics(filtered_size)
  counter = :filtered
  @event_metric.increment(counter, filtered_size)
  @pipeline_metric.increment(counter, filtered_size)
end
#add_output_metrics(filtered_size) ⇒ Object
207 208 209 210 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 207

# Adds +filtered_size+ to the :out counter of both the events metric and the
# pipeline metric.
#
# @param filtered_size [Integer] amount to add to each :out counter (the
#   parameter keeps its historical name even though it is counted as output)
# @return [Object] whatever the pipeline metric's #increment returns
def add_output_metrics(filtered_size)
  counter = :out
  @event_metric.increment(counter, filtered_size)
  @pipeline_metric.increment(counter, filtered_size)
end
#close ⇒ Object
109 110 111 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 109

# Closes the wrapped queue by delegating to its #close.
#
# @return [Object] whatever the wrapped queue's #close returns
def close
  @queue.close
end
#close_batch(batch) ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 180

# Closes +batch+, stops tracking it as in-flight for the calling thread and,
# when the batch is non-empty and a start time was recorded for this thread,
# reports the elapsed time to the events and pipeline metrics.
#
# @param batch the batch to close; must respond to #close and #size
# @return [Object, nil] nil when no duration is reported, otherwise whatever
#   the pipeline metric's #report_time returns
def close_batch(batch)
  thread = Thread.current
  @mutex.synchronize do
    batch.close
    @inflight_batches.delete(thread)
  end

  # Remove the clock entry rather than parking a nil under the thread key, so
  # the map does not keep an entry (and a Thread reference) per finished batch.
  start_time = @inflight_clocks.delete(thread)
  return if start_time.nil?

  # only stop (which also records) the metrics if the batch is non-empty.
  # start_clock is now called at empty batch creation and an empty batch could
  # stay empty all the way down to the close_batch call.
  return unless batch.size > 0

  # Nanoseconds to whole milliseconds (integer division).
  time_taken = (java.lang.System.nano_time - start_time) / 1_000_000
  @event_metric.report_time(:duration_in_millis, time_taken)
  @pipeline_metric.report_time(:duration_in_millis, time_taken)
end
#define_initial_metrics_values(namespaced_metric) ⇒ Object
137 138 139 140 141 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 137

# Seeds +namespaced_metric+ with zero values for the three metrics this client
# reports: the :duration_in_millis timer and the :filtered and :out counters.
#
# @param namespaced_metric the metric to seed; must respond to #report_time
#   and #increment
# @return [Object] whatever the final #increment call returns
def define_initial_metrics_values(namespaced_metric)
  initial = 0
  namespaced_metric.report_time(:duration_in_millis, initial)
  namespaced_metric.increment(:filtered, initial)
  namespaced_metric.increment(:out, initial)
end
#empty? ⇒ Boolean
113 114 115 116 117 118 119 120 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 113

# Reports whether the wrapped queue is empty. The check runs while holding
# this client's mutex.
#
# @return [Boolean] the wrapped queue's #is_empty? result
def empty?
  @mutex.synchronize { @queue.is_empty? }
end
#inflight_batches ⇒ Object
143 144 145 146 147 148 149 150 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 143

# Yields the in-flight batches hash (keyed by thread, as populated by
# #start_metrics) to the caller's block while holding this client's mutex.
#
# @yieldparam batches [Hash] the live @inflight_batches hash, not a copy
# @return [Object] the block's return value
def inflight_batches
  @mutex.synchronize { yield(@inflight_batches) }
end
#new_batch ⇒ ReadBatch
create a new empty batch
154 155 156 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 154

# Creates a new empty batch bound to the wrapped queue, using the batch size
# and wait value currently configured on this client.
#
# @return [ReadBatch] the newly constructed batch
def new_batch
  ReadBatch.new(@queue, @batch_size, @wait_for)
end
#read_batch ⇒ Object
158 159 160 161 162 163 164 165 166 167 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 158

# Builds a new batch, has it read from the queue, and registers it as
# in-flight for the calling thread.
#
# @return [ReadBatch] the batch after #read_next and #start_metrics have run
# @raise [QueueClosedError] if the wrapped queue is already closed
def read_batch
  raise QueueClosedError, "Attempt to take a batch from a closed AckedQueue" if @queue.closed?

  new_batch.tap do |fresh|
    fresh.read_next
    start_metrics(fresh)
  end
end
#set_batch_dimensions(batch_size, wait_for) ⇒ Object
122 123 124 125 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 122

# Replaces the batch size and wait value that #new_batch passes to the batches
# it creates.
#
# @param batch_size [Integer] new batch size
# @param wait_for [Integer] new wait value
# @return [Object] the +wait_for+ value that was stored
def set_batch_dimensions(batch_size, wait_for)
  @batch_size = batch_size
  @wait_for = wait_for
end
#set_events_metric(metric) ⇒ Object
127 128 129 130 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 127

# Stores +metric+ as the events metric and seeds it with its initial values.
#
# @param metric the metric that receives event-level counters and timings
# @return [Object] whatever #define_initial_metrics_values returns
def set_events_metric(metric)
  @event_metric = metric
  define_initial_metrics_values(metric)
end
#set_pipeline_metric(metric) ⇒ Object
132 133 134 135 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 132

# Stores +metric+ as the pipeline metric and seeds it with its initial values.
#
# @param metric the metric that receives pipeline-level counters and timings
# @return [Object] whatever #define_initial_metrics_values returns
def set_pipeline_metric(metric)
  @pipeline_metric = metric
  define_initial_metrics_values(metric)
end
#start_metrics(batch) ⇒ Object
169 170 171 172 173 174 175 176 177 178 |
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 169

# Registers +batch+ as the calling thread's in-flight batch and records the
# current nano time as that thread's start time for duration reporting.
#
# @param batch the batch the calling thread is about to process
# @return [Integer] the recorded start time in nanoseconds
def start_metrics(batch)
  thread = Thread.current
  @mutex.synchronize { @inflight_batches[thread] = batch }
  @inflight_clocks[thread] = java.lang.System.nano_time
end