Class: FileWatch::SincedbCollection

Inherits:
Object
  • Object
show all
Includes:
LogStash::Util::Loggable
Defined in:
lib/filewatch/sincedb_collection.rb

Overview

This KV collection has a watched_file storage_key (an InodeStruct) as the key and a SincedbValue as the value. The SincedbValues are built by reading the sincedb file.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(settings) ⇒ SincedbCollection

Returns a new instance of SincedbCollection.



14
15
16
17
18
19
20
21
22
23
# File 'lib/filewatch/sincedb_collection.rb', line 14

# Builds an empty collection bound to the sincedb file named in +settings+.
#
# @param settings [Object] must respond to +sincedb_expiry_duration+ and
#   +sincedb_path+
def initialize(settings)
  @settings = settings
  @sincedb = {}
  @sincedb_last_write = 0
  @serializer = SincedbRecordSerializer.new(settings.sincedb_expiry_duration)
  @path = Pathname.new(settings.sincedb_path)
  @full_path = @path.to_path
  # On Windows, or when the sincedb path is a character/block device, the
  # non-atomic writer is selected; otherwise the atomic one.
  plain_write = LogStash::Environment.windows? || @path.chardev? || @path.blockdev?
  writer_name = plain_write ? :non_atomic_write : :atomic_write
  @write_method = method(writer_name)
  # FileUtils.touch creates the file when it does not exist yet.
  FileUtils.touch(@full_path)
end

Instance Attribute Details

#path ⇒ Object (readonly)

Returns the value of attribute path.



11
12
13
# File 'lib/filewatch/sincedb_collection.rb', line 11

# Reader for the sincedb location held in @path.
#
# @return [Pathname] the object #initialize builds from settings.sincedb_path
def path
  @path
end

#serializer=(value) ⇒ Object (writeonly)

Sets the attribute serializer

Parameters:

  • value

    the value to set the attribute serializer to.



12
13
14
# File 'lib/filewatch/sincedb_collection.rb', line 12

# Replaces the serializer held in @serializer, the object #open calls
# +deserialize+ on.
#
# @param value [Object] the new serializer
# @return [Object] the assigned value
def serializer=(value)
  @serializer = value
end

Instance Method Details

#associate(watched_file) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/filewatch/sincedb_collection.rb', line 57

# Reconciles a discovered watched_file with the sincedb record (if any) that
# is stored under the same key.
#
# Outcomes, in the order they are checked:
# * no record for the key: nothing happens;
# * record not attached to any watched_file and either carrying no path or
#   carrying the discovered path: the record is handed to handle_association;
# * record not attached but carrying a different path: the record is dropped;
# * record already attached to this very watched_file: nothing happens;
# * record attached to a different watched_file: the record is detached and
#   removed under the previous owner's key.
#
# @param watched_file [Object] responds to +path+ and +sincedb_key+
# @return [Object, nil] nil on every early exit, otherwise whatever #delete
#   returns for the previous owner's key
def associate(watched_file)
  logger.debug("associate: finding: #{watched_file.path}")
  sincedb_value = find(watched_file)
  # The sincedb has no record of this inode; because many files are handled in
  # windows, this file may not be opened in this session. A new value is added
  # when the file is opened.
  return if sincedb_value.nil?

  if sincedb_value.watched_file.nil?
    # The record is not associated with any watched_file yet.
    if sincedb_value.path_in_sincedb.nil? || sincedb_value.path_in_sincedb == watched_file.path
      # Either an old v1 record (no stored path, assumed to be the same file)
      # or the stored path and the inode both match the discovered file.
      handle_association(sincedb_value, watched_file)
    else
      # Same key (inode) but a different path on disk: treat it as a new
      # file; a new value is added when the file is opened.
      logger.debug("associate: matched but allocated to another - #{sincedb_value}")
      sincedb_value.clear_watched_file
      delete(watched_file.sincedb_key)
    end
    return
  end

  if sincedb_value.watched_file.equal?(watched_file) # identity, not value, comparison
    logger.debug("associate: already associated - #{sincedb_value}, for path: #{watched_file.path}")
    return
  end

  # The record belongs to another watched_file with the same key (inode), so
  # the filename changed during this session. Log the history of the previous
  # owner and remove the record; a new value is added when the file is opened.
  # TODO notify about done-ness of old sincedb_value and watched_file
  previous_owner = sincedb_value.watched_file
  sincedb_value.clear_watched_file
  if logger.debug?
    logger.debug("associate: matched but allocated to another - #{sincedb_value}")
    logger.debug("associate: matched but allocated to another - old watched_file history - #{previous_owner.recent_state_history.join(', ')}")
    logger.debug("associate: matched but allocated to another - DELETING value at key `#{previous_owner.sincedb_key}`")
  end
  delete(previous_owner.sincedb_key)
