Class: LogStash::Outputs::S3::FileRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/s3/file_repository.rb

Defined Under Namespace

Classes: FactoryInitializer, PrefixedValue

Constant Summary collapse

DEFAULT_STATE_SWEEPER_INTERVAL_SECS =
60
DEFAULT_STALE_TIME_SECS =
15 * 60

Instance Method Summary collapse

Constructor Details

#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository

Returns a new instance of FileRepository.



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/logstash/outputs/s3/file_repository.rb', line 65

# Creates an empty repository of prefix-keyed file factories and starts the
# background sweeper that evicts stale ones.
#
# @param tags forwarded as-is to the FactoryInitializer
# @param encoding forwarded as-is to the FactoryInitializer
# @param temporary_directory forwarded as-is to the FactoryInitializer
# @param stale_time forwarded to the FactoryInitializer
#   (defaults to DEFAULT_STALE_TIME_SECS)
# @param sweeper_interval execution interval given to the sweeper task
#   (defaults to DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
def initialize(tags, encoding, temporary_directory,
               stale_time = DEFAULT_STALE_TIME_SECS,
               sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
  # Entries are keyed by prefix: the path needs to contain the prefix so that
  # restarting Logstash after a crash keeps the remote structure.
  @prefixed_factories = ConcurrentHashMap.new

  # Remembered here because start_stale_sweeper reads it when building the task.
  @sweeper_interval = sweeper_interval
  @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)

  # Must run last: the sweeper relies on the state assigned above.
  start_stale_sweeper
end

Instance Method Details

#each_factory(prefixes) {|factory| ... } ⇒ void

This method returns an undefined value.

Yields each non-deleted file factory while the current thread has exclusive access to it.

Parameters:

  • prefixes (Array<String>)

    : the prefix keys

Yield Parameters:



128
129
130
131
132
133
134
135
136
# File 'lib/logstash/outputs/s3/file_repository.rb', line 128

# Yields the factory registered under each of the given prefix keys, holding
# that entry's lock for the duration of the yield. Keys with no entry, and
# entries already marked deleted, are skipped.
#
# @param prefixes [Array<String>] the prefix keys to visit
# @yieldparam factory the factory handed out by the entry's `with_lock`
# @return [nil] always nil, so nothing obtained under the lock leaks out
def each_factory(prefixes)
  prefixes.each do |prefix_key|
    entry = @prefixed_factories.get(prefix_key)
    next if entry.nil?

    entry.with_lock do |factory|
      # The deleted check happens inside the lock on purpose.
      next if entry.deleted?

      yield factory
    end
  end
  nil
end

#each_files {|file| ... } ⇒ void

This method returns an undefined value.

Yields the current file of each non-deleted file factory while the current thread has exclusive access to it.

Yield Parameters:



87
88
89
90
91
92
# File 'lib/logstash/outputs/s3/file_repository.rb', line 87

# Yields the `current` file of every factory that `each_factory` visits for
# the repository's present keys.
#
# @yieldparam file the value returned by `factory.current`
# @return [nil] always nil, so nothing obtained under the lock leaks out
def each_files
  each_factory(keys) { |factory| yield factory.current }
  nil
end

#get_factory(prefix_key) {|factory| ... } ⇒ void

This method returns an undefined value.

Yields the file factory while the current thread has exclusive access to it, creating a new one if one does not exist or if the current one is being reaped by the stale watcher.

Parameters:

  • prefix_key (String)

    : the prefix key

Yield Parameters:

  • factory (TemporaryFileFactory)

    : a temporary file factory that this thread has exclusive access to



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/logstash/outputs/s3/file_repository.rb', line 100

# Yields the factory for +prefix_key+ while holding its entry's lock,
# creating a fresh entry through the factory initializer when none exists or
# when the one found has been marked deleted.
#
# @param prefix_key [String] the prefix key
# @yieldparam factory the factory handed out by the entry's `with_lock`
# @return [nil] always nil, so nothing obtained under the lock leaks out
def get_factory(prefix_key)
  # Fast path: plain lookup, then verify under the lock that the entry is
  # still usable.
  candidate = @prefixed_factories.get(prefix_key)
  unless candidate.nil?
    candidate.with_lock do |factory|
      # Deletion must be checked inside the lock; a deleted entry falls
      # through to the slow path below.
      next if candidate.deleted?

      yield(factory)
      # Deliberate jump out of the method from inside the lock block.
      return nil
    end
  end

  # Slow path: the lookup above found nothing, or found an entry that had
  # been marked deleted (for example by stale detection on another thread).
  # Use the map's `compute` to either keep a usable existing entry or
  # install a newly initialised one.
  usable = @prefixed_factories.compute(prefix_key) do |_key, current|
    if current && !current.deleted?
      current
    else
      @factory_initializer.apply(prefix_key)
    end
  end
  usable.with_lock { |factory| yield factory }
  nil
