Class: Drip

Inherits:
Object
  • Object
show all
Includes:
DRbUndumped
Defined in:
lib/drip.rb,
lib/drip.rb,
lib/drip.rb,
lib/drip.rb,
lib/drip.rb,
lib/drip.rb,
lib/drip/version.rb

Defined Under Namespace

Modules: ArrayBsearch Classes: FakeRBTree, ImmutableDrip, Renewer, SimpleStore, SortedArray

Constant Summary collapse

INF =
1.0/0.0
VERSION =
"0.1.1"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dir, option = {}) ⇒ Drip

Returns a new instance of Drip.



11
12
13
14
15
16
17
18
# File 'lib/drip.rb', line 11

# Builds a Drip on top of the store returned by prepare_store.
#
# @param dir    forwarded to prepare_store (presumably the storage
#               directory -- confirm against prepare_store)
# @param option [Hash] forwarded to prepare_store unchanged
def initialize(dir, option={})
  @past = prepare_store(dir, option)
  # The fence is the key of the first entry reported by @past.head.
  # Any StandardError raised while reading it (for example when head
  # yields nothing to index into) and any nil/false key both fall back to 0.
  stored_key = begin
    @past.head[0][0]
  rescue StandardError
    0
  end
  @fence = stored_key || 0
  @pool = Drip::SortedArray.new([])
  @tag = RBTree.new
  @event = Rinda::TupleSpace.new(5)
  # Seed the tuple space with the fence as the current :last key.
  @event.write([:last, @fence])
end

Class Method Details

.key_to_time(key) ⇒ Object



181
182
183
# File 'lib/drip.rb', line 181

# Converts a key back into a Time.
#
# The key is split into whole seconds and a remainder below 1,000,000,
# which Time.at receives as its microsecond argument.
#
# @param key [Numeric] value as produced by time_to_key
# @return [Time]
def self.key_to_time(key)
  seconds, microseconds = key.divmod(1_000_000)
  Time.at(seconds, microseconds)
end

.time_to_key(time) ⇒ Object



173
174
175
# File 'lib/drip.rb', line 173

# Converts a Time into an integer key: whole seconds scaled by
# 1,000,000 plus the microsecond part.
#
# @param time [#tv_sec, #tv_usec]
# @return [Integer]
def self.time_to_key(time)
  whole_seconds = time.tv_sec
  whole_seconds * 1_000_000 + time.tv_usec
end

Instance Method Details

#curr_head(n = 1, tag = nil) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
# File 'lib/drip.rb', line 146

# Collects up to +n+ entries by walking backwards with curr_older,
# starting from a nil key (which curr_older treats as "now").
#
# The remaining budget is checked *before* each lookup, so curr_older is
# never called once +n+ entries have been gathered, nor when +n+ <= 0.
#
# @param n   [Integer] maximum number of entries to return
# @param tag optional tag passed through to curr_older
# @return [Array] entries ordered oldest first; empty when nothing is found
def curr_head(n=1, tag=nil)
  ary = []
  key = nil
  while n > 0
    entry = curr_older(key, tag)
    break unless entry
    # Each step finds an older entry, so prepend to keep oldest-first order.
    ary.unshift(entry)
    key = entry[0]
    n -= 1
  end
  ary
end

#curr_newer(key, tag = nil) ⇒ Object



168
169
170
171
# File 'lib/drip.rb', line 168

# Returns the first entry newer than +key+ without waiting
# (n = 1, at_least = 0), or nil when the read comes back empty.
#
# @param key the key to read after
# @param tag when given, the lookup goes through read_tag instead of read
def curr_newer(key, tag=nil)
  found = if tag
    read_tag(key, tag, 1, 0)
  else
    read(key, 1, 0)
  end
  found[0]
end

#curr_older(key, tag = nil) ⇒ Object



158
159
160
161
162
163
164
165
166
# File 'lib/drip.rb', line 158

# Finds the entry just older than +key+ among the current entries.
#
# @param key the key to look before; nil means time_to_key(Time.now)
# @param tag when nil the lookup is delegated to @pool.older; otherwise
#            the @tag index is searched for the closest entry of that tag
# @return the @pool.older result (untagged), [key, *fetch(key)] for a
#         tagged hit, or nil when the index has no earlier entry for +tag+
def curr_older(key, tag=nil)
  key ||= time_to_key(Time.now)

  if tag
    # upper_bound yields the greatest index entry <= [tag, key - 1];
    # only its key part (a [tag, key] pair) is needed here.
    hit, = @tag.upper_bound([tag, key - 1])
    if hit && hit[0] == tag
      found_key = hit[1]
      [found_key] + fetch(found_key)
    end
  else
    @pool.older(key)
  end
