Class: Bud::BudDbmTable

Inherits:
BudPersistentCollection show all
Defined in:
lib/bud/storage/dbm.rb

Overview

Persistent table implementation based on dbm.

Instance Attribute Summary

Attributes inherited from BudCollection

#accumulate_tick_deltas, #bud_instance, #cols, #invalidated, #is_source, #key_cols, #new_delta, #pending, #rescan, #scanner_cnt, #struct, #tabname, #wired_by

Instance Method Summary collapse

Methods inherited from BudPersistentCollection

#invalidate_at_tick

Methods inherited from BudCollection

#*, #<=, #add_rescan_invalidate, #argagg, #argmax, #argmin, #bootstrap, #canonicalize_col, #do_insert, #each_delta, #each_tick_delta, #each_with_index, #exists?, #flat_map, #group, #init_schema, #inspect, #inspected, #invalidate_at_tick, #keys, #merge, #non_temporal_predecessors, #notin, #null_tuple, #pending_merge, #positive_predecessors, #prep_aggpairs, #pro, #qualified_tabname, #reduce, #register_coll_expr, #rename, #schema, #sort, #tick_metrics, #to_push_elem, #uniquify_tabname, #val_cols, #values

Methods included from Enumerable

#pro

Constructor Details

#initialize(name, bud_instance, given_schema) ⇒ BudDbmTable

:nodoc: all

Raises:

  • (Bud::Error)

6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/bud/storage/dbm.rb', line 6

# Opens (creating it if needed) the dbm file that backs this collection.
#
# The database is stored at "<dbm_dir>/bud_<port>/<name>.dbm"; both
# directories are created on demand.
#
# name::         collection name, also used as the base name of the dbm file
# bud_instance:: owning Bud instance; its options and port are consulted
# given_schema:: schema handed through to the superclass constructor
#
# Raises Bud::Error if the :dbm_dir option is unset, if the Bud instance has
# no port, or if DBM.open returns nil.
def initialize(name, bud_instance, given_schema)
  dbm_dir = bud_instance.options[:dbm_dir]
  raise Bud::Error, "dbm support must be enabled via 'dbm_dir'" unless dbm_dir
  if bud_instance.port.nil?
    raise Bud::Error, "use of dbm storage requires an explicit port to be specified in Bud initialization options"
  end

  dirname = "#{dbm_dir}/bud_#{bud_instance.port}"
  # File.exist? is used because File.exists? was removed in Ruby 3.2.
  [dbm_dir, dirname].each do |dir|
    next if File.exist?(dir)
    Dir.mkdir(dir)
    puts "Created directory: #{dir}" unless bud_instance.options[:quiet]
  end

  super(name, bud_instance, given_schema)
  @to_delete = []
  @invalidated = true

  db_fname = "#{dirname}/#{name}.dbm"
  flags = DBM::WRCREAT
  flags |= DBM::NEWDB if bud_instance.options[:dbm_truncate] == true
  @dbm = DBM.open(db_fname, 0666, flags)
  # @dbm is nil on this path, so it cannot be asked for an error message.
  raise Bud::Error, "failed to open dbm database '#{db_fname}'" if @dbm.nil?
end

Instance Method Details

#[](key) ⇒ Object



44
45
46
47
48
49
50
51
52
53
# File 'lib/bud/storage/dbm.rb', line 44

# Looks up the tuple stored under +key+.
#
# The dbm store is consulted first; when it has no entry for the packed key,
# the lookup falls back to the in-memory @delta buffer.
def [](key)
  check_enumerable(key)
  packed_val = @dbm[MessagePack.pack(key)]
  return @delta[key] unless packed_val

  make_tuple(key, MessagePack.unpack(packed_val))
end

#close ⇒ Object



117
118
119
120
# File 'lib/bud/storage/dbm.rb', line 117

# Closes the dbm handle if one is open, then drops the reference to it.
def close
  unless @dbm.nil?
    @dbm.close
  end
  @dbm = nil
end

#each(&block) ⇒ Object



83
84
85
86
# File 'lib/bud/storage/dbm.rb', line 83

# Yields the tuples held in @delta (via each_from) and then the tuples read
# from the dbm store (via each_storage).
def each(&block)
  in_memory_buffers = [@delta]
  each_from(in_memory_buffers, &block)
  each_storage(&block)
end

#each_from(bufs, &block) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/bud/storage/dbm.rb', line 92

