Class: Mongoriver::Tailer

Inherits:
Object
  • Object
show all
Includes:
Assertions, Logging
Defined in:
lib/mongoriver/tailer.rb

Direct Known Subclasses

AbstractPersistentTailer

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Assertions

#assert

Methods included from Logging

#log

Constructor Details

#initialize(upstreams, type, oplog = "oplog.rs") ⇒ Tailer

Returns a new instance of Tailer.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/mongoriver/tailer.rb', line 10

# Builds a tailer and connects to the upstream right away.
#
# @param upstreams [Array] upstream specs; the connection helpers index into
#   it and check its length
# @param type [Symbol] connection mode understood by #connect_upstream
#   (:replset, :slave, :direct or :existing)
# @param oplog [String] name of the oplog collection to read
def initialize(upstreams, type, oplog = "oplog.rs")
  # Tailing state: no cursor yet, not asked to stop, not streaming.
  @cursor = nil
  @stop = false
  @streaming = false

  @upstreams = upstreams
  @type = type
  @oplog = oplog
  # This number seems high
  @conn_opts = {:op_timeout => 86400}

  connect_upstream

  # Remember which record format the upstream speaks so later calls can
  # branch on it.
  needs_conversion = Mongoriver::Toku.conversion_needed?(@upstream_conn)
  @database_type = needs_conversion ? :toku : :mongo
end

Instance Attribute Details

#database_type ⇒ Object (readonly)

Returns the value of attribute database_type.



8
9
10
# File 'lib/mongoriver/tailer.rb', line 8

# Reader for the database flavor chosen in the constructor, which sets it
# to either :toku or :mongo.
def database_type
  @database_type
end

#oplog ⇒ Object (readonly)

Returns the value of attribute oplog.



7
8
9
# File 'lib/mongoriver/tailer.rb', line 7

# Reader for the oplog collection name given to the constructor
# ("oplog.rs" unless overridden).
def oplog
  @oplog
end

#upstream_conn ⇒ Object (readonly)

Returns the value of attribute upstream_conn.



6
7
8
# File 'lib/mongoriver/tailer.rb', line 6

# Reader for the upstream connection object assigned by #connect_upstream.
def upstream_conn
  @upstream_conn
end

Instance Method Details

#close ⇒ Object



189
190
191
192
193
# File 'lib/mongoriver/tailer.rb', line 189

# Closes the active cursor (if there is one), forgets it, and clears the
# stop flag so the tailer can be used again.
def close
  if @cursor
    @cursor.close
  end

  @cursor = nil
  @stop = false
end

#connect_upstream ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/mongoriver/tailer.rb', line 80

# Establishes @upstream_conn according to @type.
#
# * :replset         - replica-set connection over all upstreams, reading
#                      from secondaries
# * :slave / :direct - single-host connection; :slave refuses to talk to a
#                      primary, and both require the host to be in a replica set
# * :existing        - reuses the single connection object passed in
#
# Raises a RuntimeError for any other type or when a precondition fails.
def connect_upstream
  case @type
  when :replset
    @upstream_conn = Mongo::ReplSetConnection.new(@upstreams, @conn_opts.merge(:read => :secondary))
  when :slave, :direct
    direct_opts = @conn_opts.merge(:slave_ok => true)
    host, port = parse_direct_upstream
    @upstream_conn = Mongo::Connection.new(host, port, direct_opts)

    if @type == :slave && @upstream_conn.primary?
      raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is the primary -- if you're ok with that, check why your wrapper is passing :direct rather than :slave"
    end

    ensure_upstream_replset!
  when :existing
    usable = @upstreams.length == 1 && @upstreams[0].respond_to?(:db)
    raise "Must pass in a single existing Mongo::Connection with :existing" unless usable

    @upstream_conn = @upstreams[0]
  else
    raise "Invalid connection type: #{@type.inspect}"
  end
end

#connection_config ⇒ Object



99
100
101
# File 'lib/mongoriver/tailer.rb', line 99

# Runs the `ismaster` command against the upstream's admin database and
# returns whatever the driver hands back.
def connection_config
  admin_db = @upstream_conn['admin']
  admin_db.command(:ismaster => 1)
end

#ensure_upstream_replset! ⇒ Object



103
104
105
106
107
108
109
# File 'lib/mongoriver/tailer.rb', line 103