end

#curr_read(key, n = 1, at_least = 1, timeout = nil) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/drip.rb', line 111

# Reads up to +n+ entries newer than +key+ from @pool.
#
# While fewer than +at_least+ entries have been collected, each round
# first calls wait(key, renewer); a Rinda::RequestExpiredError raised
# there ends the read and returns whatever was gathered so far.
#
# @param key      the key to read after; nil means time_to_key(Time.now)
# @param n        [Integer] maximum number of entries
# @param at_least [Integer] number of entries worth waiting for
# @param timeout  handed to make_renewer
# @return [Array] entries shaped [key, value, *tags]
def curr_read(key, n=1, at_least=1, timeout=nil)
  renewer = make_renewer(timeout)
  key ||= time_to_key(Time.now)
  collected = []
  n.times do
    if at_least > collected.size
      begin
        wait(key, renewer)
      rescue Rinda::RequestExpiredError
        return collected
      end
    end
    # Advance the cursor to the entry just read; a nil key means the
    # pool had nothing newer.
    key, value, *tags = @pool.read(key)[0]
    return collected unless key
    collected << [key, value, *tags]
  end
  collected
end

#curr_read_tag(key, tag, n = 1, at_least = 1, timeout = nil) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/drip.rb', line 128

# Reads up to +n+ entries carrying +tag+ that are newer than +key+,
# using the @tag index.
#
# While fewer than +at_least+ entries have been collected, each round
# first calls wait_tag(key, tag, renewer); a Rinda::RequestExpiredError
# raised there ends the read and returns whatever was gathered so far.
#
# @param key      the key to read after; nil means time_to_key(Time.now)
# @param tag      the tag to follow
# @param n        [Integer] maximum number of entries
# @param at_least [Integer] number of entries worth waiting for
# @param timeout  handed to make_renewer
# @return [Array] entries shaped [key, *fetch(key)]
def curr_read_tag(key, tag, n=1, at_least=1, timeout=nil)
  renewer = make_renewer(timeout)
  key ||= time_to_key(Time.now)
  collected = []
  n.times do
    if at_least > collected.size
      begin
        wait_tag(key, tag, renewer)
      rescue Rinda::RequestExpiredError
        return collected
      end
    end
    # lower_bound yields the smallest index entry >= [tag, key + 1];
    # stop as soon as it is missing or belongs to a different tag.
    hit, = @tag.lower_bound([tag, key + 1])
    return collected unless hit && hit[0] == tag
    key = hit[1]
    collected.push([key] + fetch(key))
  end
  collected
end

#fetch(key) ⇒ Object Also known as: []



48
49
50
51
# File 'lib/drip.rb', line 48

# Looks up the value stored under +key+.
#
# Keys at or below @fence are served by @past, anything above by @pool.
#
# @param key the key to look up
# @return whatever the selected store's fetch returns
def fetch(key)
  source = @fence >= key ? @past : @pool
  source.fetch(key)
end

#head(n = 1, tag = nil) ⇒ Object



79
80
81
82
83
84
85
86
87
# File 'lib/drip.rb', line 79

# Returns up to +n+ entries from the head, with entries from @past
# placed in front of the current ones.
#
# Untagged: takes @pool.head(n) and always prepends
# @past.head(n - taken). Tagged: takes curr_head(n, tag) and consults
# @past only when that came back with a size other than +n+.
#
# @param n   [Integer] number of entries wanted
# @param tag optional tag filter
# @return [Array]
def head(n=1, tag=nil)
  if tag
    recent = curr_head(n, tag)
    return recent if recent.size == n
    @past.head(n - recent.size, tag) + recent
  else
    recent = @pool.head(n)
    @past.head(n - recent.size) + recent
  end
end

#inspect ⇒ Object



10
# File 'lib/drip.rb', line 10

# Returns the same string as #to_s.
def inspect
  to_s
end

#key_to_time(key) ⇒ Object



185
186
187
# File 'lib/drip.rb', line 185

