Class: Mongoriver::AbstractPersistentTailer

Inherits:
Tailer
  • Object
show all
Defined in:
lib/mongoriver/abstract_persistent_tailer.rb

Overview

A variant of Tailer that automatically loads and persists the “last timestamp processes” state. See PersistentTailer for a concrete subclass that uses the same mongod you are already tailing.

Direct Known Subclasses

PersistentTailer

Instance Attribute Summary

Attributes inherited from Tailer

#upstream_conn

Instance Method Summary collapse

Methods inherited from Tailer

#connect_upstream, #ensure_upstream_replset!, #most_recent_timestamp, #oplog_collection, #parse_direct_upstream, #parse_host_spec, #stop

Methods included from Logging

#log

Constructor Details

#initialize(upstream, type, opts = {}) ⇒ AbstractPersistentTailer

Returns a new instance of AbstractPersistentTailer.



9
10
11
12
13
14
15
16
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 9

def initialize(upstream, type, opts={})
  raise "You can't instantiate an AbstractPersistentTailer -- did you want PersistentTailer? " if self.class == AbstractPersistentTailer
  super(upstream, type)

  @last_saved       = nil
  @batch            = opts[:batch]
  @last_read        = nil
end

Instance Method Details

#batch_doneObject



33
34
35
36
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 33

def batch_done
  raise "You must specify :batch => true to use the batch-processing interface." unless @batch
  maybe_save_timestamp
end

#maybe_save_timestampObject



52
53
54
55
56
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 52

def maybe_save_timestamp
  # Write timestamps once a minute
  return unless @last_read
  save_timestamp if @last_saved.nil? || (@last_read.seconds - @last_saved.seconds) > 60
end

#read_timestampObject



38
39
40
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 38

def read_timestamp
  raise "read_timestamp unimplemented!"
end

#save_timestampObject



46
47
48
49
50
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 46

def save_timestamp
  write_timestamp(@last_read)
  @last_saved = @last_read
  log.info("Saved timestamp: #{@last_saved} (#{Time.at(@last_saved.seconds)})")
end

#stream(limit = nil) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 25

def stream(limit=nil)
  super(limit) do |entry|
    yield entry
    @last_read = entry['ts']
    maybe_save_timestamp unless @batch
  end
end

#tail_from(ts, opts = {}) ⇒ Object



18
19
20
21
22
23
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 18

def tail_from(ts, opts={})
  if ts.nil?
    ts = read_timestamp
  end
  super(ts, opts)
end

#write_timestampObject



42
43
44
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 42

def write_timestamp
  raise "save_timestamp unimplemented!"
end