Class: Mongoriver::Tailer
- Inherits:
-
Object
- Object
- Mongoriver::Tailer
- Includes:
- Logging
- Defined in:
- lib/mongoriver/tailer.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#upstream_conn ⇒ Object
readonly
Returns the value of attribute upstream_conn.
Instance Method Summary collapse
- #connect_upstream ⇒ Object
- #ensure_upstream_replset! ⇒ Object
-
#initialize(upstreams, type) ⇒ Tailer
constructor
A new instance of Tailer.
- #most_recent_timestamp ⇒ Object
- #oplog_collection ⇒ Object
- #parse_direct_upstream ⇒ Object
- #parse_host_spec(host_spec) ⇒ Object
- #stop ⇒ Object
- #stream(limit = nil) ⇒ Object
- #tail_from(ts, opts = {}) ⇒ Object
Methods included from Logging
Constructor Details
#initialize(upstreams, type) ⇒ Tailer
Returns a new instance of Tailer.
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/mongoriver/tailer.rb', line 7
# Builds a tailer and connects to the upstream right away.
#
# @param upstreams [Array] upstream descriptors; what they must contain
#   depends on +type+ (see #connect_upstream)
# @param type [Symbol] connection mode handled by #connect_upstream:
#   :replset, :slave, :direct or :existing
def initialize(upstreams, type)
  # No oplog cursor is open until #tail_from is called.
  @cursor = nil
  @upstreams = upstreams
  @type = type
  # This number seems high
  @conn_opts = { :op_timeout => 86400 }
  connect_upstream
end
Instance Attribute Details
#upstream_conn ⇒ Object (readonly)
Returns the value of attribute upstream_conn.
5 6 7 |
# File 'lib/mongoriver/tailer.rb', line 5
# Read-only accessor for the connection set up by #connect_upstream.
#
# @return [Object] the current value of @upstream_conn
def upstream_conn
  @upstream_conn
end
Instance Method Details
#connect_upstream ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/mongoriver/tailer.rb', line 23
# Establishes @upstream_conn according to @type.
#
# * :replset  -- Mongo::ReplSetConnection built from all upstreams, with
#   :read => :secondary merged into the connection options.
# * :slave / :direct -- Mongo::Connection to the single upstream, with
#   :slave_ok => true merged in. For :slave the server must not be the
#   primary. In both cases the server must report a replica set name.
# * :existing -- adopts the single object in @upstreams, which must
#   respond to #db.
#
# Raises RuntimeError for an unknown type or when a check above fails.
def connect_upstream
  case @type
  when :existing
    unless @upstreams.length == 1 && @upstreams[0].respond_to?(:db)
      raise "Must pass in a single existing Mongo::Connection with :existing"
    end
    @upstream_conn = @upstreams[0]
  when :replset
    replset_opts = @conn_opts.merge(:read => :secondary)
    @upstream_conn = Mongo::ReplSetConnection.new(@upstreams, replset_opts)
  when :slave, :direct
    direct_opts = @conn_opts.merge(:slave_ok => true)
    host, port = parse_direct_upstream
    @upstream_conn = Mongo::Connection.new(host, port, direct_opts)
    # Only :slave insists on a non-primary; :direct accepts whatever it gets.
    if @type == :slave && @upstream_conn.primary?
      raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is the primary -- if you're ok with that, check why your wrapper is passing :direct rather than :slave (HINT: try passing a -a to scripts like optail or mongocp)"
    end
    ensure_upstream_replset!
  else
    raise "Invalid connection type: #{@type.inspect}"
  end
end
#ensure_upstream_replset! ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/mongoriver/tailer.rb', line 42
# Verifies that the upstream server reports a replica set name.
#
# Runs the ismaster command against the 'admin' database and inspects the
# 'setName' field of the reply.
#
# Returns nil when a set name is present; raises RuntimeError otherwise.
def ensure_upstream_replset!
  # Might be a better way to do this, but not seeing one.
  ismaster_reply = @upstream_conn['admin'].command(:ismaster => 1)
  return if ismaster_reply['setName']

  raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is not running as a replica set"
end
#most_recent_timestamp ⇒ Object
18 19 20 21 |
# File 'lib/mongoriver/tailer.rb', line 18
# Looks up the 'ts' value of the newest oplog entry.
#
# Fetches a single document from #oplog_collection sorted by $natural
# descending and returns its 'ts' field.
#
# Raises RuntimeError when the oplog collection yields no document, instead
# of failing with a NoMethodError on nil.
def most_recent_timestamp
  record = oplog_collection.find_one({}, :sort => [['$natural', -1]])
  raise "Cannot determine most recent timestamp: the oplog is empty" unless record

  record['ts']