# Instance-level convenience wrapper: forwards to the class method
# of the same name on the receiver's class.
#
# @param key the key to convert
def key_to_time(key)
  owner = self.class
  owner.key_to_time(key)
end

#latest?(key, tag = nil) ⇒ Boolean

Returns:

  • (Boolean)


89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/drip.rb', line 89

# Tells whether +key+ is the newest entry, optionally for a single tag.
#
# Tagged: looks up the newest @tag index entry for +tag+ up to "now".
# If one exists, an equal key answers true and a greater indexed key
# answers false. Untagged: a truthy @pool.latest?(key) answers true.
# Every other case is decided by @past.latest?(key, tag).
#
# @param key the key to test
# @param tag optional tag
# @return [Boolean] (or whatever @past.latest? returns on fall-through)
def latest?(key, tag=nil)
  now = time_to_key(Time.now)
  if tag
    newest, = @tag.upper_bound([tag, now])
    if newest && newest[0] == tag
      newest_key = newest[1]
      return true if newest_key == key
      return false if newest_key > key
    end
  elsif @pool.latest?(key)
    return true
  end
  @past.latest?(key, tag)
end

#make_renewer(timeout) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/drip.rb', line 54

# Turns a timeout value into the renewer argument used by the waits.
#
# @param timeout 0, another Numeric, or anything else
# @return 0 when +timeout+ matches 0, a Renewer built from +timeout+ for
#         any other Numeric, and nil otherwise
def make_renewer(timeout)
  case timeout
  when 0 then 0
  when Numeric then Renewer.new(timeout)
  end
end

#newer(key, tag = nil) ⇒ Object



107
108
109
# File 'lib/drip.rb', line 107

# Returns the entry following +key+: @past is asked first and
# curr_newer is consulted only when @past answers nil/false.
#
# @param key the key to look after
# @param tag optional tag filter
def newer(key, tag=nil)
  from_past = @past.newer(key, tag)
  return from_past if from_past
  curr_newer(key, tag)
end

#older(key, tag = nil) ⇒ Object



103
104
105
# File 'lib/drip.rb', line 103

# Returns the entry preceding +key+: curr_older is tried first and
# @past is consulted only when that answers nil/false.
#
# @param key the key to look before
# @param tag optional tag filter
def older(key, tag=nil)
  from_current = curr_older(key, tag)
  return from_current if from_current
  @past.older(key, tag)
end

#read(key, n = 1, at_least = 1, timeout = nil) ⇒ Object



65
66
67
68
69
70
# File 'lib/drip.rb', line 65

# Reads up to +n+ entries newer than +key+.
#
# Keys above @fence go straight to curr_read. Otherwise @past is read
# first and curr_read tops up the remainder, with +n+ and +at_least+
# reduced by the number of entries @past already supplied.
#
# @param key      the key to read after
# @param n        [Integer] maximum number of entries
# @param at_least [Integer] forwarded to curr_read
# @param timeout  forwarded to curr_read
# @return [Array]
def read(key, n=1, at_least=1, timeout=nil)
  if key > @fence
    curr_read(key, n, at_least, timeout)
  else
    stored = @past.read(key, n)
    found = stored.size
    if found >= n
      stored
    else
      stored + curr_read(key, n - found, at_least - found, timeout)
    end
  end
end

#read_tag(key, tag, n = 1, at_least = 1, timeout = nil) ⇒ Object



72
73
74
75
76
77
# File 'lib/drip.rb', line 72

# Reads up to +n+ entries carrying +tag+ that are newer than +key+.
#
# Keys above @fence go straight to curr_read_tag. Otherwise @past is
# read first and curr_read_tag tops up the remainder, with +n+ and
# +at_least+ reduced by the number of entries @past already supplied.
#
# @param key      the key to read after
# @param tag      the tag to follow
# @param n        [Integer] maximum number of entries
# @param at_least [Integer] forwarded to curr_read_tag
# @param timeout  forwarded to curr_read_tag
# @return [Array]
def read_tag(key, tag, n=1, at_least=1, timeout=nil)
  if key > @fence
    curr_read_tag(key, tag, n, at_least, timeout)
  else
    stored = @past.read_tag(key, tag, n)
    found = stored.size
    if found >= n
      stored
    else
      stored + curr_read_tag(key, tag, n - found, at_least - found, timeout)
    end
  end
end

#tag_next(tag) ⇒ Object



