Class: Mongoriver::Tailer

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/mongoriver/tailer.rb

Direct Known Subclasses

AbstractPersistentTailer

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log

Constructor Details

#initialize(upstreams, type) ⇒ Tailer

Returns a new instance of Tailer.



7
8
9
10
11
12
13
14
15
16
# File 'lib/mongoriver/tailer.rb', line 7

def initialize(upstreams, type)
  @upstreams = upstreams
  @type = type
  # This number seems high
  @conn_opts = {:op_timeout => 86400}

  @cursor = nil

  connect_upstream
end

Instance Attribute Details

#upstream_connObject (readonly)

Returns the value of attribute upstream_conn.



5
6
7
# File 'lib/mongoriver/tailer.rb', line 5

def upstream_conn
  @upstream_conn
end

Instance Method Details

#connect_upstreamObject



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/mongoriver/tailer.rb', line 23

def connect_upstream
  case @type
  when :replset
    opts = @conn_opts.merge(:read => :secondary)
    @upstream_conn = Mongo::ReplSetConnection.new(@upstreams, opts)
  when :slave, :direct
    opts = @conn_opts.merge(:slave_ok => true)
    host, port = parse_direct_upstream
    @upstream_conn = Mongo::Connection.new(host, port, opts)
    raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is the primary -- if you're ok with that, check why your wrapper is passing :direct rather than :slave (HINT: try passing a -a to scripts like optail or mongocp)" if @type == :slave && @upstream_conn.primary?
    ensure_upstream_replset!
  when :existing
    raise "Must pass in a single existing Mongo::Connection with :existing" unless @upstreams.length == 1 && @upstreams[0].respond_to?(:db)
    @upstream_conn = @upstreams[0]
  else
    raise "Invalid connection type: #{@type.inspect}"
  end
end

#ensure_upstream_replset!Object



42
43
44
45
46
47
48
# File 'lib/mongoriver/tailer.rb', line 42

def ensure_upstream_replset!
  # Might be a better way to do this, but not seeing one.
  config = @upstream_conn['admin'].command(:ismaster => 1)
  unless config['setName']
    raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is not running as a replica set"
  end
end

#most_recent_timestampObject



18
19
20
21
# File 'lib/mongoriver/tailer.rb', line 18

def most_recent_timestamp
  record = oplog_collection.find_one({}, :sort => [['$natural', -1]])
  record['ts']
end

#oplog_collectionObject



63
64
65
# File 'lib/mongoriver/tailer.rb', line 63

def oplog_collection
  @upstream_conn.db('local').collection('oplog.rs')
end

#parse_direct_upstreamObject



50
51
52
53
54
# File 'lib/mongoriver/tailer.rb', line 50

def parse_direct_upstream
  raise "When connecting directly to a mongo instance, must provide a single upstream" unless @upstreams.length == 1
  upstream = @upstreams[0]
  parse_host_spec(upstream)
end

#parse_host_spec(host_spec) ⇒ Object



56
57
58
59
60
61
# File 'lib/mongoriver/tailer.rb', line 56

def parse_host_spec(host_spec)
  host, port = host_spec.split(':')
  host = '127.0.0.1' if host.to_s.length == 0
  port = '27017' if port.to_s.length == 0
  [host, port.to_i]
end

#stopObject



84
85
86
87
# File 'lib/mongoriver/tailer.rb', line 84

def stop
  @cursor.close if @cursor
  @cursor = nil
end

#stream(limit = nil) ⇒ Object



89
90
91
92
93
94
95
96
97
98
# File 'lib/mongoriver/tailer.rb', line 89

def stream(limit=nil)
  count = 0
  while @cursor.has_next?
    count += 1
    break if limit && count >= limit
    yield @cursor.next
  end

  return @cursor.has_next?
end

#tail_from(ts, opts = {}) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/mongoriver/tailer.rb', line 67

def tail_from(ts, opts = {})
  raise "Already tailing the oplog!" if @cursor

  # Maybe if ts is old enough, just start from the beginning?
  query = (opts[:filter] || {}).merge({ 'ts' => { '$gte' => ts } })

  oplog_collection.find(query, :timeout => false) do |oplog|
    oplog.add_option(Mongo::Constants::OP_QUERY_TAILABLE)
    oplog.add_option(Mongo::Constants::OP_QUERY_OPLOG_REPLAY)

    oplog.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA) unless opts[:dont_wait]

    log.info("Starting oplog stream from #{ts}")
    @cursor = oplog
  end
end