Class: Mongoriver::PersistentTailer

Inherits:
AbstractPersistentTailer show all
Defined in:
lib/mongoriver/persistent_tailer.rb

Overview

A variant of AbstractPersistentTailer that automatically persists the “last timestamp processed” state into the database we are tailing.

Instance Attribute Summary

Attributes inherited from Tailer

#upstream_conn

Instance Method Summary collapse

Methods inherited from AbstractPersistentTailer

#batch_done, #maybe_save_timestamp, #save_timestamp, #stream, #tail_from

Methods inherited from Tailer

#connect_upstream, #ensure_upstream_replset!, #most_recent_timestamp, #oplog_collection, #parse_direct_upstream, #parse_host_spec, #stop, #stream, #tail_from

Methods included from Logging

#log

Constructor Details

#initialize(upstream, type, service, opts = {}) ⇒ PersistentTailer

Returns a new instance of PersistentTailer.



6
7
8
9
10
11
12
13
14
# File 'lib/mongoriver/persistent_tailer.rb', line 6

# Sets up a tailer that keeps its per-service state in a collection reached
# through the upstream connection.
#
# @param upstream forwarded unchanged to the parent constructor
# @param type forwarded to the parent constructor; +:slave+ is rejected
#   because state has to be written through this connection
# @param service value used to look up and store this tailer's state row
# @param opts [Hash] forwarded to the parent constructor; also read here:
#   +:db+ (state database name, default "_mongoriver") and
#   +:collection+ (state collection name, default 'oplog-tailers')
# @raise [RuntimeError] when +type+ is +:slave+
def initialize(upstream, type, service, opts={})
  if type == :slave
    raise "You can't use PersistentTailer against only a slave. How am I supposed to write state? "
  end

  super(upstream, type, opts)

  @service = service

  # @upstream_conn is read immediately after super, so it is presumably
  # established by the parent constructor.
  state_db_name   = opts[:db] || "_mongoriver"
  state_coll_name = opts[:collection] || 'oplog-tailers'
  @state_collection = @upstream_conn.db(state_db_name).collection(state_coll_name)
end

Instance Method Details

#read_timestamp ⇒ Object



16
17
18
19
# File 'lib/mongoriver/persistent_tailer.rb', line 16

# Looks up the state row for this tailer's service and returns its stored
# 'timestamp' value.
#
# @return the row's 'timestamp' field, or BSON::Timestamp.new(0, 0) when no
#   row exists for the service
def read_timestamp
  state = @state_collection.find_one(:service => @service)
  return BSON::Timestamp.new(0, 0) unless state

  state['timestamp']
end

#write_timestamp(ts) ⇒ Object



21
22
23
24
25
26
27
28
# File 'lib/mongoriver/persistent_tailer.rb', line 21

# Stores +ts+ as the 'timestamp' of this service's state row, creating the
# row when it does not exist yet.
#
# A single upsert is used instead of a find followed by update-or-insert:
# the lookup and the write can then no longer be interleaved by another
# writer (which could insert a duplicate row for the same service), and each
# save costs one round trip instead of two.
#
# @param ts the timestamp value to persist
# @return whatever the collection's +update+ call returns
def write_timestamp(ts)
  @state_collection.update(
    { 'service' => @service },
    { '$set' => { 'timestamp' => ts } },
    { :upsert => true }
  )
end