Class: Mongoriver::Streambed
- Inherits:
- Object
  - Object
    - Mongoriver::Streambed
- Includes:
- Logging
- Defined in:
- lib/mongoriver/streambed.rb
Defined Under Namespace
Classes: AssertionFailure
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Logging
#log
Constructor Details
#initialize(upstreams, type) ⇒ Streambed
Returns a new instance of Streambed.
13
14
15
16
17
18
|
# File 'lib/mongoriver/streambed.rb', line 13
# Sets up a new Streambed.
#
# upstreams - forwarded unchanged to Mongoriver::Tailer.new.
# type      - forwarded unchanged to Mongoriver::Tailer.new.
#
# Besides the tailer, this initialises the stats counter hash and the two
# batch-size settings stored on the instance.
def initialize(upstreams, type)
  # Hash.new(0) makes any unseen key read as 0.
  @stats = Hash.new(0)

  @record_sync_batch_size = 256
  @record_fetch_batch_size = 1024

  @tailer = Mongoriver::Tailer.new(upstreams, type)
end
|
Instance Attribute Details
#stats ⇒ Object
Returns the value of attribute stats.
5
6
7
|
# File 'lib/mongoriver/streambed.rb', line 5
# Reader for the @stats instance variable.
#
# Returns whatever @stats currently holds; the constructor assigns it a
# Hash whose missing keys read as 0.
def stats
@stats
end
|
Class Method Details
.all_hooks ⇒ Object
36
37
38
39
40
41
42
43
|
# File 'lib/mongoriver/streambed.rb', line 36
# Collects the hooks registered on this class together with those of its
# superclasses, for as long as the superclass is Streambed or one of its
# descendants.
#
# Returns an Array with this class's own hooks first, followed by the
# hooks gathered from the superclass chain.
def self.all_hooks
  own = my_hooks
  parent = superclass

  # Stop climbing once the superclass is no longer within the Streambed
  # hierarchy (Module#<= is false or nil in that case).
  return own unless parent <= Streambed

  own + parent.all_hooks
end
|
.hook(name, args = [], opts = {}) ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/mongoriver/streambed.rb', line 64
# Declares a hook: defines an instance method named hook_name(name) and
# records the declaration in my_hooks.
#
# name - hook identifier; the generated method is named via hook_name.
# args - list describing the hook's arguments. It is only stored in the
#        registry (validate_hooks! uses it to print a signature).
# opts - options Hash. When opts[:default] is set, the generated method
#        forwards its arguments and block to the hook of that name.
#
# Without a :default the generated method is a placeholder that raises
# NotImplementedError until a subclass overrides it.
#
# Returns the my_hooks registry after appending [name, args, opts].
def self.hook(name, args=[], opts={})
  default = opts[:default]

  implementation =
    if default
      target = hook_name(default)
      Proc.new do |*call_args, &blk|
        send(target, *call_args, &blk)
      end
    else
      # define_method gives the Proc strict (lambda-like) arity, so the
      # placeholder must accept arbitrary arguments; otherwise calling an
      # unimplemented hook with its declared arguments raises
      # ArgumentError instead of the intended NotImplementedError.
      Proc.new do |*_call_args, &_blk|
        raise NotImplementedError.new("Override in subclass")
      end
    end

  define_method(hook_name(name), implementation)
  my_hooks << [name, args, opts]
end
|
.hook_name(name) ⇒ Object
60
61
62
|
# File 'lib/mongoriver/streambed.rb', line 60
# Builds the method name used for a hook.
#
# name - hook identifier (converted with to_s).
#
# Returns a String consisting of "hook_" followed by the identifier.
def self.hook_name(name)
  format("hook_%s", name)
end
|
.my_hooks ⇒ Object
32
33
34
|
# File 'lib/mongoriver/streambed.rb', line 32
# Registry of the hooks declared directly on this class, kept in the
# class-level @hooks instance variable and created on first access.
#
# Returns the same Array on every call, so callers may append to it.
def self.my_hooks
  return @hooks if @hooks

  @hooks = []
end
|
.validate_hooks! ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/mongoriver/streambed.rb', line 45
# Checks that every hook returned by all_hooks has a usable implementation.
#
# A hook is reported as missing when the instance method found for it is
# still owned by Streambed itself and the hook was declared without a
# :default option.
#
# Returns nil when nothing is missing.
# Raises RuntimeError listing every missing hook, one per line, each
# indented by two spaces under the heading.
def self.validate_hooks!
  errors = []

  all_hooks.each do |name, args, opts|
    hook_method = instance_method(hook_name(name))
    next unless hook_method.owner == Streambed && !opts[:default]

    signature = "#{hook_method.name}(#{args.join(', ')})"
    errors << "Must provide implementation of #{signature}"
  end

  return if errors.empty?

  # Use the same separator before the first entry and between entries so
  # that every error line is indented identically.
  separator = "\n  "
  raise "You need to fix the following hook errors:#{separator}#{errors.join(separator)}"
end
|
Instance Method Details
#assert(condition, msg) ⇒ Object
9
10
11
|
# File 'lib/mongoriver/streambed.rb', line 9
# Fails loudly when an expected condition does not hold.
#
# condition - value tested for truthiness.
# msg       - message given to the raised AssertionFailure.
#
# Returns nil when the condition is truthy.
# Raises AssertionFailure when the condition is nil or false.
def assert(condition, msg)
  return if condition

  raise AssertionFailure, msg
end
|
#run ⇒ Object
20
21
22
23
24
25
26
27
28
29
30
|
# File 'lib/mongoriver/streambed.rb', line 20
# Entry point: validates the class's hooks, determines where to start,
# and begins tailing.
#
# If starting_optime yields a truthy value, tailing resumes from it.
# Otherwise the tailer's most recent timestamp is taken, initial_sync is
# run, the timestamp is handed to hook_update_optime (with true as the
# second argument), and tailing starts from that timestamp.
#
# Returns whatever tail_from returns.
def run
  self.class.validate_hooks!

  resume_from = starting_optime
  unless resume_from
    # The timestamp is read before initial_sync runs; keep this order.
    resume_from = @tailer.most_recent_timestamp
    initial_sync
    hook_update_optime(resume_from, true)
  end

  tail_from(resume_from)
end
|