# Raises unless the upstream's connection config carries a 'setName' entry.
# Returns nil when the check passes.
def ensure_upstream_replset!
  # Might be a better way to do this, but not seeing one.
  return if connection_config['setName']

  raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is not running as a replica set"
end

#latest_oplog_entry(before_time = nil) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/mongoriver/tailer.rb', line 59

# Fetches the newest oplog record, optionally restricted to records whose
# 'ts' is below an upper bound derived from before_time.
#
# For :mongo the bound is a BSON::Timestamp one second past before_time and
# records are ordered by '$natural'; for :toku the bound is before_time + 1
# and records are ordered by '_id'. Any other database type yields nil
# without querying.
def latest_oplog_entry(before_time=nil)
  flavor = database_type

  selector =
    if before_time
      upper_bound =
        if flavor == :mongo
          BSON::Timestamp.new(before_time.to_i + 1, 0)
        elsif flavor == :toku
          before_time + 1
        end
      { 'ts' => { '$lt' => upper_bound } }
    else
      {}
    end

  sort_field =
    if flavor == :mongo
      '$natural'
    elsif flavor == :toku
      '_id'
    end
  return nil unless sort_field

  # Descending order so the first hit is the newest record.
  oplog_collection.find_one(selector, :sort => [[sort_field, -1]])
end

#most_recent_position(before_time = nil) ⇒ Object

Find the most recent entry in the oplog and return a position for that entry. The position can be passed to the tail function (or run_forever) and the tailer will start tailing after that. If before_time is given, it will return the latest position before (or at) that time.



55
56
57
# File 'lib/mongoriver/tailer.rb', line 55

# Looks up the newest oplog entry (optionally bounded by before_time) and
# converts it to a position via #position.
def most_recent_position(before_time=nil)
  newest_entry = latest_oplog_entry(before_time)
  position(newest_entry)
end

#oplog_collection ⇒ Object



124
125
126
# File 'lib/mongoriver/tailer.rb', line 124

# Returns the collection named by #oplog inside the upstream's 'local'
# database.
def oplog_collection
  local_db = @upstream_conn.db('local')
  local_db.collection(oplog)
end

#parse_direct_upstream ⇒ Object



111
112
113
114
115
# File 'lib/mongoriver/tailer.rb', line 111

# Parses the one and only upstream into the pair returned by
# #parse_host_spec. Raises a RuntimeError unless exactly one upstream was
# supplied.
def parse_direct_upstream
  unless @upstreams.length == 1
    raise "When connecting directly to a mongo instance, must provide a single upstream"
  end

  parse_host_spec(@upstreams[0])
end

#parse_host_spec(host_spec) ⇒ Object



117
118
119
120
121
122
# File 'lib/mongoriver/tailer.rb', line 117

# Splits a "host:port" string into [host, port].
#
# A missing or empty host becomes '127.0.0.1'; a missing or empty port
# becomes 27017. The port is returned as an Integer via String#to_i.
def parse_host_spec(host_spec)
  name, port_text = host_spec.split(':')
  name = '127.0.0.1' if name.to_s.empty?
  port_text = '27017' if port_text.to_s.empty?
  [name, port_text.to_i]
end

#position(record) ⇒ BSON::Timestamp, BSON::Binary

Return a position for a record object

Returns:

  • (BSON::Timestamp)

    if mongo

  • (BSON::Binary)

    if tokumx



29
30
31
32
33
34
35
36
37
# File 'lib/mongoriver/tailer.rb', line 29

# Return a position for a record object.
#
# @return [BSON::Timestamp] the record's 'ts' field if mongo
# @return [BSON::Binary] the record's '_id' field if tokumx
# @return [nil] when record is nil/false or the database type is neither
def position(record)
  return nil unless record

  flavor = database_type
  if flavor == :mongo
    record['ts']
  elsif flavor == :toku
    record['_id']
  end
end

#stop ⇒ Object



185
186
187
# File 'lib/mongoriver/tailer.rb', line 185

# Raises the stop flag. #stream checks this flag in its loop condition and
# leaves the loop once it is set; #close clears it again.
def stop
  @stop = true
end

#stream(limit = nil, &blk) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/mongoriver/tailer.rb', line 163

