Class: LogStash::Outputs::S3::FileRepository
- Inherits:
-
Object
- Object
- LogStash::Outputs::S3::FileRepository
- Defined in:
- lib/logstash/outputs/s3/file_repository.rb
Defined Under Namespace
Classes: FactoryInitializer, PrefixedValue
Constant Summary collapse
- DEFAULT_STATE_SWEEPER_INTERVAL_SECS =
60- DEFAULT_STALE_TIME_SECS =
15 * 60
Instance Method Summary collapse
-
#each_factory(prefixes) {|factory| ... } ⇒ void
Yields each non-deleted file factory while the current thread has exclusive access to it.
-
#each_files {|file| ... } ⇒ void
Yields the current file of each non-deleted file factory while the current thread has exclusive access to it.
-
#get_factory(prefix_key) {|factory| ... } ⇒ void
Yields the file factory while the current thread has exclusive access to it, creating a new one if one does not exist or if the current one is being reaped by the stale watcher.
-
#get_file(prefix_key) {|file| ... } ⇒ void
Ensures that a non-deleted factory exists for the provided prefix and yields its current file while the current thread has exclusive access to it.
-
#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository
constructor
A new instance of FileRepository.
- #keys ⇒ Object
- #remove_if_stale(prefix_key) ⇒ Object
- #shutdown ⇒ Object
- #size ⇒ Object
- #start_stale_sweeper ⇒ Object
- #stop_stale_sweeper ⇒ Object
Constructor Details
#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository
Returns a new instance of FileRepository.
65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 65 def initialize(, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) # The path need to contains the prefix so when we start # logtash after a crash we keep the remote structure @prefixed_factories = ConcurrentHashMap.new @sweeper_interval = sweeper_interval @factory_initializer = FactoryInitializer.new(, encoding, temporary_directory, stale_time) start_stale_sweeper end |
Instance Method Details
#each_factory(prefixes) {|factory| ... } ⇒ void
This method returns an undefined value.
Yields each non-deleted file factory while the current thread has exclusive access to it.
128 129 130 131 132 133 134 135 136 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 128 def each_factory(prefixes) prefixes.each do |prefix_key| prefix_val = @prefixed_factories.get(prefix_key) prefix_val&.with_lock do |factory| yield factory unless prefix_val.deleted? end end nil # void return avoid leaking unsynchronized access end |
#each_files {|file| ... } ⇒ void
This method returns an undefined value.
Yields the current file of each non-deleted file factory while the current thread has exclusive access to it.
87 88 89 90 91 92 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 87 def each_files each_factory(keys) do |factory| yield factory.current end nil # void return avoid leaking unsynchronized access end |
#get_factory(prefix_key) {|factory| ... } ⇒ void
This method returns an undefined value.
Yields the file factory while the current thread has exclusive access to it, creating a new one if one does not exist or if the current one is being reaped by the stale watcher.
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 100 def get_factory(prefix_key) # fast-path: if factory exists and is not deleted, yield it with exclusive access and return prefix_val = @prefixed_factories.get(prefix_key) prefix_val&.with_lock do |factory| # intentional local-jump to ensure deletion detection # is done inside the exclusive access. unless prefix_val.deleted? yield(factory) return nil # void return avoid leaking unsynchronized access end end # slow-path: # the ConcurrentHashMap#get operation is lock-free, but may have returned an entry that was being deleted by # another thread (such as via stale detection). If we failed to retrieve a value, or retrieved one that had # been marked deleted, use the atomic ConcurrentHashMap#compute to retrieve a non-deleted entry. prefix_val = @prefixed_factories.compute(prefix_key) do |_, existing| existing && !existing.deleted? ? existing : @factory_initializer.apply(prefix_key) end prefix_val.with_lock { |factory| yield factory } nil # void return avoid leaking unsynchronized access end |
#get_file(prefix_key) {|file| ... } ⇒ void
This method returns an undefined value.
Ensures that a non-deleted factory exists for the provided prefix and yields its current file while the current thread has exclusive access to it.
144 145 146 147 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 144 def get_file(prefix_key) get_factory(prefix_key) { |factory| yield factory.current } nil # void return avoid leaking unsynchronized access end |
#keys ⇒ Object
79 80 81 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 79 def keys @prefixed_factories.keySet end |
#remove_if_stale(prefix_key) ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 157 def remove_if_stale(prefix_key) # we use the ATOMIC `ConcurrentHashMap#computeIfPresent` to atomically # detect the staleness, mark a stale prefixed factory as deleted, and delete from the map. @prefixed_factories.computeIfPresent(prefix_key) do |_, prefixed_factory| # once we have retrieved an instance, we acquire exclusive access to it # for stale detection, marking it as deleted before releasing the lock # and causing it to become deleted from the map. prefixed_factory.with_lock do |_| if prefixed_factory.stale? prefixed_factory.delete! # mark deleted to prevent reuse nil # cause deletion else prefixed_factory # keep existing end end end end |
#shutdown ⇒ Object
149 150 151 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 149 def shutdown stop_stale_sweeper end |
#size ⇒ Object
153 154 155 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 153 def size @prefixed_factories.size end |
#start_stale_sweeper ⇒ Object
175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 175 def start_stale_sweeper @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do LogStash::Util.set_thread_name("S3, Stale factory sweeper") @prefixed_factories.keys.each do |prefix| remove_if_stale(prefix) end end @stale_sweeper.execute end |
#stop_stale_sweeper ⇒ Object
187 188 189 |
# File 'lib/logstash/outputs/s3/file_repository.rb', line 187 def stop_stale_sweeper @stale_sweeper.shutdown end |