Class: LogStash::Util::WrappedSynchronousQueue::ReadClient

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/util/wrapped_synchronous_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue, batch_size = 125, wait_for = 250) ⇒ ReadClient

We generally only want one thread at a time able to access pop/take/poll operations from this queue. We also depend on this to be able to block consumers while we snapshot in-flight buffers



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 58

def initialize(queue, batch_size = 125, wait_for = 250)
  @queue = queue
  @mutex = Mutex.new
  # Note that @infilght_batches as a central mechanism for tracking inflight
  # batches will fail if we have multiple read clients in the pipeline.
  @inflight_batches = {}

  # allow the worker thread to report the execution time of the filter + output
  @inflight_clocks = {}
  @batch_size = batch_size
  @wait_for = wait_for
end

Instance Method Details

#add_filtered_metrics(batch) ⇒ Object



143
144
145
146
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 143

def add_filtered_metrics(batch)
  @event_metric.increment(:filtered, batch.filtered_size)
  @pipeline_metric.increment(:filtered, batch.filtered_size)
end

#add_output_metrics(batch) ⇒ Object



148
149
150
151
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 148

def add_output_metrics(batch)
  @event_metric.increment(:out, batch.filtered_size)
  @pipeline_metric.increment(:out, batch.filtered_size)
end

#add_starting_metrics(batch) ⇒ Object



138
139
140
141
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 138

def add_starting_metrics(batch)
  @event_metric.increment(:in, batch.starting_size)
  @pipeline_metric.increment(:in, batch.starting_size)
end

#closeObject



71
72
73
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 71

def close
  # noop, compat with acked queue read client
end

#close_batch(batch) ⇒ Object



117
118
119
120
121
122
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 117

def close_batch(batch)
  @mutex.synchronize do
    @inflight_batches.delete(Thread.current)
    stop_clock
  end
end

#current_inflight_batchObject



94
95
96
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 94

def current_inflight_batch
  @inflight_batches.fetch(Thread.current, [])
end

#inflight_batchesObject



88
89
90
91
92
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 88

def inflight_batches
  @mutex.synchronize do
    yield(@inflight_batches)
  end
end

#set_batch_dimensions(batch_size, wait_for) ⇒ Object



75
76
77
78
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 75

def set_batch_dimensions(batch_size, wait_for)
  @batch_size = batch_size
  @wait_for = wait_for
end

#set_current_thread_inflight_batch(batch) ⇒ Object



113
114
115
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 113

def set_current_thread_inflight_batch(batch)
  @inflight_batches[Thread.current] = batch
end

#set_events_metric(metric) ⇒ Object



80
81
82
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 80

def set_events_metric(metric)
  @event_metric = metric
end

#set_pipeline_metric(metric) ⇒ Object



84
85
86
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 84

def set_pipeline_metric(metric)
  @pipeline_metric = metric
end

#start_clockObject



124
125
126
127
128
129
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 124

def start_clock
  @inflight_clocks[Thread.current] = [
    @event_metric.time(:duration_in_millis),
    @pipeline_metric.time(:duration_in_millis)
  ]
end

#stop_clockObject



131
132
133
134
135
136
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 131

def stop_clock
  unless @inflight_clocks[Thread.current].nil?
    @inflight_clocks[Thread.current].each(&:stop)
    @inflight_clocks.delete(Thread.current)
  end
end

#take_batchObject



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 98

def take_batch
  @mutex.synchronize do
    batch = ReadBatch.new(@queue, @batch_size, @wait_for)
    set_current_thread_inflight_batch(batch)

    # We dont actually have any events to work on so lets
    # not bother with recording metrics for them
    if batch.size > 0
      add_starting_metrics(batch)
      start_clock
    end
    batch
  end
end