Class: Mongoriver::AbstractPersistentTailer

Inherits:
Tailer
  • Object
show all
Defined in:
lib/mongoriver/abstract_persistent_tailer.rb

Overview

A variant of Tailer that automatically loads and persists the “last timestamp processed” state. See PersistentTailer for a concrete subclass that uses the same mongod you are already tailing.

Direct Known Subclasses

PersistentTailer

Instance Attribute Summary

Attributes inherited from Tailer

#oplog, #upstream_conn

Instance Method Summary collapse

Methods inherited from Tailer

#close, #connect_upstream, #connection_config, #ensure_upstream_replset!, #most_recent_timestamp, #oplog_collection, #parse_direct_upstream, #parse_host_spec, #stop, #tail_from

Methods included from Logging

#log

Constructor Details

#initialize(upstream, type, opts = {}) ⇒ AbstractPersistentTailer

Returns a new instance of AbstractPersistentTailer.



9
10
11
12
13
14
15
16
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 9

# Builds the tailer state shared by every persistent tailer.
#
# Direct instantiation of AbstractPersistentTailer itself is rejected; only
# subclasses may be constructed.
#
# @param upstream forwarded unchanged to the superclass constructor
# @param type forwarded unchanged to the superclass constructor
# @param opts [Hash] only the :batch key is read here; a truthy value
#   enables the batch interface (see #batch_done)
# @raise [RuntimeError] when called on AbstractPersistentTailer directly
def initialize(upstream, type, opts={})
  if self.class == AbstractPersistentTailer
    raise "You can't instantiate an AbstractPersistentTailer -- did you want PersistentTailer? "
  end

  # Note that opts is deliberately not forwarded; only upstream and type are.
  super(upstream, type)

  # Nothing has been read or persisted yet.
  @last_saved = @last_read = nil
  @batch = opts[:batch]
end

Instance Method Details

#batch_done ⇒ Object



42
43
44
45
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 42

# Signals that the caller has finished a batch, giving the tailer a chance
# to persist the last-read timestamp.
#
# @return whatever #maybe_save_timestamp returns
# @raise [RuntimeError] if the tailer was not created with a truthy :batch option
def batch_done
  if @batch
    maybe_save_timestamp
  else
    raise "You must specify :batch => true to use the batch-processing interface."
  end
end

#maybe_save_timestamp ⇒ Object



61
62
63
64
65
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 61

# Persists the last-read timestamp, throttled to roughly once a minute.
#
# Does nothing until something has been read. Otherwise saves when nothing
# has been saved yet, or when the last-read timestamp is more than 60
# seconds ahead of the last-saved one (exactly 60 does not trigger a save).
#
# @return the result of #save_timestamp when a save happens, otherwise nil
def maybe_save_timestamp
  if @last_read
    never_saved = @last_saved.nil?
    save_timestamp if never_saved || (@last_read.seconds - @last_saved.seconds) > 60
  end
end

#read_timestamp ⇒ Object



47
48
49
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 47

# Abstract hook: this base implementation always raises. #tail calls it to
# obtain a starting point when the caller supplies no :from option.
#
# @raise [RuntimeError] always
def read_timestamp
  raise RuntimeError, "read_timestamp unimplemented!"
end

#save_timestamp ⇒ Object



55
56
57
58
59
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 55

# Unconditionally persists the last-read timestamp.
#
# Hands @last_read to #write_timestamp, then records it as @last_saved and
# logs it at info level together with a Time rendering of its seconds.
#
# @return the result of the log.info call
def save_timestamp
  write_timestamp(@last_read)

  # Only mark the timestamp as saved once the write has returned.
  saved = @last_saved = @last_read
  log.info("Saved timestamp: #{saved} (#{Time.at(saved.seconds)})")
end

#stream(limit = nil) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 23

# Streams entries through the superclass implementation while tracking the
# timestamp of the most recent entry handed to the caller.
#
# Each entry is yielded to the caller's block first; only after the block
# returns is the entry's 'ts' recorded as the last-read timestamp. Outside
# batch mode, a throttled save is attempted after every entry.
#
# If no entry was yielded and the superclass call returned a falsy value,
# the last-read timestamp is set to one built from connection_config's
# 'localTime', captured before streaming started.
#
# @param limit forwarded unchanged to the superclass #stream
# @yield [entry] each entry produced by the superclass #stream
# @return the value returned by the superclass #stream
def stream(limit=nil)
  # Captured up front so the fallback reflects the time before streaming began.
  fallback_timestamp = BSON::Timestamp.new(connection_config['localTime'].to_i, 0)
  saw_entry = false

  result = super(limit) do |entry|
    yield entry
    saw_entry = true
    @last_read = entry['ts']
    maybe_save_timestamp unless @batch
  end

  unless saw_entry || result
    @last_read = fallback_timestamp
    maybe_save_timestamp unless @batch
  end

  result
end

#tail(opts = {}) ⇒ Object



18
19
20
21
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 18

# Starts tailing via the superclass, defaulting the starting point to the
# value returned by #read_timestamp.
#
# The :from key is filled in on the given hash itself when it is missing or
# falsy, so a hash passed by the caller is modified in place.
#
# @param opts [Hash] options forwarded to the superclass #tail
# @return the value returned by the superclass #tail
def tail(opts={})
  opts[:from] = read_timestamp unless opts[:from]
  super(opts)
end

#write_timestamp ⇒ Object



51
52
53
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 51

# Abstract hook: this base implementation always raises.
#
# #save_timestamp calls this with the timestamp to persist, so the stub
# accepts one optional argument. Without it, the abstract path would fail
# with an ArgumentError instead of the "unimplemented" error below. The
# argument stays optional so existing zero-argument calls keep working.
#
# @param _timestamp the timestamp to persist; ignored by this stub
# @raise [RuntimeError] always
def write_timestamp(_timestamp = nil)
  raise "write_timestamp unimplemented!"
end