Class: Persist::Sharder

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/persist/tsv/sharder.rb

Constant Summary collapse

MAX_CHAR =
255.chr

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(persistence_path, write = false, db_type = nil, options = {}, &block) ⇒ Sharder

Returns a new instance of Sharder.



16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/rbbt/persist/tsv/sharder.rb', line 16

# Builds a sharder rooted at +persistence_path+.
#
# persistence_path - where the shard databases live; stored after being
#                    passed through Path.setup.
# write            - stored as the writable flag. When truthy the shard
#                    table (@databases) starts as an empty Hash; otherwise
#                    it is left unassigned here.
# db_type          - stored as given (used by #database for non-metadata shards).
# options          - stored as given (forwarded by #database to Persist.open_database).
# block            - stored as the shard function (#database calls it with a key).
def initialize(persistence_path, write = false, db_type=nil, options = {}, &block)
  @persistence_path = Path.setup(persistence_path)
  @shard_function   = block
  @db_type          = db_type
  @options          = options
  @writable         = write
  @mutex            = Mutex.new

  @databases = {} if write
end

Instance Attribute Details

#closed ⇒ Object

Returns the value of attribute closed.



14
15
16
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14

# Reader for the @closed flag. #close sets it to true and #write resets it
# to false; it is nil until one of those has run.
def closed
  @closed
end

#databases ⇒ Object

Returns the value of attribute databases.



14
15
16
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14

# Reader for @databases, the shard table. #initialize assigns it (an empty
# Hash) only when the sharder is created writable; #database stores opened
# shard databases in it keyed by shard name.
def databases
  @databases
end

#db_type ⇒ Object

Returns the value of attribute db_type.



14
15
16
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14

# Reader for @db_type, the database type given to #initialize. #database
# passes it to Persist.open_database for every shard except "metadata".
def db_type
  @db_type
end

#mutex ⇒ Object

Returns the value of attribute mutex.



14
15
16
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14

# Reader for @mutex, the Mutex created in #initialize. #read_and_close,
# #write_and_close and #write_and_read run their blocks inside it.
def mutex
  @mutex
end

#options ⇒ Object

Returns the value of attribute options.



14
15
16
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14

# Reader for @options, the options value given to #initialize (defaults to
# an empty Hash). #database forwards it to Persist.open_database.
def options
  @options
end

#persistence_path ⇒ Object

Returns the value of attribute persistence_path.



14
15
16
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14

# Reader for @persistence_path: the constructor argument after Path.setup.
# #database looks for shard files named "shard-<name>" under it.
def persistence_path
  @persistence_path
end

#shard_function ⇒ Object

Returns the value of attribute shard_function.



14
15
16
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14

# Reader for @shard_function, the block given to #initialize. #database
# calls it with a key to obtain that key's shard name.
def shard_function
  @shard_function
end

#writable ⇒ Object

Returns the value of attribute writable.



14
15
16
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14

# Reader for @writable: the +write+ argument of #initialize, later set to
# true by #write. #database passes it to Persist.open_database.
def writable
  @writable
end

Instance Method Details

#<<(p) ⇒ Object



29
30
31
# File 'lib/rbbt/persist/tsv/sharder.rb', line 29

# Stores one entry.
#
# Used as an operator, Ruby can only hand this method a single argument
# (+sharder << [key, value]+), which the previous two-parameter signature
# rejected with ArgumentError. A single argument is therefore treated as a
# [key, value] pair; the explicit two-argument call form keeps working.
#
# key   - the entry key, or a [key, value] pair when +value+ is omitted.
# value - the value to store (optional when a pair is given).
#
# Returns the stored value, or nil when a nil pair is appended.
def <<(key, value = (pair_given = true; nil))
  if pair_given
    # A nil pair carries nothing to store.
    return nil if key.nil?
    key, value = key
  end
  self[key] = value
end

#[](key, clean = false) ⇒ Object



214
215
216
217
218
# File 'lib/rbbt/persist/tsv/sharder.rb', line 214

# Looks +key+ up in the database of the shard it belongs to.
#
# key   - the entry key.
# clean - accepted but not used by this method.
#
# Returns the stored value, or nil when #database yields no database for
# the key's shard.
def [](key, clean=false)
  shard_db = database(key)
  shard_db.nil? ? nil : shard_db.send(:[], key)
end

#[]=(key, value, clean = false) ⇒ Object



210
211
212
# File 'lib/rbbt/persist/tsv/sharder.rb', line 210

# Writes +value+ under +key+ in the database of the shard the key belongs to.
#
# key   - the entry key.
# value - the value to store.
# clean - accepted but not used by this method.
#
# Returns whatever the shard database's []= returns.
def []=(key, value, clean = false)
  shard_db = database(key)
  shard_db.send(:[]=, key, value)
end

#close ⇒ Object



93
94
95
96
# File 'lib/rbbt/persist/tsv/sharder.rb', line 93

