Class: Mongoriver::PersistentTailer

Inherits:
AbstractPersistentTailer show all
Defined in:
lib/mongoriver/persistent_tailer.rb

Overview

A variant of AbstractPersistentTailer that automatically persists the “last timestamp processed” state into the database we are tailing.

Constant Summary

Constants inherited from AbstractPersistentTailer

AbstractPersistentTailer::DEFAULT_SAVE_FREQUENCY

Instance Attribute Summary

Attributes inherited from AbstractPersistentTailer

#last_read, #last_saved

Attributes inherited from Tailer

#database_type, #oplog, #upstream_conn

Instance Method Summary collapse

Methods inherited from AbstractPersistentTailer

#batch_done, #maybe_save_state, #most_recent_state, #read_position, #read_timestamp, #save_state, #state_for, #stream, #tail

Methods inherited from Tailer

#close, #connect_upstream, #connection_config, #ensure_upstream_replset!, #latest_oplog_entry, #most_recent_position, #oplog_collection, #parse_direct_upstream, #parse_host_spec, #position, #stop, #stream, #tail, #tail_from, #tailing, #time_for

Methods included from Assertions

#assert

Methods included from Logging

#log

Constructor Details

#initialize(upstream, type, service, opts = {}) ⇒ PersistentTailer

Returns a new instance of PersistentTailer.



6
7
8
9
10
11
12
13
14
15
16
# File 'lib/mongoriver/persistent_tailer.rb', line 6

# Sets up a tailer whose state lives in a collection reached through the
# upstream connection.
#
# upstream, type and opts are forwarded to the parent constructor as-is.
# service is kept in @service.
#
# Options read here:
#   :db         - name of the database holding the state collection
#                 (falls back to "_mongoriver")
#   :collection - name of the state collection
#                 (falls back to 'oplog-tailers')
#
# Raises a RuntimeError when type is :slave, before the parent constructor
# is invoked.
def initialize(upstream, type, service, opts={})
  raise "You can't use PersistentTailer against only a slave. How am I supposed to write state?" if type == :slave

  super(upstream, type, opts)

  @service = service
  state_db_name   = opts[:db] || "_mongoriver"
  state_coll_name = opts[:collection] || 'oplog-tailers'
  @state_collection = @upstream_conn.db(state_db_name).collection(state_coll_name)
end

Instance Method Details

#read_state ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/mongoriver/persistent_tailer.rb', line 18

# Looks up the state document stored for @service in the state collection
# and returns the saved state.
#
# Returns nil when no document is found, or when the document's 'v' field
# is neither nil nor 1.
#
# A document with no 'v' field is treated as the old format: a warning is
# logged, the 'timestamp' field's seconds are turned into a Time, and a
# hash with 'position' (from most_recent_position) and 'time' is returned.
# A document with 'v' equal to 1 yields its 'state' field unchanged.
def read_state
  doc = @state_collection.find_one(:service => @service)
  return unless doc

  # Try to do seamless upgrades from old mongoriver versions
  version = doc['v']
  if version.nil?
    log.warn("Old style timestamp found in database. Converting!")
    saved_at = Time.at(doc['timestamp'].seconds)
    { 'position' => most_recent_position(saved_at), 'time' => saved_at }
  elsif version == 1
    doc['state']
  end
end

#write_state(state) ⇒ Object



36
37
38
39
# File 'lib/mongoriver/persistent_tailer.rb', line 36

# Stores +state+ in the state collection under the document selected by
# :service => @service, tagging it with :v => 1. The update is issued with
# :upsert => true.
#
# Returns whatever the collection's update call returns.
def write_state(state)
  selector = { :service => @service }
  document = selector.merge(:state => state, :v => 1)
  @state_collection.update(selector, document, :upsert => true)
end