Class: Mongoriver::AbstractPersistentTailer
- Inherits: Tailer
- Inheritance chain: Object → Tailer → Mongoriver::AbstractPersistentTailer
- Defined in: lib/mongoriver/abstract_persistent_tailer.rb
Overview
A variant of Tailer that automatically loads and persists the “last timestamp processed” state. See PersistentTailer for a concrete subclass that uses the same mongod you are already tailing.
Instance Attribute Summary
Attributes inherited from Tailer
#upstream_conn
Instance Method Summary
Methods inherited from Tailer
#connect_upstream, #ensure_upstream_replset!, #most_recent_timestamp, #oplog_collection, #parse_direct_upstream, #parse_host_spec, #stop
Methods included from Logging
#log
Constructor Details
Returns a new instance of AbstractPersistentTailer.
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 9
def initialize(upstream, type, opts={})
  raise "You can't instantiate an AbstractPersistentTailer -- did you want PersistentTailer? " if self.class == AbstractPersistentTailer
  super(upstream, type)
  @last_saved = nil
  @batch = opts[:batch]
  @last_read = nil
end
Instance Method Details
#batch_done ⇒ Object
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 33
def batch_done
  raise "You must specify :batch => true to use the batch-processing interface." unless @batch
  maybe_save_timestamp
end
#maybe_save_timestamp ⇒ Object
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 52
def maybe_save_timestamp
  return unless @last_read
  save_timestamp if @last_saved.nil? || (@last_read.seconds - @last_saved.seconds) > 60
end
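The condition in maybe_save_timestamp compares the timestamps of oplog entries, not wall-clock time: it saves on the first entry read, then only once the newest entry is more than 60 seconds later than the last saved one. A minimal sketch of that condition in isolation, assuming a hypothetical `Stamp` struct in place of the real timestamp objects (only `seconds` is needed) and hypothetical helpers `should_save?` and `saved_points` that are not part of mongoriver:

```ruby
# Stand-in for an oplog timestamp; only #seconds is needed here (assumption).
Stamp = Struct.new(:seconds)

# Hypothetical helper mirroring the condition in maybe_save_timestamp:
# save on the first read, or once more than 60 seconds separate the
# last read entry from the last saved one.
def should_save?(last_read, last_saved)
  return false unless last_read
  last_saved.nil? || (last_read.seconds - last_saved.seconds) > 60
end

# Replay a sequence of entry times and collect the ones that would be saved.
def saved_points(seconds_list)
  last_saved = nil
  seconds_list.each_with_object([]) do |secs, saved|
    last_read = Stamp.new(secs)
    next unless should_save?(last_read, last_saved)
    last_saved = last_read
    saved << secs
  end
end
```

For entry times 100, 130, 160, 161, 200, 222 the saved points are 100, 161 and 222. The entry at 160 is skipped because the comparison is strictly greater than 60.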
#read_timestamp ⇒ Object
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 38
def read_timestamp
  raise "read_timestamp unimplemented!"
end
#save_timestamp ⇒ Object
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 46
def save_timestamp
  write_timestamp(@last_read)
  @last_saved = @last_read
  log.info("Saved timestamp: #{@last_saved} (#{Time.at(@last_saved.seconds)})")
end
#stream(limit = nil) ⇒ Object
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 25
def stream(limit=nil)
  super(limit) do |entry|
    yield entry
    @last_read = entry['ts']
    maybe_save_timestamp unless @batch
  end
end
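Two details of stream are easy to miss: the entry's timestamp is recorded only after the caller's block returns, and in batch mode nothing is saved until batch_done is called. A rough sketch of that bookkeeping, using a hypothetical `SketchTailer` that is not the mongoriver class, reads entries from an in-memory array instead of an oplog, and saves unconditionally (the 60-second throttle is left out for brevity):

```ruby
# Stand-in that mimics only the bookkeeping of AbstractPersistentTailer.
# It is NOT the mongoriver class and talks to no database (assumption).
class SketchTailer
  attr_reader :saved

  def initialize(entries, batch: false)
    @entries = entries   # pretend oplog entries, each a Hash with a 'ts' key
    @batch = batch
    @last_read = nil
    @saved = []          # every timestamp handed to the persistence step
  end

  def stream
    @entries.each do |entry|
      yield entry                  # the caller's handler runs first
      @last_read = entry['ts']     # reached only if the handler returned
      save unless @batch
    end
  end

  def batch_done
    raise "You must specify :batch => true to use the batch-processing interface." unless @batch
    save
  end

  private

  # Simplified: saves every time, without the 60-second throttle.
  def save
    @saved << @last_read if @last_read
  end
end
```

In this sketch, a handler that raises on an entry leaves that entry's timestamp unrecorded, so the last persisted position still points at the previous entry.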
#tail_from(ts, opts = {}) ⇒ Object
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 18
def tail_from(ts, opts={})
  if ts.nil?
    ts = read_timestamp
  end
  super(ts, opts)
end
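#write_timestamp(ts) ⇒ Object
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 42
# Abstract hook: subclasses persist ts here. save_timestamp calls it with
# the last-read timestamp, so it takes one argument.
def write_timestamp(ts)
  raise "write_timestamp unimplemented!"
end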
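A concrete subclass has to supply the two hooks, read_timestamp and write_timestamp. Since save_timestamp calls `write_timestamp(@last_read)`, an override needs to accept one argument. The following is only a sketch of that contract: `SketchAbstractTailer` is a hypothetical stand-in for the abstract class (so the example runs without mongoriver), `FileBackedTailer` and its file format are invented for illustration, and timestamps are simplified to plain integers. Returning nil when no state exists is a choice of the sketch; a real subclass must return whatever Tailer#tail_from accepts.

```ruby
require 'tmpdir'

# Stand-in for the abstract hook contract; not the real mongoriver class.
class SketchAbstractTailer
  def read_timestamp
    raise "read_timestamp unimplemented!"
  end

  def write_timestamp(ts)
    raise "write_timestamp unimplemented!"
  end
end

# Hypothetical subclass that keeps the last timestamp in a small text file.
class FileBackedTailer < SketchAbstractTailer
  def initialize(path)
    @path = path
  end

  # Returns the stored integer, or nil if nothing has been saved yet.
  def read_timestamp
    File.exist?(@path) ? Integer(File.read(@path).strip) : nil
  end

  # Overwrites the file with the given timestamp.
  def write_timestamp(ts)
    File.write(@path, ts.to_s)
  end
end

# Usage: write a timestamp, then read it back from a temporary directory.
Dir.mktmpdir do |dir|
  tailer = FileBackedTailer.new(File.join(dir, 'last_ts'))
  tailer.write_timestamp(1_700_000_000)
  puts tailer.read_timestamp
end
```

PersistentTailer, mentioned in the overview, is the library's own concrete subclass; this sketch makes no claim about how it stores its state.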