# Marks the sharder as closed.
#
# The class inherits directly from Object, which has no #close, so an
# unconditional +super+ raised NoMethodError on every call (including the
# +ensure+ clauses of the *_and_close helpers). The call is now only
# forwarded when an ancestor actually implements #close.
#
# NOTE(review): this only flips the flag; the shard databases held in
# @databases are not closed here — confirm whether they should be.
#
# Returns the ancestor's result when one exists, nil otherwise.
def close
  @closed = true
  super if defined?(super)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/rbbt/persist/tsv/sharder.rb', line 89

# Whether the sharder is flagged as closed. Returns the raw @closed value,
# so the result is nil (falsy) rather than false until #close or #write
# has assigned the flag.
def closed?
  @closed
end

#collect ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
# File 'lib/rbbt/persist/tsv/sharder.rb', line 146

# Gathers one result per entry visited by #each.
#
# With a block, each (key, value) is yielded and the block's result is
# collected; without one, the [key, value] pairs themselves are collected.
#
# Returns an Array of the collected results.
def collect
  gathered = []
  if block_given?
    each { |k, v| gathered << yield(k, v) }
  else
    each { |k, v| gathered << [k, v] }
  end
  gathered
end

#database(key) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/rbbt/persist/tsv/sharder.rb', line 53

# Returns the database backing the shard that +key+ belongs to, opening and
# caching it in +databases+ on first use.
#
# Keys matching /__tsv_/ are routed to the "metadata" shard, which is always
# opened as "HDB"; every other key is routed by +shard_function+ and opened
# with +db_type+. A shard is only opened when the sharder is writable or the
# shard file already exists.
#
# key - the entry key.
#
# Returns the shard database, or nil (after logging a warning) when the
# shard could not be opened.
def database(key)
  shard = key =~ /__tsv_/ ? "metadata" : shard_function.call(key)
  return databases[shard] if databases.include? shard

  # Interpolate rather than `'shard-' << ...`, which mutated a string literal.
  path = File.join(persistence_path, "shard-#{shard}")
  type = shard == 'metadata' ? "HDB" : db_type

  shard_db = Persist.open_database(path, writable, :clean, type, @options) if writable || File.exist?(path)

  if shard_db
    databases[shard] = shard_db
  else
    # Previously written as `Log.warn "..." if` with `nil` on the next line:
    # the dangling modifier took that nil as its condition, so the warning
    # could never be emitted.
    Log.warn "Database #{ path } missing"
    nil
  end
end

#each ⇒ Object



134
135
136
137
138
139
140
# File 'lib/rbbt/persist/tsv/sharder.rb', line 134

# Yields every (key, value) entry of every database currently held in
# +databases+, one shard after another.
#
# Iterates over a snapshot of the values (+databases.values+), exactly as
# before, and returns that Array.
def each
  databases.values.each do |shard_db|
    shard_db.each { |key, value| yield key, value }
  end
end

#get_prefix(key) ⇒ Object



84
85
86
87
# File 'lib/rbbt/persist/tsv/sharder.rb', line 84

# Selects the entries whose keys #prefix reports for +key+.
#
# key - the prefix handed to #prefix.
#
# Returns the result of +select+ called with the matching keys under :key.
def get_prefix(key)
  select(:key => prefix(key))
end

#include?(key) ⇒ Boolean

Returns:

  • (Boolean)


142
143
144
# File 'lib/rbbt/persist/tsv/sharder.rb', line 142

# Whether a non-nil value is stored under +key+.
#
# key - the entry key, looked up through #[].
#
# Returns true when the lookup yields anything other than nil (a stored
# +false+ counts as present), false otherwise.
def include?(key)
  !self[key].nil?
end

#keys ⇒ Object



206
207
208
# File 'lib/rbbt/persist/tsv/sharder.rb', line 206

# Lists the keys of every database held in +databases+, leaving out any
# key listed in TSV::ENTRY_KEYS.
#
# Returns a flat Array of keys.
def keys
  all_keys = databases.values.map(&:keys).flatten
  all_keys - TSV::ENTRY_KEYS.to_a
end

#merge!(hash) ⇒ Object



200
201
202
203
204
# File 'lib/rbbt/persist/tsv/sharder.rb', line 200

# Copies every entry of +hash+ into the sharder through #[]=.
#
# hash - any object whose #each yields (key, values) pairs.
#
# Returns whatever +hash.each+ returns.
def merge!(hash)
  hash.each { |k, vals| self[k] = vals }
end

#prefix(key) ⇒ Object



80
81
82
# File 'lib/rbbt/persist/tsv/sharder.rb', line 80

# Returns the #range result spanning +key+ (inclusive) up to +key+ followed
# by MAX_CHAR (inclusive).
#
# MAX_CHAR is a binary (ASCII-8BIT) byte, so appending it directly to a
# UTF-8 key containing non-ASCII characters raised
# Encoding::CompatibilityError. The upper bound is therefore built from the
# key's raw bytes; for ASCII-only keys the result is byte-for-byte what it
# was before.
#
# key - String prefix to search for.
def prefix(key)
  upper_bound = key.b + MAX_CHAR
  range(key, 1, upper_bound, 1)
