Class: Mongoriver::PersistentTailer
- Inherits: AbstractPersistentTailer
  - Object
  - Tailer
  - AbstractPersistentTailer
  - Mongoriver::PersistentTailer
- Defined in: lib/mongoriver/persistent_tailer.rb
Overview
A variant of AbstractPersistentTailer that automatically persists the “last timestamp processed” state into the database we are tailing.
Constant Summary
Constants inherited from AbstractPersistentTailer
AbstractPersistentTailer::DEFAULT_SAVE_FREQUENCY
Instance Attribute Summary
Attributes inherited from AbstractPersistentTailer
Attributes inherited from Tailer
#database_type, #oplog, #upstream_conn
Instance Method Summary
- #initialize(upstream, type, service, opts = {}) ⇒ PersistentTailer (constructor): A new instance of PersistentTailer.
- #read_state ⇒ Object
- #write_state(state) ⇒ Object
Methods inherited from AbstractPersistentTailer
#batch_done, #maybe_save_state, #most_recent_state, #read_position, #read_timestamp, #save_state, #state_for, #stream, #tail
Methods inherited from Tailer
#close, #connect_upstream, #connection_config, #ensure_upstream_replset!, #latest_oplog_entry, #most_recent_position, #oplog_collection, #parse_direct_upstream, #parse_host_spec, #position, #stop, #stream, #tail, #tail_from, #tailing, #time_for
Methods included from Assertions
Methods included from Logging
Constructor Details
#initialize(upstream, type, service, opts = {}) ⇒ PersistentTailer
Returns a new instance of PersistentTailer.
    # File 'lib/mongoriver/persistent_tailer.rb', line 6
    def initialize(upstream, type, service, opts={})
      if type == :slave
        raise "You can't use PersistentTailer against only a slave. How am I supposed to write state?"
      end
      super(upstream, type, opts)

      db         = opts[:db] || "_mongoriver"
      collection = opts[:collection] || 'oplog-tailers'
      @service = service
      @state_collection = @upstream_conn.db(db).collection(collection)
    end
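The constructor above does two things before touching the database: it rejects a :slave upstream, and it falls back to default names for the state database and collection. A minimal sketch of just that logic, using plain Ruby values, might look like the following. The helper name state_location is hypothetical and not part of mongoriver, and :replset is only a placeholder for any type other than :slave.

```ruby
# Hypothetical helper, NOT part of mongoriver: mirrors the constructor's
# guard clause and option defaults without opening a connection.
def state_location(type, opts = {})
  if type == :slave
    # Same condition the constructor checks: state cannot be written to a slave.
    raise "PersistentTailer needs a writable upstream to store its state"
  end

  {
    db: opts[:db] || "_mongoriver",
    collection: opts[:collection] || "oplog-tailers"
  }
end

# :replset is a placeholder type for illustration only.
default_location = state_location(:replset)
custom_location  = state_location(:replset, db: "tailer_meta")
```

With no options, state would live in the "oplog-tailers" collection of the "_mongoriver" database; passing :db or :collection overrides either name independently.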
Instance Method Details
#read_state ⇒ Object
    # File 'lib/mongoriver/persistent_tailer.rb', line 18
    def read_state
      row = @state_collection.find_one(:service => @service)
      return nil unless row

      # Try to do seamless upgrades from old mongoriver versions
      case row['v']
      when nil
        log.warn("Old style timestamp found in database. Converting!")
        ts = Time.at(row['timestamp'].seconds)
        return {
          'position' => most_recent_position(ts),
          'time' => ts
        }
      when 1
        return row['state']
      end
    end
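The version switch in read_state can be tried in isolation. The sketch below is a stand-alone approximation, not the library's code: decode_state and LegacyTimestamp are hypothetical names, the row is a plain Hash shaped like the stored document, and the block stands in for most_recent_position, which needs a live oplog.

```ruby
# Hypothetical stand-in for the stored timestamp object; the real code only
# relies on it responding to #seconds.
LegacyTimestamp = Struct.new(:seconds)

# Hypothetical stand-alone version of the branching in read_state.
# The block replaces most_recent_position(ts) for illustration.
def decode_state(row)
  return nil unless row

  case row['v']
  when nil
    # Old-style document: only a timestamp was stored, so rebuild the state.
    ts = Time.at(row['timestamp'].seconds)
    { 'position' => yield(ts), 'time' => ts }
  when 1
    # Current format: the state hash is stored as-is.
    row['state']
  end
end

legacy_row  = { 'service' => 'indexer', 'timestamp' => LegacyTimestamp.new(1_400_000_000) }
current_row = { 'service' => 'indexer', 'v' => 1,
                'state' => { 'position' => 'p1', 'time' => Time.at(0) } }

legacy  = decode_state(legacy_row)  { |ts| "pos@#{ts.to_i}" }
current = decode_state(current_row) { |_ts| nil }
missing = decode_state(nil)         { |_ts| nil }
unknown = decode_state({ 'v' => 2 }) { |_ts| nil }
```

As in the original method, a row with any version other than nil or 1 falls through the case statement and yields nil, the same result as a missing row.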
#write_state(state) ⇒ Object
    # File 'lib/mongoriver/persistent_tailer.rb', line 36
    def write_state(state)
      @state_collection.update({:service => @service},
                               {:service => @service, :state => state, :v => 1},
                               :upsert => true)
    end
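Because write_state upserts on the service name, each service should end up with a single state document that is replaced on every save. The sketch below illustrates that behaviour with an in-memory stand-in. FakeStateCollection is hypothetical and models only the "replace the match, or insert when :upsert is set" case; it is not the MongoDB driver.

```ruby
# Hypothetical in-memory stand-in for the state collection. It only models
# replacing the first matching document, or inserting when :upsert is set.
class FakeStateCollection
  attr_reader :docs

  def initialize
    @docs = []
  end

  def update(selector, document, opts = {})
    index = @docs.index { |doc| selector.all? { |key, value| doc[key] == value } }
    if index
      @docs[index] = document
    elsif opts[:upsert]
      @docs << document
    end
  end
end

store = FakeStateCollection.new

# Two consecutive saves for the same service, shaped like write_state's call.
[1, 2].each do |position|
  store.update({ :service => 'indexer' },
               { :service => 'indexer', :state => { 'position' => position }, :v => 1 },
               :upsert => true)
end

# A different service gets its own document.
store.update({ :service => 'mailer' },
             { :service => 'mailer', :state => { 'position' => 7 }, :v => 1 },
             :upsert => true)
```

After these calls the store holds one document per service, and the 'indexer' document carries only the most recent state.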