end

#clear ⇒ Object



150
151
152
# File 'lib/filewatch/sincedb_collection.rb', line 150

# Removes every entry from the in-memory @sincedb hash. Nothing else is
# touched by this method.
#
# @return [Hash] the now-empty @sincedb hash
def clear
  @sincedb.clear
end

#delete(key) ⇒ Object



121
122
123
# File 'lib/filewatch/sincedb_collection.rb', line 121

# Removes the entry stored under +key+ from the in-memory @sincedb hash.
#
# @param key [Object] the key to remove
# @return [Object, nil] the removed value, or nil when the key was absent
def delete(key)
  @sincedb.delete(key)
end

#find(watched_file) ⇒ Object



107
108
109
110
111
# File 'lib/filewatch/sincedb_collection.rb', line 107

# Looks up the record stored under the watched_file's sincedb_key and logs,
# at debug level, whether one was found.
#
# @param watched_file [Object] responds to +sincedb_key+ and +path+
# @return [Object, nil] whatever #get returns for the key
def find(watched_file)
  found = get(watched_file.sincedb_key)
  logger.debug("find for path: #{watched_file.path}, found: '#{!found.nil?}'")
  found
end

#get(key) ⇒ Object



117
118
119
# File 'lib/filewatch/sincedb_collection.rb', line 117

# Reads the entry stored under +key+ in the in-memory @sincedb hash.
#
# @param key [Object] the key to look up
# @return [Object, nil] the stored value, or nil when the key is absent
def get(key)
  @sincedb[key]
end

#increment(key, amount) ⇒ Object



137
138
139
# File 'lib/filewatch/sincedb_collection.rb', line 137

# Advances the position of the record stored under +key+ by +amount+.
#
# @param key [Object] key of an existing record
# @param amount [Object] passed straight to the record's increment_position
# @return [Object] whatever increment_position returns
# @raise [NoMethodError] when no record is stored under +key+
def increment(key, amount)
  record = @sincedb[key]
  record.increment_position(amount)
end

#keys ⇒ Object



154
155
156
# File 'lib/filewatch/sincedb_collection.rb', line 154

# Lists the keys currently held in the in-memory @sincedb hash.
#
# @return [Array] a new array of the keys
def keys
  @sincedb.keys
end

#last_read(key) ⇒ Object



125
126
127
# File 'lib/filewatch/sincedb_collection.rb', line 125

# Reads the position of the record stored under +key+.
#
# @param key [Object] key of an existing record
# @return [Object] the record's +position+
# @raise [NoMethodError] when no record is stored under +key+
def last_read(key)
  record = @sincedb[key]
  record.position
end

#member?(key) ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/filewatch/sincedb_collection.rb', line 113

# Tells whether a record is stored under +key+, even if its value is nil.
#
# @param key [Object] the key to test
# @return [Boolean] true when the key is present in @sincedb
def member?(key)
  @sincedb.key?(key)
end

#open ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/filewatch/sincedb_collection.rb', line 39

# Loads the sincedb file at #path into this collection.
#
# Stores the current time (as a Float) in @time_sdb_opened, opens the path,
# hands the file to the serializer and passes every key/value pair it yields
# to set_key_value. Any StandardError raised while opening or deserializing
# is logged at debug level and swallowed, so a failure never propagates to
# the caller; pairs imported before the failure stay in the collection.
#
# @return [Object] the result of the last logger.debug call; not meaningful
def open
  @time_sdb_opened = Time.now.to_f
  begin
    path.open do |file|
      logger.debug("open: reading from #{path}")
      @serializer.deserialize(file) do |key, value|
        # Runs once per record: skip building the message (and calling to_s
        # on the key and value) unless debug logging is actually enabled.
        logger.debug("open: importing ... '#{key}' => '#{value}'") if logger.debug?
        set_key_value(key, value)
      end
    end
    logger.debug("open: count of keys read: #{@sincedb.size}")
  rescue => e
    # Best effort: a sincedb that cannot be loaded is reported, not fatal.
    logger.debug("open: error: #{path}: #{e.inspect}")
  end
