Class: Conveyor::BaseChannel
- Inherits: Object
- Class hierarchy: Object → Conveyor::BaseChannel
- Defined in:
- lib/conveyor/base_channel.rb
Overview
BaseChannel
Base implementation for channels. Not useful to instantiate directly.
Direct Known Subclasses
Defined Under Namespace
Modules: Flags
Constant Summary collapse
- NAME_PATTERN =
%r{\A[a-zA-Z\-0-9\_]+\Z}
- BUCKET_SIZE =
100_000
- FORMAT_VERSION =
1
- BLOCK_SIZE =
1000
- CACHE_SIZE =
100
Class Method Summary collapse
Instance Method Summary collapse
- #commit(data, time = nil) ⇒ Object
- #delete! ⇒ Object
- #get(id, stream = false) ⇒ Object
- #get_nearest_after_timestamp(timestamp, stream = false) ⇒ Object
-
#initialize(directory) ⇒ BaseChannel
constructor
A new instance of BaseChannel.
- #inspect ⇒ Object
- #rebuild_index ⇒ Object
- #status ⇒ Object
Constructor Details
#initialize(directory) ⇒ BaseChannel
Returns a new instance of BaseChannel.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/conveyor/base_channel.rb', line 41

# Initializes channel state rooted at +directory+.
# Loads an existing channel when the directory is already present,
# otherwise creates the directory and sets up a fresh channel.
# Raises if +directory+ exists but is not a directory.
def initialize directory
  @directory       = directory
  @data_files      = []
  @file_mutexes    = []
  @iterator        = 1 # TODO: move to Channel.rb
  @id_lock         = Mutex.new
  @index_file_lock = Mutex.new
  @block_cache     = {}
  @block_last_used = {}

  # File.exists? is deprecated (removed in Ruby 3.2); use File.exist?.
  if File.exist?(@directory)
    if !File.directory?(@directory)
      raise "#{@directory} is not a directory"
    end
    load_channel
  else
    Dir.mkdir(@directory)
    setup_channel
  end

  # Flush index writes immediately so readers see committed entries.
  @index_file.sync = true
end
Class Method Details
.parse_headers(str, index_file = false) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/conveyor/base_channel.rb', line 24

# Parses one space-separated header line into a Hash.
# All numeric fields are base-36 encoded; +hash+ is kept verbatim.
# When +index_file+ is true the trailing field is the bucket-file
# number; otherwise :file is nil.
def self.parse_headers str, index_file=false
  fields = str.split ' '
  parsed = {
    :id     => fields[0].to_i(36),
    :time   => fields[1].to_i(36),
    :offset => fields[2].to_i(36),
    :length => fields[3].to_i(36),
    :hash   => fields[4],
    :flags  => fields[5].to_i(36),
    :file   => nil
  }
  parsed[:file] = fields[6].to_i(36) if index_file
  parsed
end
.valid_channel_name?(name) ⇒ Boolean
37 38 39 |
# File 'lib/conveyor/base_channel.rb', line 37

# True when +name+ matches NAME_PATTERN (letters, digits, hyphens,
# underscores only), false otherwise.
def self.valid_channel_name? name
  !(name =~ NAME_PATTERN).nil?
end
Instance Method Details
#commit(data, time = nil) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/conveyor/base_channel.rb', line 68 def commit data, time=nil l = nil gzip = data.length >= 256 if gzip compressed_data = StringIO.new g = Zlib::GzipWriter.new(compressed_data) g << data g.finish compressed_data.rewind compressed_data = compressed_data.read l = compressed_data.length else l = data.length end h = Digest::MD5.hexdigest(data) id_lock do i = @last_id + 1 t = time || Time.now.to_i b = pick_bucket(i) flags = 0 flags = flags | Flags::GZIP if gzip header, o = nil bucket_file(b) do |f| f.seek(0, IO::SEEK_END) o = f.pos header = "#{i.to_s(36)} #{t.to_i.to_s(36)} #{o.to_s(36)} #{l.to_s(36)} #{h} #{flags.to_s(36)}" f.write("#{header}\n") f.write((gzip ? compressed_data : data)) f.write("\n") end @last_id = i index_offset = nil index_file_lock do @index_file.seek(0, IO::SEEK_END) index_offset = @index_file.pos @index_file.write "#{header} #{b.to_s(36)}\n" end block_num = block_num(i) if !@blocks[block_num] @blocks << {:offset => index_offset} end if @block_cache.key?(block_num) @block_cache[block_num] << {:id => i, :time => t, :offset => o, :length => l, :hash => h, :file => b} end i end end |
#delete! ⇒ Object
148 149 150 151 152 153 |
# File 'lib/conveyor/base_channel.rb', line 148