end
#oplog_collection ⇒ Object
63 64 65 |
# File 'lib/mongoriver/tailer.rb', line 63
# Returns the 'oplog.rs' collection of the 'local' database on the
# upstream connection.
def oplog_collection
  local_db = @upstream_conn.db('local')
  local_db.collection('oplog.rs')
end
#parse_direct_upstream ⇒ Object
50 51 52 53 54 |
# File 'lib/mongoriver/tailer.rb', line 50
# Parses the one upstream used for a direct connection.
#
# Requires @upstreams to hold exactly one entry and hands that entry to
# #parse_host_spec, returning whatever it returns.
#
# Raises RuntimeError when the number of upstreams is not one.
def parse_direct_upstream
  if @upstreams.length != 1
    raise "When connecting directly to a mongo instance, must provide a single upstream"
  end

  parse_host_spec(@upstreams[0])
end
#parse_host_spec(host_spec) ⇒ Object
56 57 58 59 60 61 |
# File 'lib/mongoriver/tailer.rb', line 56
# Splits a "host:port" string into a host and an integer port.
#
# A missing or empty host defaults to '127.0.0.1'; a missing or empty port
# defaults to 27017. A nil or blank spec therefore yields both defaults.
# Whitespace around either part is ignored.
#
# @param host_spec [String, nil] e.g. "db1:27018", "db1", ":27018" or ""
# @return [Array(String, Integer)] the host and port
# @raise [ArgumentError] when a port is given but is not all digits, rather
#   than silently turning it into 0 (or a truncated number) via to_i
def parse_host_spec(host_spec)
  host, port = host_spec.to_s.split(':').map(&:strip)
  host = '127.0.0.1' if host.to_s.empty?
  return [host, 27017] if port.to_s.empty?

  unless port =~ /\A\d+\z/
    raise ArgumentError, "Invalid port #{port.inspect} in host spec #{host_spec.inspect}"
  end
  [host, port.to_i]
end
#stop ⇒ Object
84 85 86 87 |
# File 'lib/mongoriver/tailer.rb', line 84
# Closes the current oplog cursor, if any, and forgets it.
#
# The cursor reference is cleared even when closing raises, so a failed
# close does not leave the tailer stuck reporting "Already tailing the
# oplog!" on the next #tail_from. Any error from close still propagates.
#
# Safe to call when no cursor is open. Returns nil.
def stop
  begin
    @cursor.close if @cursor
  ensure
    @cursor = nil
  end
  nil
end
#stream(limit = nil) ⇒ Object
89 90 91 92 93 94 95 96 97 98 |
# File 'lib/mongoriver/tailer.rb', line 89
# Yields records from the open oplog cursor to the given block.
#
# Stops when the cursor reports no further record or once +limit+ records
# have been yielded. The limit is checked before asking the cursor for more,
# and the count only advances after a record has been yielded, so
# stream(n) yields up to n records (previously it stopped one short and
# stream(1) yielded nothing).
#
# @param limit [Integer, nil] maximum number of records to yield; nil means
#   no limit
# @yield [record] each record returned by the cursor
# @return [Boolean] whatever the cursor's has_next? reports afterwards
# @raise [RuntimeError] when no cursor is open (call #tail_from first)
def stream(limit=nil)
  raise "Not tailing the oplog; call tail_from first" unless @cursor

  count = 0
  while (!limit || count < limit) && @cursor.has_next?
    yield @cursor.next
    count += 1
  end
  @cursor.has_next?
end
#tail_from(ts, opts = {}) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/mongoriver/tailer.rb', line 67
# Opens an oplog cursor for entries whose 'ts' is >= +ts+ and keeps it in
# @cursor for later use.
#
# @param ts [Object] lower bound used in the 'ts' => { '$gte' => ts } clause
# @param opts [Hash] recognised keys:
#   :filter    -- extra query conditions; the 'ts' clause is merged on top
#   :dont_wait -- when truthy, OP_QUERY_AWAIT_DATA is not added to the cursor
# @return whatever the collection's block-form find returns
# @raise [RuntimeError] when a cursor is already open
def tail_from(ts, opts = {})
  raise "Already tailing the oplog!" if @cursor

  # Maybe if ts is old enough, just start from the beginning?
  base_filter = opts[:filter] || {}
  query = base_filter.merge('ts' => { '$gte' => ts })

  oplog_collection.find(query, :timeout => false) do |cursor|
    cursor.add_option(Mongo::Constants::OP_QUERY_TAILABLE)
    cursor.add_option(Mongo::Constants::OP_QUERY_OPLOG_REPLAY)
    if !opts[:dont_wait]
      cursor.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA)
    end

    log.info("Starting oplog stream from #{ts}")
    @cursor = cursor
  end
end