Class: Mongoriver::AbstractPersistentTailer

Inherits:
Tailer
  • Object
show all
Defined in:
lib/mongoriver/abstract_persistent_tailer.rb

Overview

A variant of Tailer that automatically loads and persists the “last position processed” state. See PersistentTailer for a concrete subclass that uses the same mongod you are already tailing.

Direct Known Subclasses

PersistentTailer

Constant Summary collapse

DEFAULT_SAVE_FREQUENCY =

How often (in seconds) to save the position to the database

60.0

Instance Attribute Summary collapse

Attributes inherited from Tailer

#database_type, #oplog, #upstream_conn

Instance Method Summary collapse

Methods inherited from Tailer

#close, #connect_upstream, #connection_config, #ensure_upstream_replset!, #latest_oplog_entry, #most_recent_position, #oplog_collection, #parse_direct_upstream, #parse_host_spec, #position, #stop, #tail_from, #tailing, #time_for

Methods included from Assertions

#assert

Methods included from Logging

#log

Constructor Details

#initialize(upstream, type, opts = {}) ⇒ AbstractPersistentTailer

Returns a new instance of AbstractPersistentTailer.



14
15
16
17
18
19
20
21
22
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 14

# Set up the tailer plus the bookkeeping used to persist its position.
#
# @param upstream forwarded unchanged to the superclass constructor
# @param type forwarded unchanged to the superclass constructor
# @param opts [Hash] :batch turns on the batch interface (checked by
#   batch_done and stream); :save_frequency overrides DEFAULT_SAVE_FREQUENCY
# @raise [RuntimeError] when called on AbstractPersistentTailer itself
#   rather than on a subclass
def initialize(upstream, type, opts={})
  if instance_of?(AbstractPersistentTailer)
    raise "You can't instantiate an AbstractPersistentTailer -- did you want PersistentTailer? "
  end
  super(upstream, type)

  @batch          = opts[:batch]
  @save_frequency = opts[:save_frequency] || DEFAULT_SAVE_FREQUENCY
  @last_read      = {}
  @last_saved     = {}
end

Instance Attribute Details

#last_readObject (readonly)

Returns the value of attribute last_read.



9
10
11
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 9

# Reader for @last_read, the state Hash tracked by stream. It starts out as
# an empty Hash and later carries 'time' and 'position' keys.
def last_read
  @last_read
end

#last_savedObject (readonly)

Returns the value of attribute last_saved.



9
10
11
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 9

# Reader for @last_saved, the state Hash recorded by save_state after it
# calls write_state. It is an empty Hash until the first save.
def last_saved
  @last_saved
end

Instance Method Details

#batch_doneObject



72
73
74
75
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 72

# Signal that a batch has been processed, giving the tailer a chance to
# persist its position (via maybe_save_state).
#
# @raise [RuntimeError] unless the tailer was created with :batch => true
def batch_done
  unless @batch
    raise "You must specify :batch => true to use the batch-processing interface."
  end

  maybe_save_state
end

#maybe_save_stateObject



117
118
119
120
121
122
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 117

# Call save_state when nothing has been saved yet, or when the last read time
# is more than @save_frequency ahead of the last saved time. Does nothing
# while last_read has no 'time'.
def maybe_save_state
  read_at = last_read['time']
  return unless read_at

  saved_at = last_saved['time']
  save_state if saved_at.nil? || read_at - saved_at > @save_frequency
end

#most_recent_state(before_time = nil) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 56

# Build a state Hash ('time' and 'position') from the entry returned by
# latest_oplog_entry.
#
# @param before_time passed straight through to latest_oplog_entry
# @return [Hash] 'time' => time_for(entry), 'position' => position(entry)
def most_recent_state(before_time=nil)
  entry = latest_oplog_entry(before_time)
  entry_time = time_for(entry)
  entry_position = position(entry)

  { 'time' => entry_time, 'position' => entry_position }
end

#read_positionObject

Read the most recent position from storage. Return nil if nothing was found.



94
95
96
97
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 94

# Look up 'position' in the stored state; a nil/false result from read_state
# is treated as an empty Hash, so the answer is then nil.
def read_position
  (read_state || {})['position']
end

#read_stateObject

Get the current state from storage. Implement this!

Returns:

  • state (Hash)

    the persisted state, with 'position' and 'time' keys; nil if nothing was found



