Class: Mongoriver::PersistentTailer
- Inherits:
-
AbstractPersistentTailer
- Object
- Tailer
- AbstractPersistentTailer
- Mongoriver::PersistentTailer
- Defined in:
- lib/mongoriver/persistent_tailer.rb
Overview
A variant of AbstractPersistentTailer that automatically persists the “last timestamp processes” state into the database we are tailing.
Instance Attribute Summary
Attributes inherited from Tailer
Instance Method Summary collapse
-
#initialize(upstream, type, service, opts = {}) ⇒ PersistentTailer
constructor
A new instance of PersistentTailer.
- #read_timestamp ⇒ Object
- #write_timestamp(ts) ⇒ Object
Methods inherited from AbstractPersistentTailer
#batch_done, #maybe_save_timestamp, #save_timestamp, #stream, #tail_from
Methods inherited from Tailer
#connect_upstream, #ensure_upstream_replset!, #most_recent_timestamp, #oplog_collection, #parse_direct_upstream, #parse_host_spec, #stop, #stream, #tail_from
Methods included from Logging
Constructor Details
#initialize(upstream, type, service, opts = {}) ⇒ PersistentTailer
Returns a new instance of PersistentTailer.
6 7 8 9 10 11 12 13 14 |
# File 'lib/mongoriver/persistent_tailer.rb', line 6 def initialize(upstream, type, service, opts={}) raise "You can't use PersistentTailer against only a slave. How am I supposed to write state? " if type == :slave super(upstream, type, opts) db = opts[:db] || "_mongoriver" collection = opts[:collection] || 'oplog-tailers' @service = service @state_collection = @upstream_conn.db(db).collection(collection) end |
Instance Method Details
#read_timestamp ⇒ Object
16 17 18 19 |
# File 'lib/mongoriver/persistent_tailer.rb', line 16 def row = @state_collection.find_one(:service => @service) row ? row['timestamp'] : BSON::Timestamp.new(0, 0) end |
#write_timestamp(ts) ⇒ Object
21 22 23 24 25 26 27 28 |
# File 'lib/mongoriver/persistent_tailer.rb', line 21 def (ts) row = @state_collection.find_one(:service => @service) if row @state_collection.update({'_id' => row['_id']}, '$set' => { 'timestamp' => ts }) else @state_collection.insert('service' => @service, 'timestamp' => ts) end end |