Class: Mongoriver::Stream

Inherits:
Object
  • Object
show all
Includes:
Assertions, Logging
Defined in:
lib/mongoriver/stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Assertions

#assert

Methods included from Logging

#log

Constructor Details

#initialize(tailer, outlet) ⇒ Stream

Returns a new instance of Stream.



8
9
10
11
12
13
14
15
16
17
18
# File 'lib/mongoriver/stream.rb', line 8

def initialize(tailer, outlet)
  assert(tailer.is_a?(Tailer),
         "tailer must be a subclass/instance of Tailer")
  assert(outlet.is_a?(AbstractOutlet),
         "outlet must be a subclass (or instance) of AbstractOutlet")

  @tailer = tailer
  @outlet = outlet
  @stop = false
  @stats = {}
end

Instance Attribute Details

#outletObject

Returns the value of attribute outlet.



6
7
8
# File 'lib/mongoriver/stream.rb', line 6

def outlet
  @outlet
end

#tailerObject

Returns the value of attribute tailer.



6
7
8
# File 'lib/mongoriver/stream.rb', line 6

def tailer
  @tailer
end

Instance Method Details

#run_forever(position = nil) ⇒ Object

Parameters:

  • position (BSON::Timestamp, BSON::Binary, Time) (defaults to: nil)

    position to start following the oplog from. @see Tailer#most_recent_position



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/mongoriver/stream.rb', line 26

def run_forever(position=nil)
  if position.is_a?(Time)
    position = @tailer.most_recent_position(position)
  end
  log.debug("Start position: #{position.inspect}")
  @tailer.tail(:from => position)

  until @stop
    @tailer.stream do |op|
      handle_op(op)
    end
  end
end

#statsObject



20
21
22
# File 'lib/mongoriver/stream.rb', line 20

def stats
  @stats
end

#stopObject



40
41
42
43
# File 'lib/mongoriver/stream.rb', line 40

def stop
  @stop = true
  @tailer.stop
end