Class: FileWatch::SincedbCollection

Inherits:
Object
  • Object
show all
Includes:
LogStash::Util::Loggable
Defined in:
lib/filewatch/sincedb_collection.rb

Overview

this KV collection has a watched_file storage_key (an InodeStruct) as the key and a SincedbValue as the value. the SincedbValues are built by reading the sincedb file.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(settings) ⇒ SincedbCollection

Returns a new instance of SincedbCollection.



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/filewatch/sincedb_collection.rb', line 14

def initialize(settings)
  @settings = settings
  @sincedb_last_write = 0
  @sincedb = {}
  @serializer = SincedbRecordSerializer.new(@settings.sincedb_expiry_duration)
  @path = Pathname.new(@settings.sincedb_path)
  @write_method = LogStash::Environment.windows? || @path.chardev? || @path.blockdev? ? method(:non_atomic_write) : method(:atomic_write)
  @full_path = @path.to_path
  FileUtils.touch(@full_path)
  @write_requested = false
end

Instance Attribute Details

#pathObject (readonly)

Returns the value of attribute path.



11
12
13
# File 'lib/filewatch/sincedb_collection.rb', line 11

def path
  @path
end

#serializer=(value) ⇒ Object (writeonly)

Sets the attribute serializer

Parameters:

  • value

    the value to set the attribute serializer to.



12
13
14
# File 'lib/filewatch/sincedb_collection.rb', line 12

def serializer=(value)
  @serializer = value
end

Instance Method Details

#associate(watched_file) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/filewatch/sincedb_collection.rb', line 63

def associate(watched_file)
  logger.trace? && logger.trace("associate: finding", :path => watched_file.path, :inode => watched_file.sincedb_key.inode)
  sincedb_value = find(watched_file)
  if sincedb_value.nil?
    # sincedb has no record of this inode
    # and due to the window handling of many files
    # this file may not be opened in this session.
    # a new value will be added when the file is opened
    logger.trace("associate: unmatched")
    return true
  end
  logger.trace? && logger.trace("associate: found sincedb record", :filename => watched_file.filename,
                                :sincedb_key => watched_file.sincedb_key, :sincedb_value => sincedb_value)
  if sincedb_value.watched_file.nil?
    # not associated
    if sincedb_value.path_in_sincedb.nil?
      handle_association(sincedb_value, watched_file)
      logger.trace("associate: inode matched but no path in sincedb")
      return true
    end
    if sincedb_value.path_in_sincedb == watched_file.path
      # the path on disk is the same as discovered path
      # and the inode is the same.
      handle_association(sincedb_value, watched_file)
      logger.trace("associate: inode and path matched")
      return true
    end
    # the path on disk is different from discovered unassociated path
    # but they have the same key (inode)
    # treat as a new file, a new value will be added when the file is opened
    sincedb_value.clear_watched_file
    delete(watched_file.sincedb_key)
    logger.trace("associate: matched but allocated to another")
    return true
  end
  if sincedb_value.watched_file.equal?(watched_file) # pointer equals
    logger.trace("associate: already associated")
    return true
  end
  # sincedb_value.watched_file is not this discovered watched_file but they have the same key (inode)
  # this means that the filename path was changed during this session.
  # renamed file can be discovered...
  #   before the original is detected as deleted: state is `active`
  #   after the original is detected as deleted but before it is actually deleted: state is `delayed_delete`
  #   after the original is deleted
  # are not yet in the delete phase, let this play out
  existing_watched_file = sincedb_value.watched_file
  logger.trace? && logger.trace("----------------- >> associate: the found sincedb_value has a watched_file - this is a rename",
                                :this_watched_file => watched_file.details, :existing_watched_file => existing_watched_file.details)
  watched_file.rotation_in_progress
  true
end

#clearObject



170
171
172
# File 'lib/filewatch/sincedb_collection.rb', line 170

def clear
  @sincedb.clear
end

#clear_watched_file(key) ⇒ Object



162
163
164
# File 'lib/filewatch/sincedb_collection.rb', line 162

def clear_watched_file(key)
  @sincedb[key].clear_watched_file
end

#delete(key) ⇒ Object



133
134
135
# File 'lib/filewatch/sincedb_collection.rb', line 133

def delete(key)
  @sincedb.delete(key)
end

#find(watched_file) ⇒ Object



