Class: Mongoriver::Tailer
- Inherits:
-
Object
- Object
- Mongoriver::Tailer
- Includes:
- Logging
- Defined in:
- lib/mongoriver/tailer.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#oplog ⇒ Object
readonly
Returns the value of attribute oplog.
-
#upstream_conn ⇒ Object
readonly
Returns the value of attribute upstream_conn.
Instance Method Summary collapse
- #close ⇒ Object
- #connect_upstream ⇒ Object
- #connection_config ⇒ Object
- #ensure_upstream_replset! ⇒ Object
-
#initialize(upstreams, type, oplog = "oplog.rs") ⇒ Tailer
constructor
A new instance of Tailer.
- #most_recent_timestamp ⇒ Object
- #oplog_collection ⇒ Object
- #parse_direct_upstream ⇒ Object
- #parse_host_spec(host_spec) ⇒ Object
- #stop ⇒ Object
- #stream(limit = nil) ⇒ Object
- #tail(opts = {}) ⇒ Object
-
#tail_from(ts, opts = {}) ⇒ Object
Deprecated: use #tail(:from => ts, …) instead.
Methods included from Logging
Constructor Details
#initialize(upstreams, type, oplog = "oplog.rs") ⇒ Tailer
Returns a new instance of Tailer.
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/mongoriver/tailer.rb', line 8

# Stores the connection parameters, resets the tailing state and then
# connects to the upstream via #connect_upstream.
#
# @param upstreams [Object] upstream specification; how it is interpreted
#   depends on +type+ (see #connect_upstream)
# @param type [Symbol] connection type dispatched on by #connect_upstream
# @param oplog [String] name of the oplog collection
def initialize(upstreams, type, oplog = "oplog.rs")
  @upstreams, @type, @oplog = upstreams, type, oplog

  # Tailing state: no cursor is open and no stop has been requested.
  @cursor = nil
  @stop = false

  # This number seems high
  @conn_opts = {:op_timeout => 86400}

  connect_upstream
end
Instance Attribute Details
#oplog ⇒ Object (readonly)
Returns the value of attribute oplog.
6 7 8 |
# File 'lib/mongoriver/tailer.rb', line 6

# Read-only accessor for the oplog collection name stored in @oplog.
#
# @return [Object] the value of @oplog
def oplog
  @oplog
end
#upstream_conn ⇒ Object (readonly)
Returns the value of attribute upstream_conn.
5 6 7 |
# File 'lib/mongoriver/tailer.rb', line 5

# Read-only accessor for the upstream connection stored in @upstream_conn
# (assigned by #connect_upstream).
#
# @return [Object] the value of @upstream_conn
def upstream_conn
  @upstream_conn
end
Instance Method Details
#close ⇒ Object
117 118 119 120 121 |
# File 'lib/mongoriver/tailer.rb', line 117

# Closes the current cursor (if there is one), forgets it, and clears the
# stop flag so that tailing can be started again.
#
# @return [false] the value assigned to @stop
def close
  if @cursor
    @cursor.close
  end

  @cursor = nil
  @stop = false
end
#connect_upstream ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/mongoriver/tailer.rb', line 26

# Establishes @upstream_conn according to @type:
#
# * :replset         - opens a Mongo::ReplSetConnection to @upstreams with
#                      :read => :secondary.
# * :slave, :direct  - opens a Mongo::Connection (with :slave_ok) to the single
#                      host parsed by #parse_direct_upstream, then verifies the
#                      server is part of a replica set. For :slave the server
#                      must additionally not be the primary.
# * :existing        - reuses the single object in @upstreams, which must
#                      respond to #db.
#
# @raise [RuntimeError] for an unknown type, a primary reached with :slave,
#   or an invalid :existing argument
def connect_upstream
  case @type
  when :replset
    replset_opts = @conn_opts.merge(:read => :secondary)
    @upstream_conn = Mongo::ReplSetConnection.new(@upstreams, replset_opts)
  when :slave, :direct
    direct_opts = @conn_opts.merge(:slave_ok => true)
    host, port = parse_direct_upstream
    @upstream_conn = Mongo::Connection.new(host, port, direct_opts)

    if @type == :slave && @upstream_conn.primary?
      raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is the primary -- if you're ok with that, check why your wrapper is passing :direct rather than :slave"
    end

    ensure_upstream_replset!
  when :existing
    single_upstream = @upstreams.length == 1
    unless single_upstream && @upstreams[0].respond_to?(:db)
      raise "Must pass in a single existing Mongo::Connection with :existing"
    end

    @upstream_conn = @upstreams[0]
  else
    raise "Invalid connection type: #{@type.inspect}"
  end
end
#connection_config ⇒ Object
45 46 47 |
# File 'lib/mongoriver/tailer.rb', line 45

# Runs the +ismaster+ command against the upstream's 'admin' database and
# returns whatever the command returns.
#
# @return [Object] the result of the +ismaster+ command
def connection_config
  admin_db = @upstream_conn['admin']
  admin_db.command(:ismaster => 1)
end
#ensure_upstream_replset! ⇒ Object
49 50 51 52 53 54 55 |
# File 'lib/mongoriver/tailer.rb', line 49

# Verifies that the upstream server reports a replica set name.
#
# @return [nil] when #connection_config contains a truthy 'setName'
# @raise [RuntimeError] when 'setName' is missing from the server's config
def ensure_upstream_replset!
  # Might be a better way to do this, but not seeing one.
  return if connection_config['setName']

  raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is not running as a replica set"
