Class: Mongoriver::Tailer
- Inherits:
-
Object
- Object
- Mongoriver::Tailer
- Includes:
- Assertions, Logging
- Defined in:
- lib/mongoriver/tailer.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#database_type ⇒ Object
readonly
Returns the value of attribute database_type.
-
#oplog ⇒ Object
readonly
Returns the value of attribute oplog.
-
#upstream_conn ⇒ Object
readonly
Returns the value of attribute upstream_conn.
Instance Method Summary collapse
- #close ⇒ Object
- #connect_upstream ⇒ Object
- #connection_config ⇒ Object
- #ensure_upstream_replset! ⇒ Object
-
#initialize(upstreams, type, oplog = "oplog.rs") ⇒ Tailer
constructor
A new instance of Tailer.
- #latest_oplog_entry(before_time = nil) ⇒ Object
-
#most_recent_position(before_time = nil) ⇒ Object
Find the most recent entry in oplog and return a position for that position.
- #oplog_collection ⇒ Object
- #parse_direct_upstream ⇒ Object
- #parse_host_spec(host_spec) ⇒ Object
-
#position(record) ⇒ BSON::Timestamp, BSON::Binary
Return a position for a record object.
- #stop ⇒ Object
- #stream(limit = nil, &blk) ⇒ Object
-
#tail(opts = {}) ⇒ Object
Start tailing the oplog.
-
#tail_from(ts, opts = {}) ⇒ Object
Deprecated: use #tail(:from => ts, …) instead.
- #tailing ⇒ Object
-
#time_for(record) ⇒ Object
Return a time for a record object.
Methods included from Assertions
Methods included from Logging
Constructor Details
#initialize(upstreams, type, oplog = "oplog.rs") ⇒ Tailer
Returns a new instance of Tailer.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/mongoriver/tailer.rb', line 10 def initialize(upstreams, type, oplog = "oplog.rs") @upstreams = upstreams @type = type @oplog = oplog # This number seems high @conn_opts = {:op_timeout => 86400} @cursor = nil @stop = false @streaming = false connect_upstream @database_type = Mongoriver::Toku.conversion_needed?(@upstream_conn) ? :toku : :mongo end |
Instance Attribute Details
#database_type ⇒ Object (readonly)
Returns the value of attribute database_type.
8 9 10 |
# File 'lib/mongoriver/tailer.rb', line 8 def database_type @database_type end |
#oplog ⇒ Object (readonly)
Returns the value of attribute oplog.
7 8 9 |
# File 'lib/mongoriver/tailer.rb', line 7 def oplog @oplog end |
#upstream_conn ⇒ Object (readonly)
Returns the value of attribute upstream_conn.
6 7 8 |
# File 'lib/mongoriver/tailer.rb', line 6 def upstream_conn @upstream_conn end |
Instance Method Details
#close ⇒ Object
189 190 191 192 193 |
# File 'lib/mongoriver/tailer.rb', line 189 def close @cursor.close if @cursor @cursor = nil @stop = false end |
#connect_upstream ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/mongoriver/tailer.rb', line 80 def connect_upstream case @type when :replset opts = @conn_opts.merge(:read => :secondary) @upstream_conn = Mongo::ReplSetConnection.new(@upstreams, opts) when :slave, :direct opts = @conn_opts.merge(:slave_ok => true) host, port = parse_direct_upstream @upstream_conn = Mongo::Connection.new(host, port, opts) raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is the primary -- if you're ok with that, check why your wrapper is passing :direct rather than :slave" if @type == :slave && @upstream_conn.primary? ensure_upstream_replset! when :existing raise "Must pass in a single existing Mongo::Connection with :existing" unless @upstreams.length == 1 && @upstreams[0].respond_to?(:db) @upstream_conn = @upstreams[0] else raise "Invalid connection type: #{@type.inspect}" end end |
#connection_config ⇒ Object
99 100 101 |
# File 'lib/mongoriver/tailer.rb', line 99 def connection_config @upstream_conn['admin'].command(:ismaster => 1) end |
#ensure_upstream_replset! ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/mongoriver/tailer.rb', line 103 def ensure_upstream_replset! # Might be a better way to do this, but not seeing one. config = connection_config unless config['setName'] raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is not running as a replica set" end end |
#latest_oplog_entry(before_time = nil) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/mongoriver/tailer.rb', line 59 def latest_oplog_entry(before_time=nil) query = {} if before_time case database_type when :mongo ts = BSON::Timestamp.new(before_time.to_i + 1, 0) when :toku ts = before_time + 1 end query = { 'ts' => { '$lt' => ts } } end case database_type when :mongo record = oplog_collection.find_one(query, :sort => [['$natural', -1]]) when :toku record = oplog_collection.find_one(query, :sort => [['_id', -1]]) end record end |
#most_recent_position(before_time = nil) ⇒ Object
Find the most recent entry in oplog and return a position for that position. The position can be passed to the tail function (or run_forever) and the tailer will start tailing after that. If before_time is given, it will return the latest position before (or at) time.
55 56 57 |
# File 'lib/mongoriver/tailer.rb', line 55 def most_recent_position(before_time=nil) position(latest_oplog_entry(before_time)) end |
#oplog_collection ⇒ Object
124 125 126 |
# File 'lib/mongoriver/tailer.rb', line 124 def oplog_collection @upstream_conn.db('local').collection(oplog) end |
#parse_direct_upstream ⇒ Object
111 112 113 114 115 |
# File 'lib/mongoriver/tailer.rb', line 111 def parse_direct_upstream raise "When connecting directly to a mongo instance, must provide a single upstream" unless @upstreams.length == 1 upstream = @upstreams[0] parse_host_spec(upstream) end |
#parse_host_spec(host_spec) ⇒ Object
117 118 119 120 121 122 |
# File 'lib/mongoriver/tailer.rb', line 117 def parse_host_spec(host_spec) host, port = host_spec.split(':') host = '127.0.0.1' if host.to_s.length == 0 port = '27017' if port.to_s.length == 0 [host, port.to_i] end |
#position(record) ⇒ BSON::Timestamp, BSON::Binary
Return a position for a record object
29 30 31 32 33 34 35 36 37 |
# File 'lib/mongoriver/tailer.rb', line 29 def position(record) return nil unless record case database_type when :mongo return record['ts'] when :toku return record['_id'] end end |
#stop ⇒ Object
185 186 187 |
# File 'lib/mongoriver/tailer.rb', line 185 def stop @stop = true end |
#stream(limit = nil, &blk) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/mongoriver/tailer.rb', line 163 def stream(limit=nil, &blk) count = 0 @streaming = true while !@stop && @cursor.has_next? count += 1 break if limit && count >= limit record = @cursor.next case database_type when :mongo blk.call(record) when :toku converted = Toku.convert(record, @upstream_conn) converted.each(&blk) end end @streaming = false return @cursor.has_next? end |
#tail(opts = {}) ⇒ Object
Start tailing the oplog.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/mongoriver/tailer.rb', line 136 def tail(opts = {}) raise "Already tailing the oplog!" if @cursor query = build_tail_query(opts) mongo_opts = {:timeout => false}.merge(opts[:mongo_opts] || {}) oplog_collection.find(query, mongo_opts) do |oplog| oplog.add_option(Mongo::Constants::OP_QUERY_TAILABLE) oplog.add_option(Mongo::Constants::OP_QUERY_OPLOG_REPLAY) if query['ts'] oplog.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA) unless opts[:dont_wait] log.debug("Starting oplog stream from #{opts[:from] || 'start'}") @cursor = oplog end end |
#tail_from(ts, opts = {}) ⇒ Object
Deprecated: use #tail(:from => ts, …) instead
154 155 156 157 |
# File 'lib/mongoriver/tailer.rb', line 154 def tail_from(ts, opts={}) opts.merge(:from => ts) tail(opts) end |
#tailing ⇒ Object
159 160 161 |
# File 'lib/mongoriver/tailer.rb', line 159 def tailing !@stop || @streaming end |
#time_for(record) ⇒ Object
Return a time for a record object
41 42 43 44 45 46 47 48 49 |
# File 'lib/mongoriver/tailer.rb', line 41 def time_for(record) return nil unless record case database_type when :mongo return Time.at(record['ts'].seconds) when :toku return record['ts'] end end |