end

#get_file(prefix_key) {|file| ... } ⇒ void

This method returns an undefined value.

Ensures that a non-deleted factory exists for the provided prefix and yields its current file while the current thread has exclusive access to it.

Parameters:

  • prefix_key (String)

Yield Parameters:



144
145
146
147
# File 'lib/logstash/outputs/s3/file_repository.rb', line 144

# Obtains the factory for +prefix_key+ through `get_factory` and yields that
# factory's `current` file from inside the same block.
#
# @param prefix_key [String] the prefix key
# @yieldparam file the value returned by `factory.current`
# @return [nil] always nil, so nothing obtained under the lock leaks out
def get_file(prefix_key)
  get_factory(prefix_key) do |factory|
    yield factory.current
  end
  nil
end

#keys ⇒ Object



79
80
81
# File 'lib/logstash/outputs/s3/file_repository.rb', line 79

# Returns whatever `keySet` reports on the prefixed-factories map, i.e. the
# prefix keys currently registered. `each_files` feeds this to `each_factory`.
# NOTE(review): on a java.util.concurrent.ConcurrentHashMap this would be a
# live view rather than a snapshot — confirm before relying on either.
def keys
  @prefixed_factories.keySet
end

#remove_if_stale(prefix_key) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/logstash/outputs/s3/file_repository.rb', line 157

# Drops the entry for +prefix_key+ from the map when it reports itself stale.
#
# The whole decision runs inside the map's `computeIfPresent`, so detecting
# staleness, marking the entry deleted and removing it from the map happen
# as one step for that key. Nothing happens when the key has no entry.
#
# @param prefix_key [String] the prefix key to inspect
# @return whatever `computeIfPresent` returns
def remove_if_stale(prefix_key)
  @prefixed_factories.computeIfPresent(prefix_key) do |_key, entry|
    # Take the entry's lock for the staleness check so it is marked deleted
    # before the lock is released.
    entry.with_lock do |_factory|
      # Not stale: hand the same entry back so the mapping is kept.
      next entry unless entry.stale?

      entry.delete! # mark deleted to prevent reuse
      nil           # a nil result makes the map drop the mapping
    end
  end
end

#shutdown ⇒ Object



149
150
151
# File 'lib/logstash/outputs/s3/file_repository.rb', line 149

# Shuts the repository down. The only action taken is stopping the
# background stale-factory sweeper via `stop_stale_sweeper`; the factories
# held in the map are not touched here.
def shutdown
  stop_stale_sweeper
end

#size ⇒ Object



153
154
155
# File 'lib/logstash/outputs/s3/file_repository.rb', line 153

# Returns the entry count reported by the prefixed-factories map, i.e. how
# many prefix keys currently have a factory registered.
def size
  @prefixed_factories.size
end

#start_stale_sweeper ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
# File 'lib/logstash/outputs/s3/file_repository.rb', line 175

# Creates and starts the periodic task that walks every key in the map and
# removes the factories that have gone stale. The task is kept in
# @stale_sweeper so that `stop_stale_sweeper` can shut it down later.
#
# @return whatever the task's `execute` returns
def start_stale_sweeper
  @stale_sweeper = Concurrent::TimerTask.new(execution_interval: @sweeper_interval) do
    # Name the worker thread so it is recognisable in thread dumps.
    LogStash::Util.set_thread_name("S3, Stale factory sweeper")

    @prefixed_factories.keys.each { |prefix| remove_if_stale(prefix) }
  end

  @stale_sweeper.execute
end

#stop_stale_sweeper ⇒ Object



187
188
189
# File 'lib/logstash/outputs/s3/file_repository.rb', line 187

# Shuts down the timer task held in @stale_sweeper, which is assigned by
# `start_stale_sweeper` (invoked from the constructor).
# Returns whatever the task's `shutdown` returns.
def stop_stale_sweeper
  @stale_sweeper.shutdown
end