Class: Fluent::HashForwardOutput
- Inherits:
- ForwardOutput
- Inheritance hierarchy: Object → ForwardOutput → Fluent::HashForwardOutput
- Defined in:
- lib/fluent/plugin/out_hash_forward.rb
Instance Attribute Summary collapse
-
#hash_key_slice_lindex ⇒ Object
Returns the value of attribute hash_key_slice_lindex.
-
#hash_key_slice_rindex ⇒ Object
Returns the value of attribute hash_key_slice_rindex.
-
#regular_nodes ⇒ Object
readonly
Returns the value of attribute regular_nodes (exposed for tests).
-
#standby_nodes ⇒ Object
readonly
Returns the value of attribute standby_nodes.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#get_index(key, size) ⇒ Object
Returns str_hash(key) modulo size, i.e. an index into a list of size nodes.
-
#nodes(tag) ⇒ Object
Get nodes (a regular_node and a standby_node if available) using hash algorithm.
- #perform_hash_key_slice(tag) ⇒ Object
-
#rebuild_weight_array ⇒ Object
Override: intentionally empty, because this plugin does not use node weights.
-
#str_hash(key) ⇒ Object
The simplest possible string hash: the sum of the key's bytes. See https://gist.github.com/sonots/7263495.
-
#write_objects(tag, chunk) ⇒ Object
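Override: sends the chunk to the first available node selected for the tag, trying the next node if sending fails.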
Instance Attribute Details
#hash_key_slice_lindex ⇒ Object
Returns the value of attribute hash_key_slice_lindex.
28 29 30 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 28 def hash_key_slice_lindex @hash_key_slice_lindex end |
#hash_key_slice_rindex ⇒ Object
Returns the value of attribute hash_key_slice_rindex.
29 30 31 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 29 def hash_key_slice_rindex @hash_key_slice_rindex end |
#regular_nodes ⇒ Object (readonly)
Returns the value of attribute regular_nodes (exposed for tests).
26 27 28 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 26 def regular_nodes @regular_nodes end |
#standby_nodes ⇒ Object (readonly)
Returns the value of attribute standby_nodes.
27 28 29 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 27 def standby_nodes @standby_nodes end |
Instance Method Details
#configure(conf) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 8 def configure(conf) super @standby_nodes, @regular_nodes = @nodes.partition {|n| n.standby? } if @hash_key_slice lindex, rindex = @hash_key_slice.split('..', 2) if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/ raise Fluent::ConfigError, "out_hash_forard: hash_key_slice must be formatted like [num]..[num]" else @hash_key_slice_lindex = lindex.to_i @hash_key_slice_rindex = rindex.to_i end end @cache_nodes = {} end |
#get_index(key, size) ⇒ Object
Returns str_hash(key) modulo size, i.e. an index into a list of size nodes.
76 77 78 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 76 def get_index(key, size) str_hash(key) % size end |
#nodes(tag) ⇒ Object
Get nodes (a regular_node and a standby_node if available) using hash algorithm
64 65 66 67 68 69 70 71 72 73 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 64 def nodes(tag) if nodes = @cache_nodes[tag] return nodes end hash_key = @hash_key_slice ? perform_hash_key_slice(tag) : tag regular_index = get_index(hash_key, regular_nodes.size) standby_index = standby_nodes.size > 0 ? get_index(hash_key, standby_nodes.size) : 0 nodes = [regular_nodes[regular_index], standby_nodes[standby_index]].compact @cache_nodes[tag] = nodes end |
#perform_hash_key_slice(tag) ⇒ Object
86 87 88 89 90 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 86 def perform_hash_key_slice(tag) tags = tag.split('.') sliced = tags[@hash_key_slice_lindex..@hash_key_slice_rindex] return sliced.nil? ? "" : sliced.join('.') end |
#rebuild_weight_array ⇒ Object
Override: intentionally empty, because this plugin does not use node weights.
60 61 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 60 def rebuild_weight_array end |
#str_hash(key) ⇒ Object
The simplest possible string hash: the sum of the key's bytes. See https://gist.github.com/sonots/7263495.
82 83 84 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 82 def str_hash(key) key.bytes.inject(&:+) end |
#write_objects(tag, chunk) ⇒ Object
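Override: returns immediately for an empty chunk; otherwise sends the chunk to the first available node selected for the tag, trying the next node if sending fails. Raises the most recent send error, or "no nodes are available" if no node could be tried.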
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 32 def write_objects(tag, chunk) return if chunk.empty? error = nil nodes = nodes(tag) # below is just copy from out_forward nodes.each do |node| if node.available? begin send_data(node, tag, chunk) return rescue # for load balancing during detecting crashed servers error = $! # use the latest error end end end if error raise error else raise "no nodes are available" # TODO message end end |