Class: LogStash::Util::WrappedSynchronousQueue::ReadClient
- Inherits:
-
Object
- Object
- LogStash::Util::WrappedSynchronousQueue::ReadClient
- Defined in:
- lib/logstash/util/wrapped_synchronous_queue.rb
Instance Method Summary collapse
- #add_filtered_metrics(batch) ⇒ Object
- #add_output_metrics(batch) ⇒ Object
- #add_starting_metrics(batch) ⇒ Object
- #close ⇒ Object
- #close_batch(batch) ⇒ Object
- #current_inflight_batch ⇒ Object
- #inflight_batches ⇒ Object
-
#initialize(queue, batch_size = 125, wait_for = 250) ⇒ ReadClient
constructor
We generally only want one thread at a time able to access pop/take/poll operations from this queue.
- #set_batch_dimensions(batch_size, wait_for) ⇒ Object
- #set_current_thread_inflight_batch(batch) ⇒ Object
- #set_events_metric(metric) ⇒ Object
- #set_pipeline_metric(metric) ⇒ Object
- #start_clock ⇒ Object
- #stop_clock ⇒ Object
- #take_batch ⇒ Object
Constructor Details
#initialize(queue, batch_size = 125, wait_for = 250) ⇒ ReadClient
We generally only want one thread at a time able to access pop/take/poll operations from this queue. We also depend on this to be able to block consumers while we snapshot in-flight buffers
58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 58 def initialize(queue, batch_size = 125, wait_for = 250) @queue = queue @mutex = Mutex.new # Note that @infilght_batches as a central mechanism for tracking inflight # batches will fail if we have multiple read clients in the pipeline. @inflight_batches = {} # allow the worker thread to report the execution time of the filter + output @inflight_clocks = {} @batch_size = batch_size @wait_for = wait_for end |
Instance Method Details
#add_filtered_metrics(batch) ⇒ Object
143 144 145 146 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 143 def add_filtered_metrics(batch) @event_metric.increment(:filtered, batch.filtered_size) @pipeline_metric.increment(:filtered, batch.filtered_size) end |
#add_output_metrics(batch) ⇒ Object
148 149 150 151 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 148 def add_output_metrics(batch) @event_metric.increment(:out, batch.filtered_size) @pipeline_metric.increment(:out, batch.filtered_size) end |
#add_starting_metrics(batch) ⇒ Object
138 139 140 141 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 138 def add_starting_metrics(batch) @event_metric.increment(:in, batch.starting_size) @pipeline_metric.increment(:in, batch.starting_size) end |
#close ⇒ Object
71 72 73 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 71 def close # noop, compat with acked queue read client end |
#close_batch(batch) ⇒ Object
117 118 119 120 121 122 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 117 def close_batch(batch) @mutex.synchronize do @inflight_batches.delete(Thread.current) stop_clock end end |
#current_inflight_batch ⇒ Object
94 95 96 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 94 def current_inflight_batch @inflight_batches.fetch(Thread.current, []) end |
#inflight_batches ⇒ Object
88 89 90 91 92 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 88 def inflight_batches @mutex.synchronize do yield(@inflight_batches) end end |
#set_batch_dimensions(batch_size, wait_for) ⇒ Object
75 76 77 78 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 75 def set_batch_dimensions(batch_size, wait_for) @batch_size = batch_size @wait_for = wait_for end |
#set_current_thread_inflight_batch(batch) ⇒ Object
113 114 115 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 113 def set_current_thread_inflight_batch(batch) @inflight_batches[Thread.current] = batch end |
#set_events_metric(metric) ⇒ Object
80 81 82 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 80 def set_events_metric(metric) @event_metric = metric end |
#set_pipeline_metric(metric) ⇒ Object
84 85 86 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 84 def set_pipeline_metric(metric) @pipeline_metric = metric end |
#start_clock ⇒ Object
124 125 126 127 128 129 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 124 def start_clock @inflight_clocks[Thread.current] = [ @event_metric.time(:duration_in_millis), @pipeline_metric.time(:duration_in_millis) ] end |
#stop_clock ⇒ Object
131 132 133 134 135 136 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 131 def stop_clock unless @inflight_clocks[Thread.current].nil? @inflight_clocks[Thread.current].each(&:stop) @inflight_clocks.delete(Thread.current) end end |
#take_batch ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 98 def take_batch @mutex.synchronize do batch = ReadBatch.new(@queue, @batch_size, @wait_for) set_current_thread_inflight_batch(batch) # We dont actually have any events to work on so lets # not bother with recording metrics for them if batch.size > 0 add_starting_metrics(batch) start_clock end batch end end |