# Feeds records from the current cursor to the block until the tailer is
# stopped, the cursor reports no next record, or `limit` records have been
# read.
#
# For :mongo each record is passed to the block as-is. For :toku the record
# is first run through Toku.convert and every converted entry is passed to
# the block.
#
# @param limit [Integer, nil] maximum number of cursor records to read in
#   this call; nil means no limit
# @return whatever @cursor.has_next? reports after the loop
def stream(limit=nil, &blk)
  count = 0
  @streaming = true
  begin
    # The limit is checked before has_next? and before the counter is
    # bumped, so exactly `limit` records are delivered (previously the
    # last one was skipped and limit == 1 delivered nothing).
    while !@stop && !(limit && count >= limit) && @cursor.has_next?
      count += 1

      record = @cursor.next

      case database_type
      when :mongo
        blk.call(record)
      when :toku
        converted = Toku.convert(record, @upstream_conn)
        converted.each(&blk)
      end
    end
  ensure
    # Reset even if the block raises, so #tailing does not keep reporting
    # an in-progress stream.
    @streaming = false
  end

  return @cursor.has_next?
end

#tail(opts = {}) ⇒ Object

Start tailing the oplog.

Parameters:

  • (Hash)
  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :from (BSON::Timestamp, BSON::Binary)

    Placeholder indicating where to start the query from. Binary value is used for tokumx. The timestamp is non-inclusive.

  • :filter (Hash)

    Extra filters for the query.

  • :dont_wait (Bool) — default: false. When true, the tailing cursor is opened without the await-data option.


136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/mongoriver/tailer.rb', line 136

# Start tailing the oplog.
#
# Builds the query from opts, opens a find on the oplog collection and
# stores the yielded cursor in @cursor after configuring it as tailable.
#
# @param opts [Hash] a customizable set of options
# @option opts [BSON::Timestamp, BSON::Binary] :from placeholder indicating
#   where to start the query from (only used here for the log line; the
#   query itself comes from build_tail_query)
# @option opts [Hash] :mongo_opts extra options merged over
#   {:timeout => false} and handed to find
# @option opts [Bool] :dont_wait when true, the await-data option is not set
# @raise [RuntimeError] if a cursor is already open
def tail(opts = {})
  raise "Already tailing the oplog!" if @cursor

  selector = build_tail_query(opts)
  find_opts = {:timeout => false}.merge(opts[:mongo_opts] || {})

  oplog_collection.find(selector, find_opts) do |cursor|
    # Collect the cursor flags first, then apply them in order.
    flags = [Mongo::Constants::OP_QUERY_TAILABLE]
    flags << Mongo::Constants::OP_QUERY_OPLOG_REPLAY if selector['ts']
    flags << Mongo::Constants::OP_QUERY_AWAIT_DATA unless opts[:dont_wait]
    flags.each { |flag| cursor.add_option(flag) }

    log.debug("Starting oplog stream from #{opts[:from] || 'start'}")
    @cursor = cursor
  end
end

#tail_from(ts, opts = {}) ⇒ Object

Deprecated: use #tail(:from => ts, …) instead



154
155
156
157
# File 'lib/mongoriver/tailer.rb', line 154

# Deprecated: use #tail(:from => ts, ...) instead.
#
# Starts tailing from the given position by forwarding to #tail with :from
# set to ts. The caller's opts hash is not modified.
#
# @param ts position to start from; forwarded as opts[:from]
# @param opts [Hash] any further options accepted by #tail
def tail_from(ts, opts={})
  # Hash#merge returns a new hash; the result has to be passed on, otherwise
  # :from is silently dropped and tailing starts from the beginning.
  tail(opts.merge(:from => ts))
end

#tailing ⇒ Object



159
160
161
# File 'lib/mongoriver/tailer.rb', line 159

# Reports whether the tailer counts as active: true while the stop flag is
# not set; once it is set, the value of the streaming flag.
def tailing
  return true unless @stop

  @streaming
end

#time_for(record) ⇒ Object

Return a time for a record object

Returns:

  • Time



41
42
43
44
45
46
47
48
49
# File 'lib/mongoriver/tailer.rb', line 41

# Return a time for a record object.
#
# For :mongo the record's 'ts' is turned into a Time from its seconds; for
# :toku the record's 'ts' is returned unchanged. Returns nil when record is
# nil/false or the database type is neither.
#
# @return [Time]
def time_for(record)
  return nil unless record

  readers = {
    :mongo => lambda { Time.at(record['ts'].seconds) },
    :toku  => lambda { record['ts'] }
  }
  reader = readers[database_type]
  reader && reader.call
end