Class: Fluent::HashForwardOutput

Inherits:
ForwardOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_hash_forward.rb

Defined Under Namespace

Classes: UndefOpenStruct

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#regular_nodesObject (readonly)

Returns the value of attribute regular_nodes.



13
14
15
# File 'lib/fluent/plugin/out_hash_forward.rb', line 13

def regular_nodes
  @regular_nodes
end

#standby_nodesObject (readonly)

Returns the value of attribute standby_nodes.



14
15
16
# File 'lib/fluent/plugin/out_hash_forward.rb', line 14

def standby_nodes
  @standby_nodes
end

Instance Method Details

#configure(conf) ⇒ Object



8
9
10
11
# File 'lib/fluent/plugin/out_hash_forward.rb', line 8

def configure(conf)
  super
  @standby_nodes, @regular_nodes = @nodes.partition {|n| n.standby? }
end

#expand_placeholder(str, tag) ⇒ Object

Replace $tag and $tags placeholders in a string

Parameters:

  • str (String)

    the string to be expanded

  • tag (String)

    tag of a message



66
67
68
69
70
71
72
# File 'lib/fluent/plugin/out_hash_forward.rb', line 66

def expand_placeholder(str, tag)
  struct = UndefOpenStruct.new
  struct.tag  = tag
  struct.tags = tag.split('.')
  str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..}
  eval "\"#{str}\"", struct.instance_eval { binding }
end

#get_index(key, size) ⇒ Object

hashing(key) mod N



57
58
59
60
# File 'lib/fluent/plugin/out_hash_forward.rb', line 57

def get_index(key, size)
  require 'murmurhash3'
  MurmurHash3::V32.str_hash(key) % size
end

#nodes(tag) ⇒ Object

Get nodes (a regular_node and a standby_node if available) using hash algorithm



49
50
51
52
53
54
# File 'lib/fluent/plugin/out_hash_forward.rb', line 49

def nodes(tag)
  hash_key = @hash_key ? expand_placeholder(@hash_key, tag) : tag
  regular_index = get_index(hash_key, regular_nodes.size)
  standby_index = standby_nodes.size > 0 ? get_index(hash_key, standby_nodes.size) : 0
  [regular_nodes[regular_index], standby_nodes[standby_index]].compact
end

#rebuild_weight_arrayObject

Override: I don’t use weight



45
46
# File 'lib/fluent/plugin/out_hash_forward.rb', line 45

def rebuild_weight_array
end

#write_objects(tag, chunk) ⇒ Object

Override



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/out_hash_forward.rb', line 17

def write_objects(tag, chunk)
  return if chunk.empty?

  error = nil

  nodes = nodes(tag)

  # below is just copy from out_forward
  nodes.each do |node|
    if node.available?
      begin
        send_data(node, tag, chunk)
        return
      rescue
        # for load balancing during detecting crashed servers
        error = $!  # use the latest error
      end
    end
  end

  if error
    raise error
  else
    raise "no nodes are available"  # TODO message
  end
end