189
190
191
192
193
194
195
196
# File 'lib/drip.rb', line 189

# Returns the tag that follows +tag+, taking the smaller of the
# candidate from @past and the candidate from the @tag index.
#
# @param tag the tag to start from
# @return the following tag, or nil when neither side has one
def tag_next(tag)
  from_past = @past.tag_next(tag)
  # [tag, INF] sorts after every [tag, key] with a finite key, so
  # lower_bound lands on the first index entry of a later tag.
  entry, = @tag.lower_bound([tag, INF])
  return from_past unless entry
  from_index = entry[0]
  if from_past && from_index > from_past
    from_past
  else
    from_index
  end
end

#tag_prev(tag) ⇒ Object



198
199
200
201
202
203
204
205
# File 'lib/drip.rb', line 198

# Returns the tag that precedes +tag+, taking the larger of the
# candidate from @past and the candidate from the @tag index.
#
# @param tag the tag to start from
# @return the preceding tag, or nil when neither side has one
def tag_prev(tag)
  from_past = @past.tag_prev(tag)
  # upper_bound yields the greatest index entry <= [tag, 0].
  entry, = @tag.upper_bound([tag, 0])
  return from_past unless entry
  from_index = entry[0]
  if from_past.nil? || from_past == false || from_index > from_past
    from_index
  else
    from_past
  end
end

#time_to_key(time) ⇒ Object



177
178
179
# File 'lib/drip.rb', line 177

# Instance-level convenience wrapper: forwards to the class method
# of the same name on the receiver's class.
#
# @param time the time to convert
def time_to_key(time)
  owner = self.class
  owner.time_to_key(time)
end

#write(obj, *tags) ⇒ Object



20
21
22
# File 'lib/drip.rb', line 20

# Writes +obj+ with the given tags, stamped with the current time.
# Shorthand for write_after(Time.now, obj, *tags).
#
# @param obj  the object to store
# @param tags any number of tags
# @return whatever write_after returns
def write(obj, *tags)
  now = Time.now
  write_after(now, obj, *tags)
end

#write_after(at, *value) ⇒ Object



24
25
26
27
28
29
# File 'lib/drip.rb', line 24

# Stores +value+ under a key obtained from make_key(at).
#
# Inside the make_key block the value goes through do_write, is
# written to @store, and the [key, stored] pair is pushed onto @pool.
# NOTE(review): @store is not assigned by the constructor shown in this
# file's documentation; presumably prepare_store sets it -- confirm.
#
# @param at    handed to make_key
# @param value the object followed by its tags
# @return whatever make_key returns
def write_after(at, *value)
  make_key(at) do |key|
    value = do_write(key, value)
    stored = @store.write(key, value)
    @pool.push([key, stored])
  end
end

#write_at(at, *value) ⇒ Object



31
32
33
34
35
36
# File 'lib/drip.rb', line 31

# Stores +value+ under a key obtained from make_key_at(at).
#
# Inside the make_key_at block the value goes through do_write, is
# written to @store, and the [key, stored] pair is pushed onto @pool.
# NOTE(review): @store is not assigned by the constructor shown in this
# file's documentation; presumably prepare_store sets it -- confirm.
#
# @param at    handed to make_key_at
# @param value the object followed by its tags
# @return whatever make_key_at returns
def write_at(at, *value)
  make_key_at(at) do |key|
    value = do_write(key, value)
    stored = @store.write(key, value)
    @pool.push([key, stored])
  end
end

#write_if_latest(cond, *value) ⇒ Object



38
39
40
41
42
43
44
45
46
# File 'lib/drip.rb', line 38

# Stores +value+ only if every condition still holds.
#
# Each element of +cond+ is checked inside the make_key block with
# latest?(element[1], element[0]); the first failing check makes the
# whole method return nil without writing. Otherwise the value goes
# through do_write, is written to @store, and the [key, stored] pair is
# pushed onto @pool.
#
# @param cond  collection whose elements are indexed as
#              [tag, key] (element[0] is passed as the tag, element[1]
#              as the key)
# @param value the object followed by its tags
# @return nil when a condition fails, otherwise whatever make_key returns
def write_if_latest(cond, *value)
  make_key(Time.now) do |key|
    cond.each do |pair|
      return nil unless latest?(pair[1], pair[0])
    end
    value = do_write(key, value)
    stored = @store.write(key, value)
    @pool.push([key, stored])
  end
end