81
82
83
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 81

# Abstract hook for loading the persisted state; subclasses must override it.
# read_position and read_timestamp look up 'position' and 'time' on the
# result and treat a nil result as "nothing stored".
#
# @raise [RuntimeError] always, in this base implementation
def read_state
  raise RuntimeError, "read_state unimplemented!"
end

#read_timestampObject

Read the most recent timestamp of a read from storage. Return nil if nothing was found.



87
88
89
90
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 87

# Look up 'time' in the stored state; a nil/false result from read_state is
# treated as an empty Hash, so the answer is then nil.
def read_timestamp
  (read_state || {})['time']
end

#save_state(state = nil) ⇒ Object



107
108
109
110
111
112
113
114
115
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 107

# Persist a state through write_state and remember it as the last saved state.
#
# @param state [Hash, nil] state to persist ('time' and 'position' keys);
#   last_read is used when nil
# @return nil, without writing anything, when the state has no 'position';
#   otherwise the result of the log.info call
def save_state(state=nil)
  state = last_read if state.nil?
  return unless state['position']

  write_state(state)
  # Store a copy rather than the same Hash. With no argument, `state` is the
  # last_read Hash itself, and stream later updates that Hash in place
  # (@last_read['time'] = ...). Sharing the object would make
  # last_saved['time'] follow last_read['time'], so maybe_save_state would
  # never see any elapsed time and would stop saving.
  @last_saved = state.dup
  log.info("Saved state: #{last_saved}")
end

#state_for(record) ⇒ Object

state to save to the database for this record



65
66
67
68
69
70
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 65

# State to save to the database for this record.
#
# @param record oplog entry; its 'ts' value must respond to #seconds
# @return [Hash] 'time' => Time built from record['ts'].seconds,
#   'position' => position(record)
def state_for(record)
  entry_time = Time.at(record['ts'].seconds)
  entry_position = position(record)

  { 'time' => entry_time, 'position' => entry_position }
end

#stream(limit = nil) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 30

# Stream entries through the superclass implementation, tracking the state of
# the last entry read and saving it when due (skipped in batch mode, where
# the caller triggers saves through batch_done).
#
# @param limit passed unchanged to the superclass stream
# @yield [entry] each entry produced by the superclass stream
# @return whatever the superclass stream returned (used here as an
#   "entries left" flag)
def stream(limit=nil)
  began_at = connection_config['localTime']
  saw_entry = false

  more_pending = super(limit) do |entry|
    yield entry

    saw_entry = true
    @last_read = state_for(entry)
    maybe_save_state unless @batch
  end

  return more_pending if saw_entry || more_pending

  # Idle pass: nothing was yielded and nothing is left in the cursor, so
  # advance the read time to the time captured before streaming began.
  @last_read['time'] = began_at
  if @last_read['position'].nil?
    # No position known yet: prefer the stored one, else the most recent one.
    @last_read['position'] = read_position
    @last_read['position'] ||= most_recent_position(began_at)
  end
  maybe_save_state unless @batch

  more_pending
end

#tail(opts = {}) ⇒ Object



24
25
26
27
28
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 24

# Start tailing, resuming from the stored position unless the caller supplies
# a truthy :from option.
#
# @param opts [Hash] options forwarded to the superclass tail; :from is filled
#   in from read_position when missing or falsy
# @return whatever the superclass tail returns
def tail(opts={})
  # Work on a copy so the caller's Hash is left untouched. Writing :from into
  # it would pin a reused options Hash to the first position read, and would
  # raise on a frozen Hash.
  tail_opts = opts.merge(:from => opts[:from] || read_position)
  log.debug("Persistent tail options: #{tail_opts}")
  super(tail_opts)
end

#write_state(state) ⇒ Object

Persist current state. Implement this!

Parameters:

  • state (Hash)

Options Hash (state):

  • 'position' (BSON::Timestamp, BSON::Binary)
  • 'time' (Time)


103
104
105
# File 'lib/mongoriver/abstract_persistent_tailer.rb', line 103

# Abstract hook for persisting a state; subclasses must override it.
#
# @param state [Hash] the state handed over by save_state, with 'position'
#   and 'time' keys
# @raise [RuntimeError] always, in this base implementation
def write_state(state)
  raise RuntimeError, "write_state unimplemented!"
end