Class: LogStash::Outputs::Qingstor::FileRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/qingstor/file_repository.rb

Defined Under Namespace

Classes: FactoryInitializer, PrefixedValue

Constant Summary collapse

# Default value of the constructor's +sweeper_interval+ argument, which is
# used as the execution interval of the stale-sweeper timer task.
DEFAULT_STATE_SWEEPER_INTERVAL_SECS = 60
# Default value of the constructor's +stale_time+ argument, which is handed
# to FactoryInitializer (15 minutes, assuming the unit follows the _SECS
# suffix).
DEFAULT_STATE_TIME_SECS = 15 * 60

Instance Method Summary collapse

Constructor Details

#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STATE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository

Returns a new instance of FileRepository.



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 63

# Builds an empty repository and immediately starts the stale sweeper.
#
# @param tags passed through to FactoryInitializer
# @param encoding passed through to FactoryInitializer
# @param temporary_directory passed through to FactoryInitializer
# @param stale_time passed through to FactoryInitializer
# @param sweeper_interval execution interval given to the sweeper timer task
def initialize(tags, encoding, temporary_directory,
               stale_time = DEFAULT_STATE_TIME_SECS,
               sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
  @sweeper_interval = sweeper_interval
  # Handed to computeIfAbsent by get_factory whenever a prefix key is missing.
  @factoryinitializer = FactoryInitializer.new(
    tags, encoding, temporary_directory, stale_time
  )
  @prefixed_factories = ConcurrentHashMap.new

  # Must come last: the sweeper reads @sweeper_interval and @prefixed_factories.
  start_stale_sweeper
end

Instance Method Details

#each_files ⇒ Object



80
81
82
83
84
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 80

# Walks every entry of @prefixed_factories and yields the +current+ file of
# its factory. Each yield happens inside that entry's +with_lock+ block.
#
# @yield [file] the value of +factory.current+ for each entry
def each_files
  entries = @prefixed_factories.elements
  entries.each do |entry|
    entry.with_lock do |factory|
      yield factory.current
    end
  end
end

#get_factory(prefix_key) ⇒ Object



86
87
88
89
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 86

# Looks up the entry for +prefix_key+, creating it through
# @factoryinitializer when the key is absent (computeIfAbsent), then yields
# the factory from inside the entry's +with_lock+ block.
#
# @param prefix_key key into @prefixed_factories
# @yield [factory] the object that +with_lock+ hands to its block
# @return whatever +with_lock+ returns
def get_factory(prefix_key)
  entry = @prefixed_factories.computeIfAbsent(prefix_key, @factoryinitializer)
  entry.with_lock do |factory|
    yield factory
  end
end

#get_file(prefix_key) ⇒ Object



91
92
93
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 91

# Convenience wrapper around get_factory: yields +factory.current+ for the
# given prefix key instead of the factory itself.
#
# @param prefix_key forwarded unchanged to get_factory
# @yield [file] the value of +factory.current+
# @return whatever get_factory returns
def get_file(prefix_key)
  get_factory(prefix_key) do |factory|
    yield factory.current
  end
end

#keys ⇒ Object



76
77
78
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 76

# Returns the result of +keySet+ on @prefixed_factories, i.e. the prefix
# keys currently registered in the repository.
def keys
  @prefixed_factories.keySet
end

#remove_stale(k, v) ⇒ Object



103
104
105
106
107
108
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 103

# Drops one entry from the repository if it reports itself as stale.
#
# NOTE(review): the +stale?+ check, the map removal and +delete!+ are three
# separate calls, so they are not atomic as a unit. Whether that matters
# depends on the locking inside the value object, which is not visible
# here. Confirm before relying on it.
#
# @param k key of the entry in @prefixed_factories
# @param v value stored under +k+; must respond to +stale?+ and +delete!+
# @return nil when +v+ is not stale, otherwise the result of +v.delete!+
def remove_stale(k, v)
  return unless v.stale?

  # Two-argument remove: only unmaps +k+ while it is still mapped to +v+.
  @prefixed_factories.remove(k, v)
  v.delete!
end

#shutdown ⇒ Object



95
96
97
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 95

# Shuts the repository down. The only action taken is stopping the stale
# sweeper via stop_stale_sweeper.
def shutdown
  stop_stale_sweeper
end

#size ⇒ Object



99
100
101
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 99

# Returns the number of entries reported by @prefixed_factories.
def size
  @prefixed_factories.size
end

#start_stale_sweeper ⇒ Object



110
111
112
113
114
115
116
117
118
119
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 110

# Creates the periodic sweeper task, stores it in @stale_sweeper and starts
# it. On every run the task names its thread and passes each entry of
# @prefixed_factories to remove_stale.
#
# @return the result of calling +execute+ on the timer task
def start_stale_sweeper
  # A plain proc (not a lambda) so that any arguments the timer passes to
  # the task are ignored, exactly as with a literal block.
  sweep = proc do
    LogStash::Util.set_thread_name('Qingstor, stale factory sweeper')
    @prefixed_factories.forEach { |key, factory| remove_stale(key, factory) }
  end

  @stale_sweeper = Concurrent::TimerTask.new(execution_interval: @sweeper_interval, &sweep)
  @stale_sweeper.execute
end

#stop_stale_sweeper ⇒ Object



121
122
123
# File 'lib/logstash/outputs/qingstor/file_repository.rb', line 121

# Calls +shutdown+ on the timer task held in @stale_sweeper (the task that
# start_stale_sweeper created and started).
def stop_stale_sweeper
  @stale_sweeper.shutdown
end