# Yields every tuple found in each buffer of +bufs+.
#
# A buffer equal to @storage is read through each_storage; any other buffer
# is walked with each_value. tick_metrics is invoked per yielded value when
# the :metrics option is set.
def each_from(bufs, &block)
  bufs.each do |buf|
    if buf == @storage
      each_storage(&block)
      next
    end

    buf.each_value do |tuple|
      tick_metrics if bud_instance.options[:metrics]
      yield tuple
    end
  end
end

#each_raw(&block) ⇒ Object



88
89
90
# File 'lib/bud/storage/dbm.rb', line 88

# Yields only the tuples read from the dbm store; delegates to each_storage.
def each_raw(&block)
  each_storage(&block)
end

#each_storage(&block) ⇒ Object



105
106
107
108
109
110
111
112
# File 'lib/bud/storage/dbm.rb', line 105

# Walks every entry of the dbm store, unpacks its key and value with
# MessagePack, and yields the tuple rebuilt by make_tuple. tick_metrics is
# invoked per entry when the :metrics option is set.
def each_storage(&block)
  @dbm.each do |packed_key, packed_val|
    key_fields = MessagePack.unpack(packed_key)
    val_fields = MessagePack.unpack(packed_val)
    tick_metrics if bud_instance.options[:metrics]
    tuple = make_tuple(key_fields, val_fields)
    yield tuple
  end
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


222
223
224
# File 'lib/bud/storage/dbm.rb', line 222

# Reports whether the dbm store has no entries. Only @dbm is consulted; the
# in-memory @delta buffer is not taken into account here.
def empty?
  @dbm.empty?
end

#flush ⇒ Object



114
115
# File 'lib/bud/storage/dbm.rb', line 114

# Intentionally does nothing; returns nil.
def flush
  nil
end

#flush_deltas ⇒ Object



159
160
161
162
163
164
165
166
167
# File 'lib/bud/storage/dbm.rb', line 159

# Writes both delta buffers through to the dbm store.
#
# A non-empty @delta is merged into the database, optionally appended to
# @tick_delta (when accumulate_tick_deltas is set), and then cleared.
# @new_delta is always merged and then replaced by a fresh empty hash.
def flush_deltas
  if !@delta.empty?
    merge_to_db(@delta)
    if accumulate_tick_deltas
      @tick_delta.concat(@delta.values)
    end
    @delta.clear
  end

  merge_to_db(@new_delta)
  @new_delta = {}
end

#has_key?(k) ⇒ Boolean

Returns:

  • (Boolean)


59
60
61
62
63
64
# File 'lib/bud/storage/dbm.rb', line 59

# Reports whether a tuple with key +k+ exists, either in the dbm store
# (looked up by the MessagePack-packed key) or in the @delta buffer.
def has_key?(k)
  check_enumerable(k)
  packed_key = MessagePack.pack(k)
  if @dbm.has_key?(packed_key)
    true
  else
    @delta.has_key?(k)
  end
end

#include?(tuple) ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
69
70
# File 'lib/bud/storage/dbm.rb', line 66

# Reports whether the tuple currently stored under +tuple+'s key is equal to
# +tuple+ itself.
def include?(tuple)
  self[get_key_vals(tuple)] == tuple
end

#init_storage ⇒ Object



38
39
40
41
42
# File 'lib/bud/storage/dbm.rb', line 38

# Leaves @storage unset (nil).
#
# XXX: the @storage infrastructure provided by BudCollection can't easily be
# used by this class; see issue #33.
def init_storage
  @storage = nil
end

#insert(tuple) ⇒ Object Also known as: <<



185
186
187
188
# File 'lib/bud/storage/dbm.rb', line 185

# Writes +tuple+ to the dbm store under the key derived by get_key_vals,
# delegating the primary-key handling to merge_tuple_to_db.
def insert(tuple)
  merge_tuple_to_db(get_key_vals(tuple), tuple)
end

#invalidate_cache ⇒ Object



217
218
# File 'lib/bud/storage/dbm.rb', line 217

# Intentionally does nothing; returns nil.
def invalidate_cache
  nil
end

#length ⇒ Object



55
56
57
# File 'lib/bud/storage/dbm.rb', line 55

# Number of entries in the dbm store plus the number of entries in @delta.
def length
  stored_count = @dbm.length
  buffered_count = @delta.length
  stored_count + buffered_count
end

#make_tuple(k_ary, v_ary) ⇒ Object



72
73
74
75
76
77
78
79
80
81
# File 'lib/bud/storage/dbm.rb', line 72