end

#range(*args) ⇒ Object



127
128
129
130
131
132
# File 'lib/rbbt/persist/tsv/sharder.rb', line 127

# Runs +range(*args)+ on every TokyoCabinet::BDB database held in
# +databases+ and concatenates the results; databases of any other class
# are skipped.
#
# args - forwarded untouched to each database's #range.
#
# Returns one Array with the results of all queried shards.
def range(*args)
  databases.values.each_with_object([]) do |shard_db, found|
    next unless TokyoCabinet::BDB === shard_db
    found.concat shard_db.range(*args)
  end
end

#read(force = false) ⇒ Object



98
99
100
101
102
103
104
105
106
# File 'lib/rbbt/persist/tsv/sharder.rb', line 98

# Presumably meant to switch the sharder into read mode, but currently
# disabled: the first statement raises unconditionally, so none of the
# statements after it ever run.
#
# force - has no effect while the raise is in place.
#
# Raises RuntimeError ("SIOT") on every call.
def read(force = false)
  raise "SIOT"
  # Everything below is unreachable.
  return if not write? and not closed and not force
  self.close
  # NOTE(review): if +databases+ is a Hash (as #initialize sets it up), a
  # one-parameter block receives [shard, db] pairs, so +d.read+ would be
  # sent to an Array — confirm before re-enabling this path.
  databases.each{|d| d.read }
  @writable = false
  @closed = false
  self
end

#read? ⇒ Boolean

Returns:

  • (Boolean)


123
124
125
# File 'lib/rbbt/persist/tsv/sharder.rb', line 123

# Whether the sharder is in read mode, i.e. the negation of #write?.
#
# Returns true or false.
def read?
  !write?
end

#read_and_close ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
# File 'lib/rbbt/persist/tsv/sharder.rb', line 188

# Runs the given block while holding @mutex, calling #read first when the
# sharder is flagged closed or is not in read mode, and always calling
# #close afterwards (even if the block raises).
#
# Returns the block's value.
def read_and_close
  @mutex.synchronize do
    read if @closed || !read?
    begin
      yield
    ensure
      close
    end
  end
end

#size ⇒ Object



237
238
239
240
241
242
# File 'lib/rbbt/persist/tsv/sharder.rb', line 237

# Total number of entries across the databases held in +databases+,
# obtained by adding up each database's own #size.
#
# Returns the sum (0 when no database is held).
def size
  databases.values.sum(&:size)
end

#write(force = true) ⇒ Object



108
109
110
111
112
113
114
115
116
117
# File 'lib/rbbt/persist/tsv/sharder.rb', line 108

# Switches the sharder, and every database held in +databases+, to write
# mode.
#
# force - when false, the call is a no-op if the sharder is already
#         writable and not flagged closed. Defaults to true, so by default
#         the switch is always performed.
#
# Returns self, or nil when the call was skipped.
def write(force = true)
  return if write? && !closed && !force
  close

  # +databases+ is a Hash of shard name => database. Iterating it with a
  # one-parameter block handed [shard, db] pairs to the block, so +write+
  # was sent to an Array; iterate over the databases themselves.
  databases.each_value { |db| db.write }

  @writable = true
  @closed = false
  self
end

#write? ⇒ Boolean

Returns:

  • (Boolean)


119
120
121
# File 'lib/rbbt/persist/tsv/sharder.rb', line 119

# Whether the sharder is in write mode. Returns the raw @writable value:
# the +write+ argument given to #initialize until #write sets it to true.
def write?
  @writable
end

#write_and_close ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/rbbt/persist/tsv/sharder.rb', line 173

# Runs the given block under a lock and then closes the sharder.
#
# The lock file name comes from Persist.persistence_path applied to
# "<persistence_path>/write" with TSV.lock_dir as :dir. Inside Misc.lock
# and @mutex, #write is called first when the sharder is flagged closed or
# is not in write mode; #close always runs afterwards, even if the block
# raises.
#
# Returns the block's value.
def write_and_close
  write_lock = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir})
  Misc.lock(write_lock) do
    @mutex.synchronize do
      write if @closed || !write?
      begin
        yield
      ensure
        close
      end
    end
  end
end

#write_and_read ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/rbbt/persist/tsv/sharder.rb', line 158

# Runs the given block under a lock and then calls #read.
#
# The lock file name comes from Persist.persistence_path applied to
# "<persistence_path>/write" with TSV.lock_dir as :dir. Inside Misc.lock
# and @mutex, #write is called first when the sharder is flagged closed or
# is not in write mode; #read always runs afterwards, even if the block
# raises.
#
# Returns the block's value.
def write_and_read
  write_lock = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir})
  Misc.lock(write_lock) do
    @mutex.synchronize do
      write if @closed || !write?
      begin
        yield
      ensure
        read
      end
    end
  end
end