Class: Mongoriver::Streambed

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/mongoriver/streambed.rb

Defined Under Namespace

Classes: AssertionFailure

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log

Constructor Details

#initialize(upstreams, type) ⇒ Streambed

Returns a new instance of Streambed.



13
14
15
16
17
18
# File 'lib/mongoriver/streambed.rb', line 13

def initialize(upstreams, type)
  @tailer = Mongoriver::Tailer.new(upstreams, type)
  @record_fetch_batch_size = 1024
  @record_sync_batch_size = 256
  @stats = Hash.new(0)
end

Instance Attribute Details

#statsObject (readonly)

Returns the value of attribute stats.



5
6
7
# File 'lib/mongoriver/streambed.rb', line 5

def stats
  @stats
end

Class Method Details

.all_hooksObject



36
37
38
39
40
41
42
43
# File 'lib/mongoriver/streambed.rb', line 36

def self.all_hooks
  hooks = my_hooks
  if superclass <= Streambed
    hooks + superclass.all_hooks
  else
    hooks
  end
end

.hook(name, args = [], opts = {}) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/mongoriver/streambed.rb', line 64

def self.hook(name, args=[], opts={})
  if default = opts[:default]
    target = hook_name(default)
    implementation = Proc.new do |*args, &blk|
      send(target, *args, &blk)
    end
  else
    implementation = Proc.new do
      raise NotImplementedError.new("Override in subclass")
    end
  end

  define_method(hook_name(name), implementation)
  my_hooks << [name, args, opts]
end

.hook_name(name) ⇒ Object



60
61
62
# File 'lib/mongoriver/streambed.rb', line 60

def self.hook_name(name)
  "hook_#{name}"
end

.my_hooksObject



32
33
34
# File 'lib/mongoriver/streambed.rb', line 32

def self.my_hooks
  @hooks ||= []
end

.validate_hooks!Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/mongoriver/streambed.rb', line 45

def self.validate_hooks!
  errors = []
  all_hooks.each do |name, args, opts|
    method = self.instance_method(hook_name(name))
    signature = "#{method.name}(#{args.join(', ')})"
    if method.owner == Streambed && !opts[:default]
      errors << "Must provide implementation of #{signature}"
    end
  end

  raise "You need to fix the following hook errors:

  #{errors.join("\n  ")}" if errors.length > 0
end

Instance Method Details

#assert(condition, msg) ⇒ Object

Raises:



9
10
11
# File 'lib/mongoriver/streambed.rb', line 9

def assert(condition, msg)
  raise AssertionFailure.new(msg) unless condition
end

#runObject



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/mongoriver/streambed.rb', line 20

def run
  self.class.validate_hooks!

  unless ts = starting_optime
    ts = @tailer.most_recent_timestamp
    initial_sync
    hook_update_optime(ts, true)
  end

  tail_from(ts)
end