Class: Fluent::HashForwardOutput
- Inherits: ForwardOutput
- Inheritance chain: Object → ForwardOutput → Fluent::HashForwardOutput
- Defined in:
- lib/fluent/plugin/out_hash_forward.rb
Defined Under Namespace
Classes: UndefOpenStruct
Instance Attribute Summary collapse
-
#regular_nodes ⇒ Object
readonly
Returns the value of attribute regular_nodes.
-
#standby_nodes ⇒ Object
readonly
Returns the value of attribute standby_nodes.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#expand_placeholder(str, tag) ⇒ Object
Replace ${tag} and ${tags} placeholders in a string.
-
#get_index(key, size) ⇒ Object
hashing(key) mod N.
-
#nodes(tag) ⇒ Object
Get nodes (a regular_node and a standby_node if available) using hash algorithm.
-
#rebuild_weight_array ⇒ Object
Override: I don’t use weight.
-
#write_objects(tag, chunk) ⇒ Object
Override.
Instance Attribute Details
#regular_nodes ⇒ Object (readonly)
Returns the value of attribute regular_nodes.
13 14 15 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 13 def regular_nodes @regular_nodes end |
#standby_nodes ⇒ Object (readonly)
Returns the value of attribute standby_nodes.
14 15 16 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 14 def standby_nodes @standby_nodes end |
Instance Method Details
#configure(conf) ⇒ Object
8 9 10 11 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 8 def configure(conf) super @standby_nodes, @regular_nodes = @nodes.partition {|n| n.standby? } end |
#expand_placeholder(str, tag) ⇒ Object
Replace ${tag} and ${tags} placeholders in a string
66 67 68 69 70 71 72 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 66 def expand_placeholder(str, tag) struct = UndefOpenStruct.new struct.tag = tag struct.tags = tag.split('.') str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..} eval "\"#{str}\"", struct.instance_eval { binding } end |
#get_index(key, size) ⇒ Object
hashing(key) mod N
57 58 59 60 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 57 def get_index(key, size) require 'murmurhash3' MurmurHash3::V32.str_hash(key) % size end |
#nodes(tag) ⇒ Object
Get nodes (a regular_node and a standby_node if available) using hash algorithm
49 50 51 52 53 54 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 49 def nodes(tag) hash_key = @hash_key ? expand_placeholder(@hash_key, tag) : tag regular_index = get_index(hash_key, regular_nodes.size) standby_index = standby_nodes.size > 0 ? get_index(hash_key, standby_nodes.size) : 0 [regular_nodes[regular_index], standby_nodes[standby_index]].compact end |
#rebuild_weight_array ⇒ Object
Override: I don’t use weight
45 46 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 45 def rebuild_weight_array end |
#write_objects(tag, chunk) ⇒ Object
Override
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 17 def write_objects(tag, chunk) return if chunk.empty? error = nil nodes = nodes(tag) # below is just copy from out_forward nodes.each do |node| if node.available? begin send_data(node, tag, chunk) return rescue # for load balancing during detecting crashed servers error = $! # use the latest error end end end if error raise error else raise "no nodes are available" # TODO message end end |