end
#most_recent_timestamp ⇒ Object
21 22 23 24 |
# File 'lib/mongoriver/tailer.rb', line 21

# Looks up the last record of the oplog collection in reverse $natural order
# and returns its 'ts' field.
#
# NOTE(review): presumably find_one returns nil for an empty collection, in
# which case this raises NoMethodError -- confirm against the driver.
#
# @return [Object] the 'ts' value of the most recent oplog record
def most_recent_timestamp
  record = oplog_collection.find_one({}, :sort => [['$natural', -1]])
  record['ts']
end
#oplog_collection ⇒ Object
70 71 72 |
# File 'lib/mongoriver/tailer.rb', line 70

# Returns the collection named by #oplog inside the upstream's 'local'
# database.
#
# @return [Object] whatever the connection's db('local').collection(...) returns
def oplog_collection
  local_db = @upstream_conn.db('local')
  local_db.collection(oplog)
end
#parse_direct_upstream ⇒ Object
57 58 59 60 61 |
# File 'lib/mongoriver/tailer.rb', line 57

# Parses the one and only upstream into a host/port pair using
# #parse_host_spec.
#
# @return [Object] the result of #parse_host_spec for the single upstream
# @raise [RuntimeError] unless exactly one upstream was supplied
def parse_direct_upstream
  unless @upstreams.length == 1
    raise "When connecting directly to a mongo instance, must provide a single upstream"
  end

  parse_host_spec(@upstreams[0])
end
#parse_host_spec(host_spec) ⇒ Object
63 64 65 66 67 68 |
# File 'lib/mongoriver/tailer.rb', line 63

# Splits a "host:port" string into its parts, substituting '127.0.0.1' for a
# missing/empty host and 27017 for a missing/empty port.
#
# @param host_spec [String] specification such as "db1:27018", "db1" or ":27018"
# @return [Array] two-element array of host (String) and port (Integer, via to_i)
def parse_host_spec(host_spec)
  pieces = host_spec.split(':')
  host = pieces[0]
  port = pieces[1]

  host = '127.0.0.1' if host.to_s.empty?
  port = '27017' if port.to_s.empty?

  [host, port.to_i]
end
#stop ⇒ Object
113 114 115 |
# File 'lib/mongoriver/tailer.rb', line 113

# Raises the stop flag. #stream checks this flag before fetching each record,
# and #close clears it again.
#
# @return [true] the value assigned to @stop
def stop
  @stop = true
end
#stream(limit = nil) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/mongoriver/tailer.rb', line 101

# Yields records from the current cursor until the cursor has nothing more to
# give, #stop has been called, or +limit+ records have been yielded.
#
# The limit is checked before a record is fetched, so exactly +limit+ records
# are yielded when that many are available (a limit of 1 yields one record).
#
# @param limit [Integer, nil] maximum number of records to yield; nil means
#   no limit
# @yield [record] each record returned by the cursor's #next
# @return [Object] the cursor's has_next? after the loop ends, i.e. whether
#   more records are still available
def stream(limit = nil)
  count = 0

  while !@stop && @cursor.has_next?
    # Stop before consuming a record once the quota has been reached.
    break if limit && count >= limit

    yield @cursor.next
    count += 1
  end

  @cursor.has_next?
end
#tail(opts = {}) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/mongoriver/tailer.rb', line 74

# Opens a tailable cursor on the oplog collection and stores it in @cursor
# for #stream to consume.
#
# The caller's :filter hash is copied before the 'ts' clause is added, so the
# hash passed in is never modified.
#
# @param opts [Hash] options:
#   :filter     - query hash to restrict the records (default: {})
#   :from       - when given, only records with 'ts' >= this value are matched
#   :mongo_opts - extra options merged over {:timeout => false} and passed to find
#   :dont_wait  - when truthy, the AWAIT_DATA cursor option is not set
# @return [Object] whatever the collection's block-form find returns
# @raise [RuntimeError] if a cursor is already open
def tail(opts = {})
  raise "Already tailing the oplog!" if @cursor

  query = (opts[:filter] || {}).dup
  ts = opts[:from]
  if ts
    # Maybe if ts is old enough, just start from the beginning?
    query['ts'] = { '$gte' => ts }
  end

  mongo_opts = {:timeout => false}.merge(opts[:mongo_opts] || {})

  oplog_collection.find(query, mongo_opts) do |cursor|
    cursor.add_option(Mongo::Constants::OP_QUERY_TAILABLE)
    # OPLOG_REPLAY is only requested when the query constrains 'ts'
    # (either through :from or through the supplied filter).
    cursor.add_option(Mongo::Constants::OP_QUERY_OPLOG_REPLAY) if query['ts']
    cursor.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA) unless opts[:dont_wait]

    log.info("Starting oplog stream from #{ts || 'start'}")
    @cursor = cursor
  end
end
#tail_from(ts, opts = {}) ⇒ Object
Deprecated: use #tail(:from => ts, …) instead
96 97 98 99 |
# File 'lib/mongoriver/tailer.rb', line 96

# Deprecated: use #tail(:from => ts, ...) instead.
#
# Starts tailing from the given timestamp by delegating to #tail with :from
# set to +ts+. The caller's +opts+ hash is not modified.
#
# @param ts [Object] timestamp to start from; passed to #tail as :from
# @param opts [Hash] further options forwarded to #tail
# @return [Object] the result of #tail
def tail_from(ts, opts = {})
  # Hash#merge returns a new hash; its result must be used, otherwise :from
  # is silently dropped.
  tail(opts.merge(:from => ts))
end