Class: Drip
- Inherits:
-
Object
show all
- Includes:
- DRbUndumped
- Defined in:
- lib/drip.rb,
lib/drip.rb,
lib/drip.rb,
lib/drip.rb,
lib/drip.rb,
lib/drip.rb,
lib/drip/version.rb
Defined Under Namespace
Modules: ArrayBsearch
Classes: FakeRBTree, ImmutableDrip, Renewer, SimpleStore, SortedArray
Constant Summary
collapse
- INF =
1.0/0.0
- VERSION =
"0.1.1"
Class Method Summary
collapse
Instance Method Summary
collapse
-
#curr_head(n = 1, tag = nil) ⇒ Object
-
#curr_newer(key, tag = nil) ⇒ Object
-
#curr_older(key, tag = nil) ⇒ Object
-
#curr_read(key, n = 1, at_least = 1, timeout = nil) ⇒ Object
-
#curr_read_tag(key, tag, n = 1, at_least = 1, timeout = nil) ⇒ Object
-
#fetch(key) ⇒ Object
(also: #[])
-
#head(n = 1, tag = nil) ⇒ Object
-
#initialize(dir, option = {}) ⇒ Drip
constructor
-
#inspect ⇒ Object
-
#key_to_time(key) ⇒ Object
-
#latest?(key, tag = nil) ⇒ Boolean
-
#make_renewer(timeout) ⇒ Object
-
#newer(key, tag = nil) ⇒ Object
-
#older(key, tag = nil) ⇒ Object
-
#read(key, n = 1, at_least = 1, timeout = nil) ⇒ Object
-
#read_tag(key, tag, n = 1, at_least = 1, timeout = nil) ⇒ Object
-
#tag_next(tag) ⇒ Object
-
#tag_prev(tag) ⇒ Object
-
#time_to_key(time) ⇒ Object
-
#write(obj, *tags) ⇒ Object
-
#write_after(at, *value) ⇒ Object
-
#write_at(at, *value) ⇒ Object
-
#write_if_latest(cond, *value) ⇒ Object
Constructor Details
#initialize(dir, option = {}) ⇒ Drip
Returns a new instance of Drip.
11
12
13
14
15
16
17
18
|
# File 'lib/drip.rb', line 11
# Builds the store via prepare_store and sets up the in-memory pool,
# the tag index and the event tuple space.
#
# dir and option are handed to prepare_store unchanged.
def initialize(dir, option = {})
  @past = prepare_store(dir, option)
  # First key reported by @past.head; any error raised while digging it
  # out falls back to 0, and so does a nil/false result.
  first_head_key = (@past.head[0][0] rescue 0)
  @fence = first_head_key || 0
  @pool = Drip::SortedArray.new([])
  @tag = RBTree.new
  @event = Rinda::TupleSpace.new(5)
  # Publish the fence as the initial :last tuple.
  @event.write([:last, @fence])
end
|
Class Method Details
.key_to_time(key) ⇒ Object
181
182
183
|
# File 'lib/drip.rb', line 181
# Converts a key back into a Time. The key is split into whole seconds
# and a remainder of microseconds (1_000_000 per second).
def self.key_to_time(key)
  seconds, microseconds = key.divmod(1000000)
  Time.at(seconds, microseconds)
end
|
.time_to_key(time) ⇒ Object
173
174
175
|
# File 'lib/drip.rb', line 173
# Converts a time into a key: seconds scaled to microseconds plus the
# microsecond part of the time.
def self.time_to_key(time)
  whole_seconds = time.tv_sec
  microseconds = time.tv_usec
  whole_seconds * 1000000 + microseconds
end
|
Instance Method Details
#curr_head(n = 1, tag = nil) ⇒ Object
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/drip.rb', line 146
# Collects up to n entries by walking backwards with curr_older,
# starting from a nil key (which curr_older replaces with the current
# time) and continuing from the key of each entry found.
#
# Entries are prepended as they are found, so the returned array runs
# from the oldest collected entry to the newest. Returns an empty array
# when n is not positive or nothing is found.
def curr_head(n = 1, tag = nil)
  entries = []
  cursor = nil
  # Check the remaining count before looking up: this avoids a wasted
  # curr_older call once enough entries have been gathered, and any
  # lookup at all when n is not positive.
  while n > 0 && (entry = curr_older(cursor, tag))
    entries.unshift(entry)
    cursor = entry[0]
    n -= 1
  end
  entries
end
|
#curr_newer(key, tag = nil) ⇒ Object
168
169
170
171
|
# File 'lib/drip.rb', line 168
# Returns the first entry after key, or nil when there is none.
# Uses read when no tag is given and read_tag otherwise; both are asked
# for one entry with at_least = 0.
def curr_newer(key, tag = nil)
  found = tag ? read_tag(key, tag, 1, 0) : read(key, 1, 0)
  found[0]
end
|
#curr_older(key, tag = nil) ⇒ Object
158
159
160
161
162
163
164
165
166
|
# File 'lib/drip.rb', line 158
# Finds the entry just before key. A missing key is replaced by the key
# for the current time.
#
# Without a tag the lookup is delegated to @pool.older. With a tag, the
# tag index is probed at [tag, key - 1]; if the hit belongs to another
# tag (or there is no hit) nil is returned, otherwise the found key
# followed by the fetched contents for that key.
def curr_older(key, tag = nil)
  key ||= time_to_key(Time.now)
  return @pool.older(key) unless tag

  # upper_bound yields a [index_key, value] pair (or nil); only the
  # index key, itself a [tag, key] pair, is needed here.
  hit, = @tag.upper_bound([tag, key - 1])
  return nil unless hit && hit[0] == tag

  found_key = hit[1]
  [found_key] + fetch(found_key)
end
|
#curr_read(key, n = 1, at_least = 1, timeout = nil) ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/drip.rb', line 111
# Reads up to n entries following key from @pool, one at a time.
#
# A missing key is replaced by the key for the current time. While
# fewer than at_least entries have been gathered, wait is called before
# each read; if it raises Rinda::RequestExpiredError the entries
# gathered so far are returned. Reading also stops early when the pool
# has nothing after the current key.
#
# Each result is an array of the entry's key, value and tags.
def curr_read(key, n = 1, at_least = 1, timeout = nil)
  renewer = make_renewer(timeout)
  key ||= time_to_key(Time.now)
  results = []
  n.times do
    if at_least > results.size
      begin
        wait(key, renewer)
      rescue Rinda::RequestExpiredError
        return results
      end
    end
    # Advance key to the entry just read so the next pass continues
    # from there; a nil key means the pool had no further entry.
    key, value, *tags = @pool.read(key)[0]
    return results unless key

    results << [key, value, *tags]
  end
  results
end
|
#curr_read_tag(key, tag, n = 1, at_least = 1, timeout = nil) ⇒ Object
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# File 'lib/drip.rb', line 128
# Reads up to n entries carrying tag that follow key, using the tag
# index.
#
# A missing key is replaced by the key for the current time. While
# fewer than at_least entries have been gathered, wait_tag is called
# before each lookup; if it raises Rinda::RequestExpiredError the
# entries gathered so far are returned. Reading also stops early when
# the index has no further entry for tag.
#
# Each result is the found key followed by the fetched contents.
def curr_read_tag(key, tag, n = 1, at_least = 1, timeout = nil)
  renewer = make_renewer(timeout)
  key ||= time_to_key(Time.now)
  results = []
  n.times do
    if at_least > results.size
      begin
        wait_tag(key, tag, renewer)
      rescue Rinda::RequestExpiredError
        return results
      end
    end
    # Only the index key (a [tag, key] pair) of the hit is needed.
    hit, = @tag.lower_bound([tag, key + 1])
    return results unless hit && hit[0] == tag

    key = hit[1]
    results << ([key] + fetch(key))
  end
  results
end
|
#fetch(key) ⇒ Object
Also known as:
[]
48
49
50
51
|
# File 'lib/drip.rb', line 48
# Looks up key in @past when the key is at or below @fence, and in
# @pool otherwise.
def fetch(key)
  source = @fence >= key ? @past : @pool
  source.fetch(key)
end
|
#head(n = 1, tag = nil) ⇒ Object
79
80
81
82
83
84
85
86
87
|
# File 'lib/drip.rb', line 79
# Returns the last n entries, optionally restricted to tag.
#
# Without a tag, @pool.head supplies what it can and @past.head is
# always asked for the remainder. With a tag, curr_head supplies what
# it can and @past.head is asked only if the count falls short of n.
# Entries from @past come first in the result.
def head(n = 1, tag = nil)
  if tag
    recent = curr_head(n, tag)
    return recent if recent.size == n

    @past.head(n - recent.size, tag) + recent
  else
    recent = @pool.head(n)
    @past.head(n - recent.size) + recent
  end
end
|
#inspect ⇒ Object
10
|
# File 'lib/drip.rb', line 10
# Returns the same string as to_s.
def inspect
  to_s
end
|
#key_to_time(key) ⇒ Object
185
186
187
|
# File 'lib/drip.rb', line 185
# Instance-level shortcut for the class method of the same name.
def key_to_time(key)
  owner = self.class
  owner.key_to_time(key)
end
|
#latest?(key, tag = nil) ⇒ Boolean
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/drip.rb', line 89
# Tells whether key is the latest entry, optionally for a given tag.
#
# With a tag, the tag index is probed at [tag, now]: a hit for the same
# tag answers true when its key equals key and false when its key is
# greater. Without a tag, @pool.latest? answering true settles it.
# Every other case is decided by @past.latest?.
def latest?(key, tag = nil)
  now = time_to_key(Time.now)
  if tag
    # Only the index key (a [tag, key] pair) of the hit is needed.
    hit, = @tag.upper_bound([tag, now])
    if hit && hit[0] == tag
      newest = hit[1]
      return true if newest == key
      return false if newest > key
    end
  elsif @pool.latest?(key)
    return true
  end
  @past.latest?(key, tag)
end
|
#make_renewer(timeout) ⇒ Object
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/drip.rb', line 54
# Turns a timeout into the renewer handed to the wait helpers:
# 0 for a zero timeout, a Renewer for any other Numeric, and nil for
# everything else (including a nil timeout).
def make_renewer(timeout)
  case timeout
  when 0 then 0
  when Numeric then Renewer.new(timeout)
  end
end
|
#newer(key, tag = nil) ⇒ Object
107
108
109
|
# File 'lib/drip.rb', line 107
# Returns the entry after key, asking @past first and falling back to
# curr_newer when @past has no answer.
def newer(key, tag = nil)
  from_past = @past.newer(key, tag)
  return from_past if from_past

  curr_newer(key, tag)
end
|
#older(key, tag = nil) ⇒ Object
103
104
105
|
# File 'lib/drip.rb', line 103
# Returns the entry before key, asking curr_older first and falling
# back to @past.older when it has no answer.
def older(key, tag = nil)
  from_current = curr_older(key, tag)
  return from_current if from_current

  @past.older(key, tag)
end
|
#read(key, n = 1, at_least = 1, timeout = nil) ⇒ Object
65
66
67
68
69
70
|
# File 'lib/drip.rb', line 65
# Reads up to n entries after key.
#
# Keys above @fence are served by curr_read alone. Otherwise @past is
# read first and, if it yields fewer than n entries, curr_read is asked
# for the remainder with n and at_least reduced by the number already
# found.
def read(key, n = 1, at_least = 1, timeout = nil)
  if key > @fence
    curr_read(key, n, at_least, timeout)
  else
    stored = @past.read(key, n)
    found = stored.size
    return stored if found >= n

    stored + curr_read(key, n - found, at_least - found, timeout)
  end
end
|
#read_tag(key, tag, n = 1, at_least = 1, timeout = nil) ⇒ Object
72
73
74
75
76
77
|
# File 'lib/drip.rb', line 72
# Reads up to n entries after key that carry tag.
#
# Keys above @fence are served by curr_read_tag alone. Otherwise @past
# is read first and, if it yields fewer than n entries, curr_read_tag
# is asked for the remainder with n and at_least reduced by the number
# already found.
def read_tag(key, tag, n = 1, at_least = 1, timeout = nil)
  if key > @fence
    curr_read_tag(key, tag, n, at_least, timeout)
  else
    stored = @past.read_tag(key, tag, n)
    found = stored.size
    return stored if found >= n

    stored + curr_read_tag(key, tag, n - found, at_least - found, timeout)
  end
end
|
#tag_next(tag) ⇒ Object
189
190
191
192
193
194
195
196
|
# File 'lib/drip.rb', line 189
# Returns the tag that follows tag, considering both @past and the
# in-memory tag index, or nil when neither has one.
#
# The index is probed at [tag, INF], i.e. past every key recorded for
# tag itself. When both sources answer, the smaller tag wins.
def tag_next(tag)
  past_tag = @past.tag_next(tag)
  # Only the index key (a [tag, key] pair) of the hit is needed.
  hit, = @tag.lower_bound([tag, INF])
  return past_tag unless hit

  curr_tag = hit[0]
  return curr_tag unless past_tag

  if curr_tag > past_tag
    past_tag
  else
    curr_tag
  end
end
|
#tag_prev(tag) ⇒ Object
198
199
200
201
202
203
204
205
|
# File 'lib/drip.rb', line 198
# Returns the tag that precedes tag, considering both @past and the
# in-memory tag index, or nil when neither has one.
#
# The index is probed at [tag, 0]. When both sources answer, the larger
# tag wins.
def tag_prev(tag)
  past_tag = @past.tag_prev(tag)
  # Only the index key (a [tag, key] pair) of the hit is needed.
  hit, = @tag.upper_bound([tag, 0])
  return past_tag unless hit

  curr_tag = hit[0]
  return curr_tag unless past_tag

  if curr_tag > past_tag
    curr_tag
  else
    past_tag
  end
end
|
#time_to_key(time) ⇒ Object
177
178
179
|
# File 'lib/drip.rb', line 177
# Instance-level shortcut for the class method of the same name.
def time_to_key(time)
  owner = self.class
  owner.time_to_key(time)
end
|
#write(obj, *tags) ⇒ Object
20
21
22
|
# File 'lib/drip.rb', line 20
# Writes obj with the given tags, using the current time as the
# reference point passed to write_after.
def write(obj, *tags)
  now = Time.now
  write_after(now, obj, *tags)
end
|
#write_after(at, *value) ⇒ Object
24
25
26
27
28
29
|
# File 'lib/drip.rb', line 24
# Stores value under a key obtained from make_key(at).
#
# Inside the block the value is passed through do_write, written to
# @store, and the key together with @store's result is pushed onto
# @pool.
def write_after(at, *value)
  make_key(at) do |key|
    # Rebinds the captured value to whatever do_write returns.
    value = do_write(key, value)
    stored = @store.write(key, value)
    @pool.push([key, stored])
  end
end
|
#write_at(at, *value) ⇒ Object
31
32
33
34
35
36
|
# File 'lib/drip.rb', line 31
# Stores value under a key obtained from make_key_at(at).
#
# Inside the block the value is passed through do_write, written to
# @store, and the key together with @store's result is pushed onto
# @pool.
def write_at(at, *value)
  make_key_at(at) do |key|
    # Rebinds the captured value to whatever do_write returns.
    value = do_write(key, value)
    stored = @store.write(key, value)
    @pool.push([key, stored])
  end
end
|
#write_if_latest(cond, *value) ⇒ Object
38
39
40
41
42
43
44
45
46
|
# File 'lib/drip.rb', line 38
# Stores value only if every condition in cond still holds.
#
# Each element of cond is indexed as [tag, key]; the method returns nil
# as soon as latest?(key, tag) is false for one of them. Otherwise the
# value is passed through do_write, written to @store, and pushed onto
# @pool with its key. The checks and the write all happen inside the
# block given to make_key(Time.now).
def write_if_latest(cond, *value)
  make_key(Time.now) do |key|
    cond.each do |pair|
      # Leaves write_if_latest itself, not just the block.
      return nil unless latest?(pair[1], pair[0])
    end
    # Rebinds the captured value to whatever do_write returns.
    value = do_write(key, value)
    stored = @store.write(key, value)
    @pool.push([key, stored])
  end
end
|