Class: Mongoriver::Tailer

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/mongoriver/tailer.rb

Direct Known Subclasses

AbstractPersistentTailer

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log

Constructor Details

#initialize(upstreams, type, oplog = "oplog.rs") ⇒ Tailer

Returns a new instance of Tailer.



8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/mongoriver/tailer.rb', line 8

def initialize(upstreams, type, oplog = "oplog.rs")
  @upstreams = upstreams
  @type = type
  @oplog = oplog
  # This number seems high
  @conn_opts = {:op_timeout => 86400}

  @cursor = nil
  @stop = false

  connect_upstream
end

Instance Attribute Details

#oplogObject (readonly)

Returns the value of attribute oplog.



6
7
8
# File 'lib/mongoriver/tailer.rb', line 6

def oplog
  @oplog
end

#upstream_connObject (readonly)

Returns the value of attribute upstream_conn.



5
6
7
# File 'lib/mongoriver/tailer.rb', line 5

def upstream_conn
  @upstream_conn
end

Instance Method Details

#closeObject



117
118
119
120
121
# File 'lib/mongoriver/tailer.rb', line 117

def close
  @cursor.close if @cursor
  @cursor = nil
  @stop = false
end

#connect_upstreamObject



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/mongoriver/tailer.rb', line 26

def connect_upstream
  case @type
  when :replset
    opts = @conn_opts.merge(:read => :secondary)
    @upstream_conn = Mongo::ReplSetConnection.new(@upstreams, opts)
  when :slave, :direct
    opts = @conn_opts.merge(:slave_ok => true)
    host, port = parse_direct_upstream
    @upstream_conn = Mongo::Connection.new(host, port, opts)
    raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is the primary -- if you're ok with that, check why your wrapper is passing :direct rather than :slave" if @type == :slave && @upstream_conn.primary?
    ensure_upstream_replset!
  when :existing
    raise "Must pass in a single existing Mongo::Connection with :existing" unless @upstreams.length == 1 && @upstreams[0].respond_to?(:db)
    @upstream_conn = @upstreams[0]
  else
    raise "Invalid connection type: #{@type.inspect}"
  end
end

#connection_configObject



45
46
47
# File 'lib/mongoriver/tailer.rb', line 45

def connection_config
  @upstream_conn['admin'].command(:ismaster => 1)
end

#ensure_upstream_replset!Object



49
50
51
52
53
54
55
# File 'lib/mongoriver/tailer.rb', line 49

def ensure_upstream_replset!
  # Might be a better way to do this, but not seeing one.
  config = connection_config
  unless config['setName']
    raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is not running as a replica set"
  end
end

#most_recent_timestampObject



21
22
23
24
# File 'lib/mongoriver/tailer.rb', line 21

def most_recent_timestamp
  record = oplog_collection.find_one({}, :sort => [['$natural', -1]])
  record['ts']
end

#oplog_collectionObject



70
71
72
# File 'lib/mongoriver/tailer.rb', line 70

def oplog_collection
  @upstream_conn.db('local').collection(oplog)
end

#parse_direct_upstreamObject



57
58
59
60
61
# File 'lib/mongoriver/tailer.rb', line 57

def parse_direct_upstream
  raise "When connecting directly to a mongo instance, must provide a single upstream" unless @upstreams.length == 1
  upstream = @upstreams[0]
  parse_host_spec(upstream)
end

#parse_host_spec(host_spec) ⇒ Object



63
64
65
66
67
68
# File 'lib/mongoriver/tailer.rb', line 63

def parse_host_spec(host_spec)
  host, port = host_spec.split(':')
  host = '127.0.0.1' if host.to_s.length == 0
  port = '27017' if port.to_s.length == 0
  [host, port.to_i]
end

#stopObject



113
114
115
# File 'lib/mongoriver/tailer.rb', line 113

def stop
  @stop = true
end

#stream(limit = nil) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/mongoriver/tailer.rb', line 101

def stream(limit=nil)
  count = 0
  while !@stop && @cursor.has_next?
    count += 1
    break if limit && count >= limit

    yield @cursor.next
  end

  return @cursor.has_next?
end

#tail(opts = {}) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/mongoriver/tailer.rb', line 74

def tail(opts = {})
  raise "Already tailing the oplog!" if @cursor

  query = opts[:filter] || {}
  if ts = opts[:from]
    # Maybe if ts is old enough, just start from the beginning?
    query['ts'] = { '$gte' => ts }
  end

  mongo_opts = {:timeout => false}.merge(opts[:mongo_opts] || {})

  oplog_collection.find(query, mongo_opts) do |oplog|
    oplog.add_option(Mongo::Constants::OP_QUERY_TAILABLE)
    oplog.add_option(Mongo::Constants::OP_QUERY_OPLOG_REPLAY) if query['ts']
    oplog.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA) unless opts[:dont_wait]

    log.info("Starting oplog stream from #{ts || 'start'}")
    @cursor = oplog
  end
end

#tail_from(ts, opts = {}) ⇒ Object

Deprecated: use #tail(:from => ts, …) instead



96
97
98
99
# File 'lib/mongoriver/tailer.rb', line 96

def tail_from(ts, opts={})
  opts.merge(:from => ts)
  tail(opts)
end