Class: Fluent::Plugin::ForwardOutput::LoadBalancer

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_forward/load_balancer.rb

Instance Method Summary collapse

Constructor Details

#initialize(log) ⇒ LoadBalancer

Returns a new instance of LoadBalancer.



23
24
25
26
27
28
29
# File 'lib/fluent/plugin/out_forward/load_balancer.rb', line 23

# Sets up an empty balancer: no weighted nodes yet and the round-robin
# cursor at the start.
#
# @param log [Object] logger kept in @log for later debug/warn messages
def initialize(log)
  # Guards @weight_array / @rr, which are touched from several methods.
  @mutex = Mutex.new
  @rr = 0
  @weight_array = []
  # Captured once so later shuffles can be seeded with the same value.
  @rand_seed = Random.new.seed
  @log = log
end

Instance Method Details

#rebuild_weight_array(nodes) ⇒ Object Also known as: rebalance



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/fluent/plugin/out_forward/load_balancer.rb', line 58

# Recomputes the weighted, shuffled node list stored in @weight_array.
#
# Nodes whose weight is not positive are ignored. Available standby nodes are
# promoted, in order, until they make up for the total weight of the
# unavailable regular nodes. Each selected node then appears in the array in
# proportion to its weight, and the array is shuffled with a generator seeded
# from @rand_seed.
#
# @param nodes [Array] objects responding to weight, standby?, available?,
#   host and port
# @return [Array] the array now stored in @weight_array (empty when no node
#   qualifies)
def rebuild_weight_array(nodes)
  weighted = nodes.select { |node| node.weight > 0 }
  standbys, actives = weighted.partition(&:standby?)

  # Total weight of the regular nodes that are currently down.
  lost_weight = actives.inject(0) do |total, node|
    node.available? ? total : total + node.weight
  end
  @log.debug("rebuilding weight array", lost_weight: lost_weight)

  if lost_weight > 0
    standbys.each do |standby|
      next unless standby.available?

      actives << standby
      @log.warn "using standby node #{standby.host}:#{standby.port}", weight: standby.weight
      lost_weight -= standby.weight
      # Stop promoting once the missing weight has been covered.
      break if lost_weight <= 0
    end
  end

  if actives.empty?
    @log.warn('No nodes are available')
    @mutex.synchronize { @weight_array = [] }
    return @weight_array
  end

  # Dividing by the common divisor keeps the array as short as possible.
  divisor = actives.map(&:weight).inject(0, :gcd)
  slots = actives.flat_map { |node| Array.new(node.weight / divisor, node) }

  # for load balancing during detecting crashed servers
  multiplier = (actives.size * 6) / slots.size
  slots *= multiplier if multiplier > 1

  # Same seed on every rebuild, so the same input gives the same order.
  rng = Random.new(@rand_seed)
  slots.sort_by! { rng.rand }

  @mutex.synchronize { @weight_array = slots }
end

#select_healthy_node ⇒ Object Also known as: select_service

Raises:

  • (NoNodesAvailable)



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_forward/load_balancer.rb', line 31

# Walks @weight_array in round-robin order and yields the first available
# node to the block. If the block raises, the next node is tried.
#
# @yield [node] the candidate node; a normal return from the block ends the
#   search
# @return [Array] two elements: the block's return value and the node that
#   was used
# @raise [StandardError] the most recent error raised by the block when no
#   attempt succeeded
# @raise [NoNodesAvailable] when no node could be tried at all
def select_healthy_node
  error = nil

  # Don't care about the change of @weight_array's size while looping since
  # it's only used for determining the number of loops and it is not so important.
  attempts = @weight_array.size
  attempts.times do
    node = @mutex.synchronize do
      # Read the size once, under the lock, so the index math and the lookup
      # agree even if the array was just replaced.
      size = @weight_array.size
      unless size.zero?
        index = @rr % size
        @rr = (index + 1) % size
        @weight_array[index]
      end
    end
    # nil here means the array was swapped for an empty one mid-loop; stop
    # and report through the normal error path instead of dividing by zero.
    break unless node
    next unless node.available?

    begin
      ret = yield node
      return ret, node
    rescue => e
      # for load balancing during detecting crashed servers
      error = e # use the latest error
    end
  end

  raise error if error
  raise NoNodesAvailable, "no nodes are available"
end