# Builds a tuple (a fresh @struct instance) from separate key and value
# arrays.
#
# k_ary:: key values, positionally matching @key_colnums
# v_ary:: non-key values, positionally matching val_cols
def make_tuple(k_ary, v_ary)
  @struct.new.tap do |tuple|
    @key_colnums.each_with_index do |colnum, pos|
      tuple[colnum] = k_ary[pos]
    end
    val_cols.each_with_index do |col, pos|
      tuple[cols.index(col)] = v_ary[pos]
    end
  end
end

#merge_to_db(buf) ⇒ Object



122
123
124
125
126
# File 'lib/bud/storage/dbm.rb', line 122

# Merges every key/tuple pair of +buf+ into the dbm store via
# merge_tuple_to_db.
def merge_to_db(buf)
  buf.each { |tuple_key, tuple| merge_tuple_to_db(tuple_key, tuple) }
end

#merge_tuple_to_db(key, tuple) ⇒ Object



128
129
130
131
132
133
134
135
136
137
# File 'lib/bud/storage/dbm.rb', line 128

# Stores +tuple+ under +key+ in the dbm store.
#
# When the packed key is not yet present, the tuple's non-key columns are
# packed and written. When it is present, the stored tuple is compared with
# +tuple+ and raise_pk_error is invoked if they differ; an identical tuple
# is left untouched.
def merge_tuple_to_db(key, tuple)
  packed_key = MessagePack.pack(key)

  unless @dbm.has_key?(packed_key)
    non_key_vals = val_cols.map { |col| tuple[cols.index(col)] }
    return @dbm[packed_key] = MessagePack.pack(non_key_vals)
  end

  existing = self[key]
  raise_pk_error(tuple, existing) if tuple != existing
end

#pending_delete(o) ⇒ Object



172
173
174
175
176
177
178
179
180
# File 'lib/bud/storage/dbm.rb', line 172

# Schedules deletions to be applied by the next call to tick.
#
# o:: a Bud::PushElement (wired to this collection's :delete), a
#     Bud::BudCollection (its projection is wired the same way), or an
#     enumerable of tuples, each of which is passed through prep_tuple and
#     queued in @to_delete.
#
# nil tuples are skipped; they are not queued, so @to_delete never contains
# nil placeholders for them.
def pending_delete(o)
  if o.class <= Bud::PushElement
    o.wire_to(self, :delete)
  elsif o.class <= Bud::BudCollection
    o.pro.wire_to(self, :delete)
  else
    # map leaves a nil in place of every skipped tuple; compact drops them.
    @to_delete.concat(o.map { |t| prep_tuple(t) unless t.nil? }.compact)
  end
end

#tick ⇒ Object

Remove to_delete and then move pending => delta.



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/bud/storage/dbm.rb', line 193

# Applies the queued deletions and then moves pending => delta.
#
# A queued tuple is removed from the dbm store only when an entry with the
# same key exists and its stored non-key columns equal those of the tuple.
# @invalidated is set to true when at least one entry was removed, false
# otherwise.
def tick
  deleted = false
  @to_delete.each do |tuple|
    k_str = MessagePack.pack(get_key_vals(tuple))
    cols_str = @dbm[k_str]
    next if cols_str.nil?

    db_cols = MessagePack.unpack(cols_str)
    delete_cols = val_cols.map { |c| tuple[cols.index(c)] }
    next unless db_cols == delete_cols

    # Delete unconditionally. Guarding the call with `deleted ||=` would
    # skip every deletion after the first successful one.
    @dbm.delete(k_str)
    deleted = true
  end
  @to_delete = []

  @invalidated = deleted
  unless @pending.empty?
    @delta = @pending
    @pending = {}
  end
  flush
end

#tick_deltas ⇒ Object

Move deltas to on-disk storage, and new_deltas to deltas.



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/bud/storage/dbm.rb', line 140

# Moves deltas to on-disk storage, and new_deltas to deltas.
#
# Returns true when @delta is non-empty afterwards, false otherwise.
def tick_deltas
  if !@delta.empty?
    merge_to_db(@delta)
    if accumulate_tick_deltas
      @tick_delta.concat(@delta.values)
    end
    @delta.clear
  end

  if !@new_delta.empty?
    # @new_delta is allowed to contain duplicates, which are eliminated here.
    # Duplicate delta tuples can't simply be let through because that might
    # cause spurious infinite delta processing loops.
    @new_delta.reject! { |tuple_key, tuple| self[tuple_key] == tuple }

    @delta, @new_delta = @new_delta, {}
  end

  !@delta.empty?
end