Class: Mongoriver::AbstractPersistentTailer
- Defined in:
- lib/mongoriver/abstract_persistent_tailer.rb
Overview
A variant of Tailer that automatically loads and persists the “last position processes” state. See PersistentTailer for a concrete subclass that uses the same mongod you are already tailing.
Direct Known Subclasses
Constant Summary collapse
- DEFAULT_SAVE_FREQUENCY =
How often to save position to database
60.0
Instance Attribute Summary collapse
-
#last_read ⇒ Object
readonly
Returns the value of attribute last_read.
-
#last_saved ⇒ Object
readonly
Returns the value of attribute last_saved.
Attributes inherited from Tailer
#database_type, #oplog, #upstream_conn
Instance Method Summary collapse
- #batch_done ⇒ Object
-
#initialize(upstream, type, opts = {}) ⇒ AbstractPersistentTailer
constructor
A new instance of AbstractPersistentTailer.
- #maybe_save_state ⇒ Object
- #most_recent_state(before_time = nil) ⇒ Object
-
#read_position ⇒ Object
Read the most recent position from storage.
-
#read_state ⇒ Object
Get the current state from storage.
-
#read_timestamp ⇒ Object
Read the most recent timestamp of a read from storage.
- #save_state(state = nil) ⇒ Object
-
#state_for(record) ⇒ Object
state to save to the database for this record.
- #stream(limit = nil) ⇒ Object
- #tail(opts = {}) ⇒ Object
-
#write_state(state) ⇒ Object
Persist current state.
Methods inherited from Tailer
#close, #connect_upstream, #connection_config, #ensure_upstream_replset!, #latest_oplog_entry, #most_recent_position, #oplog_collection, #parse_direct_upstream, #parse_host_spec, #position, #stop, #tail_from, #tailing, #time_for
Methods included from Assertions
Methods included from Logging
Constructor Details
#initialize(upstream, type, opts = {}) ⇒ AbstractPersistentTailer
Returns a new instance of AbstractPersistentTailer.
14 15 16 17 18 19 20 21 22 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 14 def initialize(upstream, type, opts={}) raise "You can't instantiate an AbstractPersistentTailer -- did you want PersistentTailer? " if self.class == AbstractPersistentTailer super(upstream, type) @last_saved = {} @batch = opts[:batch] @last_read = {} @save_frequency = opts[:save_frequency] || DEFAULT_SAVE_FREQUENCY end |
Instance Attribute Details
#last_read ⇒ Object (readonly)
Returns the value of attribute last_read.
9 10 11 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 9 def last_read @last_read end |
#last_saved ⇒ Object (readonly)
Returns the value of attribute last_saved.
9 10 11 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 9 def last_saved @last_saved end |
Instance Method Details
#batch_done ⇒ Object
72 73 74 75 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 72 def batch_done raise "You must specify :batch => true to use the batch-processing interface." unless @batch maybe_save_state end |
#maybe_save_state ⇒ Object
117 118 119 120 121 122 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 117 def maybe_save_state return unless last_read['time'] if last_saved['time'].nil? || last_read['time'] - last_saved['time'] > @save_frequency save_state end end |
#most_recent_state(before_time = nil) ⇒ Object
56 57 58 59 60 61 62 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 56 def most_recent_state(before_time=nil) record = latest_oplog_entry(before_time) { 'time' => time_for(record), 'position' => position(record) } end |
#read_position ⇒ Object
Read the most recent position from storage. Return nil if nothing was found.
94 95 96 97 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 94 def read_position state = read_state || {} return state['position'] end |
#read_state ⇒ Object
Get the current state from storage. Implement this!
81 82 83 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 81 def read_state raise "read_state unimplemented!" end |
#read_timestamp ⇒ Object
Read the most recent timestamp of a read from storage. Return nil if nothing was found.
87 88 89 90 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 87 def state = read_state || {} return state['time'] end |
#save_state(state = nil) ⇒ Object
107 108 109 110 111 112 113 114 115 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 107 def save_state(state=nil) if state.nil? state = last_read end return unless state['position'] write_state(state) @last_saved = state log.info("Saved state: #{last_saved}") end |
#state_for(record) ⇒ Object
state to save to the database for this record
65 66 67 68 69 70 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 65 def state_for(record) { 'time' => Time.at(record['ts'].seconds), 'position' => position(record) } end |
#stream(limit = nil) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 30 def stream(limit=nil) start_time = connection_config['localTime'] found_entry = false # Sketchy logic - yield results from Tailer.stream # if nothing is found and nothing in cursor, save the current position entries_left = super(limit) do |entry| yield entry found_entry = true @last_read = state_for(entry) maybe_save_state unless @batch end if !found_entry && !entries_left @last_read['time'] = start_time if @last_read['position'].nil? @last_read['position'] ||= read_position @last_read['position'] ||= most_recent_position(start_time) end maybe_save_state unless @batch end return entries_left end |
#tail(opts = {}) ⇒ Object
24 25 26 27 28 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 24 def tail(opts={}) opts[:from] ||= read_position log.debug("Persistent tail options: #{opts}") super(opts) end |
#write_state(state) ⇒ Object
Persist current state. Implement this!
103 104 105 |
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 103 def write_state(state) raise "write_state unimplemented!" end |