116
117
118
# File 'lib/filewatch/sincedb_collection.rb', line 116

def find(watched_file)
  get(watched_file.sincedb_key)
end

#flush_at_intervalObject



183
184
185
186
187
188
189
190
# File 'lib/filewatch/sincedb_collection.rb', line 183

def flush_at_interval
  now = Time.now
  delta = now.to_i - @sincedb_last_write
  if delta >= @settings.sincedb_write_interval
    logger.debug("writing sincedb (delta since last write = #{delta})")
    sincedb_write(now)
  end
end

#get(key) ⇒ Object



124
125
126
# File 'lib/filewatch/sincedb_collection.rb', line 124

def get(key)
  @sincedb[key]
end

#increment(key, amount) ⇒ Object



145
146
147
# File 'lib/filewatch/sincedb_collection.rb', line 145

def increment(key, amount)
  @sincedb[key].increment_position(amount)
end

#keysObject



174
175
176
# File 'lib/filewatch/sincedb_collection.rb', line 174

def keys
  @sincedb.keys
end

#last_read(key) ⇒ Object



137
138
139
# File 'lib/filewatch/sincedb_collection.rb', line 137

def last_read(key)
  @sincedb[key].position
end

#member?(key) ⇒ Boolean

Returns:

  • (Boolean)


120
121
122
# File 'lib/filewatch/sincedb_collection.rb', line 120

def member?(key)
  @sincedb.member?(key)
end

#openObject



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/filewatch/sincedb_collection.rb', line 46

def open
  @time_sdb_opened = Time.now.to_f
  begin
    path.open do |file|
      logger.trace("open: reading from #{path}")
      @serializer.deserialize(file) do |key, value|
        logger.trace("open: importing ... '#{key}' => '#{value}'")
        set_key_value(key, value)
      end
    end
    logger.trace("open: count of keys read: #{@sincedb.keys.size}")
  rescue => e
    #No existing sincedb to load
    logger.trace("open: error:", :path => path, :exception => e.class, :message => e.message)
  end
end

#reading_completed(key) ⇒ Object



166
167
168
# File 'lib/filewatch/sincedb_collection.rb', line 166

def reading_completed(key)
  @sincedb[key].reading_completed
end

#request_disk_flushObject



30
31
32
33
# File 'lib/filewatch/sincedb_collection.rb', line 30

def request_disk_flush
  @write_requested = true
  flush_at_interval
end

#rewind(key) ⇒ Object



141
142
143
# File 'lib/filewatch/sincedb_collection.rb', line 141

def rewind(key)
  @sincedb[key].update_position(0)
end

#set(key, value) ⇒ Object



128
129
130
131
# File 'lib/filewatch/sincedb_collection.rb', line 128

def set(key, value)
  @sincedb[key] = value
  value
end

#set_watched_file(key, watched_file) ⇒ Object



149
150
151
# File 'lib/filewatch/sincedb_collection.rb', line 149

def set_watched_file(key, watched_file)
  @sincedb[key].set_watched_file(watched_file)
end

#store_last_read(key, pos) ⇒ Object



158
159
160
# File 'lib/filewatch/sincedb_collection.rb', line 158

def store_last_read(key, pos)
  @sincedb[key].update_position(pos)
end

#watched_file_deleted(watched_file) ⇒ Object



153
154
155
156
# File 'lib/filewatch/sincedb_collection.rb', line 153

def watched_file_deleted(watched_file)
  value = @sincedb[watched_file.sincedb_key]
  value.unset_watched_file if value
end

#watched_file_unset?(key) ⇒ Boolean

Returns:

  • (Boolean)


178
179
180
181
# File 'lib/filewatch/sincedb_collection.rb', line 178

def watched_file_unset?(key)
  return false unless member?(key)
  get(key).watched_file.nil?
end

#write(reason = nil) ⇒ Object



41
42
43
44
# File 'lib/filewatch/sincedb_collection.rb', line 41

def write(reason=nil)
  logger.trace("caller requested sincedb write (#{reason})")
  sincedb_write
end

#write_if_requestedObject



35
36
37
38
39
# File 'lib/filewatch/sincedb_collection.rb', line 35

def write_if_requested
  if write_requested?
    flush_at_interval
  end
end

#write_requested?Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/filewatch/sincedb_collection.rb', line 26

def write_requested?
  @write_requested
end