Class: Fluent::HashForwardOutput

Inherits:
ForwardOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_hash_forward.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#hash_key_slice_lindexObject

Returns the value of attribute hash_key_slice_lindex.



28
29
30
# File 'lib/fluent/plugin/out_hash_forward.rb', line 28

def hash_key_slice_lindex
  @hash_key_slice_lindex
end

#hash_key_slice_rindexObject

Returns the value of attribute hash_key_slice_rindex.



29
30
31
# File 'lib/fluent/plugin/out_hash_forward.rb', line 29

def hash_key_slice_rindex
  @hash_key_slice_rindex
end

#regular_nodesObject (readonly)

for test



26
27
28
# File 'lib/fluent/plugin/out_hash_forward.rb', line 26

def regular_nodes
  @regular_nodes
end

#standby_nodesObject (readonly)

Returns the value of attribute standby_nodes.



27
28
29
# File 'lib/fluent/plugin/out_hash_forward.rb', line 27

def standby_nodes
  @standby_nodes
end

Instance Method Details

#configure(conf) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/fluent/plugin/out_hash_forward.rb', line 8

def configure(conf)
  super
  @standby_nodes, @regular_nodes = @nodes.partition {|n| n.standby? }

  if @hash_key_slice
    lindex, rindex = @hash_key_slice.split('..', 2)
    if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/
      raise Fluent::ConfigError, "out_hash_forard: hash_key_slice must be formatted like [num]..[num]"
    else
      @hash_key_slice_lindex = lindex.to_i
      @hash_key_slice_rindex = rindex.to_i
    end
  end

  @cache_nodes = {}
end

#get_index(key, size) ⇒ Object

hashing(key) mod N



76
77
78
# File 'lib/fluent/plugin/out_hash_forward.rb', line 76

def get_index(key, size)
  str_hash(key) % size
end

#nodes(tag) ⇒ Object

Get nodes (a regular_node and a standby_node if available) using hash algorithm



64
65
66
67
68
69
70
71
72
73
# File 'lib/fluent/plugin/out_hash_forward.rb', line 64

def nodes(tag)
  if nodes = @cache_nodes[tag]
    return nodes
  end
  hash_key = @hash_key_slice ? perform_hash_key_slice(tag) : tag
  regular_index = get_index(hash_key, regular_nodes.size)
  standby_index = standby_nodes.size > 0 ? get_index(hash_key, standby_nodes.size) : 0
  nodes = [regular_nodes[regular_index], standby_nodes[standby_index]].compact
  @cache_nodes[tag] = nodes
end

#perform_hash_key_slice(tag) ⇒ Object



86
87
88
89
90
# File 'lib/fluent/plugin/out_hash_forward.rb', line 86

def perform_hash_key_slice(tag)
  tags = tag.split('.')
  sliced = tags[@hash_key_slice_lindex..@hash_key_slice_rindex]
  return sliced.nil? ? "" : sliced.join('.')
end

#rebuild_weight_arrayObject

Override: I don’t use weight



60
61
# File 'lib/fluent/plugin/out_hash_forward.rb', line 60

def rebuild_weight_array
end

#str_hash(key) ⇒ Object

the simplest hashing ever gist.github.com/sonots/7263495



82
83
84
# File 'lib/fluent/plugin/out_hash_forward.rb', line 82

def str_hash(key)
  key.bytes.inject(&:+)
end

#write_objects(tag, chunk) ⇒ Object

Override



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin/out_hash_forward.rb', line 32

def write_objects(tag, chunk)
  return if chunk.empty?

  error = nil

  nodes = nodes(tag)

  # below is just copy from out_forward
  nodes.each do |node|
    if node.available?
      begin
        send_data(node, tag, chunk)
        return
      rescue
        # for load balancing during detecting crashed servers
        error = $!  # use the latest error
      end
    end
  end

  if error
    raise error
  else
    raise "no nodes are available"  # TODO message
  end
end