end

#request_disk_flush ⇒ Object



25
26
27
28
29
30
31
32
# File 'lib/filewatch/sincedb_collection.rb', line 25

# Writes the sincedb only when at least settings.sincedb_write_interval
# seconds have elapsed since @sincedb_last_write.
#
# @return [Object, nil] nil when the interval has not elapsed, otherwise
#   whatever sincedb_write returns
def request_disk_flush
  current_time = Time.now.to_i
  elapsed = current_time - @sincedb_last_write
  return unless elapsed >= @settings.sincedb_write_interval

  logger.debug("writing sincedb (delta since last write = #{elapsed})")
  sincedb_write(current_time)
end

#rewind(key) ⇒ Object



129
130
131
# File 'lib/filewatch/sincedb_collection.rb', line 129

# Resets the position of the record stored under +key+ to 0.
#
# @param key [Object] key of an existing record
# @return [Object] whatever the record's update_position returns
# @raise [NoMethodError] when no record is stored under +key+
def rewind(key)
  record = @sincedb[key]
  record.update_position(0)
end

#set(key, value) ⇒ Object



158
159
160
161
# File 'lib/filewatch/sincedb_collection.rb', line 158

# Stores +value+ under +key+, replacing any existing entry.
#
# @param key [Object] the key to store under
# @param value [Object] the record to store
# @return [Object] the stored +value+
def set(key, value)
  @sincedb.store(key, value)
end

#set_watched_file(key, watched_file) ⇒ Object



141
142
143
# File 'lib/filewatch/sincedb_collection.rb', line 141

# Hands +watched_file+ to the set_watched_file method of the record stored
# under +key+.
#
# @param key [Object] key of an existing record
# @param watched_file [Object] passed straight through to the record
# @return [Object] whatever the record's set_watched_file returns
# @raise [NoMethodError] when no record is stored under +key+
def set_watched_file(key, watched_file)
  record = @sincedb[key]
  record.set_watched_file(watched_file)
end

#store_last_read(key, last_read) ⇒ Object



133
134
135
# File 'lib/filewatch/sincedb_collection.rb', line 133

# Sets the position of the record stored under +key+ to +last_read+.
#
# @param key [Object] key of an existing record
# @param last_read [Object] passed straight to the record's update_position
# @return [Object] whatever update_position returns
# @raise [NoMethodError] when no record is stored under +key+
def store_last_read(key, last_read)
  record = @sincedb[key]
  record.update_position(last_read)
end

#unset_watched_file(watched_file) ⇒ Object



145
146
147
148
# File 'lib/filewatch/sincedb_collection.rb', line 145

# Calls unset_watched_file on the record stored under the watched_file's
# sincedb_key, when such a record exists.
#
# @param watched_file [Object] responds to +sincedb_key+
# @return [Object, nil] nil when the key is unknown, otherwise whatever the
#   record's unset_watched_file returns
def unset_watched_file(watched_file)
  if member?(watched_file.sincedb_key)
    get(watched_file.sincedb_key).unset_watched_file
  end
end

#watched_file_unset?(key) ⇒ Boolean

Returns:

  • (Boolean)


163
164
165
166
# File 'lib/filewatch/sincedb_collection.rb', line 163

# Tells whether a record exists under +key+ and has no watched_file attached.
#
# @param key [Object] the key to inspect
# @return [Boolean] false when the key is unknown; otherwise whether the
#   record's watched_file is nil
def watched_file_unset?(key)
  member?(key) ? get(key).watched_file.nil? : false
end

#write(reason = nil) ⇒ Object



34
35
36
37
# File 'lib/filewatch/sincedb_collection.rb', line 34

# Writes the sincedb unconditionally, logging the caller-supplied reason at
# debug level first.
#
# @param reason [Object, nil] free-form note interpolated into the log message
# @return [Object] whatever sincedb_write returns
def write(reason = nil)
  message = "caller requested sincedb write (#{reason})"
  logger.debug(message)
  sincedb_write
end