# Removes the channel's directory tree from disk and resets the
# in-memory bookkeeping state.
def delete!
  FileUtils.rm_r(@directory)
  @data_files, @last_id, @blocks = [], 0, []
end
#get(id, stream = false) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/conveyor/base_channel.rb', line 119 def get id, stream=false return nil unless id <= @last_id && id > 0 index_entry = search_index(id) headers, content, compressed_content, g = nil bucket_file(index_entry[:file]) do |f| f.seek index_entry[:offset] headers = self.class.parse_headers(f.readline.strip) compressed_content = f.read(index_entry[:length]) end io = StringIO.new(compressed_content) if (headers[:flags] & Flags::GZIP) != 0 g = Zlib::GzipReader.new(io) else g = io end if stream [headers, g] else [headers, g.read] end end |
#get_nearest_after_timestamp(timestamp, stream = false) ⇒ Object
143 144 145 146 |
# File 'lib/conveyor/base_channel.rb', line 143

# Returns the entry whose timestamp is nearest after +timestamp+,
# or nil when no such entry exists.
# NOTE(review): the rendered source had lost the method name and first
# parameter ("def , stream=false"); reconstructed from the signature
# "#get_nearest_after_timestamp(timestamp, stream = false)" above.
# NOTE(review): +stream+ is accepted but not forwarded to #get —
# confirm upstream whether `get(i, stream)` was intended.
def get_nearest_after_timestamp timestamp, stream=false
  i = nearest_after(timestamp)
  get(i) if i
end
#inspect ⇒ Object
64 65 66 |
# File 'lib/conveyor/base_channel.rb', line 64

# Compact one-line summary of the channel for debugging output.
def inspect
  "<%s dir:'%s' last_id:%s iterator:%s>" % [self.class, @directory.to_s, @last_id, @iterator]
end
#rebuild_index ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/conveyor/base_channel.rb', line 155

# Rebuilds the index file by scanning every numeric bucket file in the
# channel directory, in ascending bucket order, and re-emitting one
# index line (header + bucket number) per stored entry.
# Improvement: dropped the unused locals `content` and `index_offset`
# from the original (the payload read is kept for its file-position
# side effect).
def rebuild_index
  # Bucket data files are named by their decimal bucket number.
  files = Dir.glob(@directory + '/' + '[0-9]*')
  files = files.map{|f| [f, f.split('/').last.to_i]}
  files.sort!{|a,b| a[1] <=> b[1]}
  files.each do |(f, b)|
    File.open(f, 'r') do |file|
      puts "reading #{f}"
      while line = file.gets
        headers = self.class.parse_headers(line.strip)
        # Skip the payload and its trailing newline; only the headers
        # are needed to rebuild the index.
        file.read(headers[:length])
        file.read(1)
        header = "#{headers[:id].to_s(36)} #{headers[:time].to_s(36)} #{headers[:offset].to_s(36)} #{headers[:length].to_s(36)} #{headers[:hash]} #{headers[:flags].to_s(36)}"
        index_file_lock do
          @index_file.seek(0, IO::SEEK_END)
          @index_file.write "#{header} #{b.to_s(36)}\n"
        end
      end
    end
  end
end
#status ⇒ Object
178 179 180 181 182 183 184 185 186 187 |
# File 'lib/conveyor/base_channel.rb', line 178

# Snapshot of the channel's state: directory, per-file sizes, iterator
# position, cached block keys, last committed id and block count.
def status
  file_stats = @data_files.map do |df|
    { :path => df.path, :bytes => File.size(df.path) }
  end
  {
    :directory        => @directory,
    :data_files       => file_stats,
    :iterator         => @iterator,
    :block_cache_keys => @block_cache.keys,
    :last_id          => @last_id,
    :blocks           => @blocks.length,
  }
end