Class: Fluent::Plugin::ForwardOutput
- Defined in:
- lib/fluent/plugin/out_forward.rb
Defined Under Namespace
Classes: ACKWaitingSockInfo, ConnectionClosedError, Error, FailureDetector, NoNodesAvailable, Node, NoneHeartbeatNode
Constant Summary collapse
- LISTEN_PORT =
24224
- FORWARD_HEADER =
MessagePack FixArray header byte (0x93), i.e. an array whose length is 3
[0x93].pack('C').freeze
Constants inherited from Output
Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#nodes ⇒ Object
readonly
Returns the value of attribute nodes.
-
#read_interval ⇒ Object
readonly
Returns the value of attribute read_interval.
-
#recover_sample_size ⇒ Object
readonly
Returns the value of attribute recover_sample_size.
Attributes inherited from Output
#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary collapse
- #close ⇒ Object
- #configure(conf) ⇒ Object
- #create_transfer_socket(host, port, hostname, &block) ⇒ Object
- #forward_header ⇒ Object
-
#initialize ⇒ ForwardOutput
constructor
A new instance of ForwardOutput.
- #multi_workers_ready? ⇒ Boolean
- #prefer_delayed_commit ⇒ Object
- #select_a_healthy_node ⇒ Object
- #start ⇒ Object
- #try_write(chunk) ⇒ Object
- #write(chunk) ⇒ Object
Methods inherited from Output
#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #check_slow_flush, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #metadata_for_test, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #process, #retry_state, #rollback_write, #shutdown, #stop, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #update_retry_state, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
Methods included from Fluent::PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir
Methods inherited from Base
#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #plugin_root_dir, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ ForwardOutput
Returns a new instance of ForwardOutput.
133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/fluent/plugin/out_forward.rb', line 133

# Creates the output with an empty node list. All other state touched here
# starts as nil.
#
# @return [ForwardOutput] a new instance of ForwardOutput
def initialize
  super

  @nodes = [] #=> [Node]

  @loop = @thread = nil

  # #start assigns @usock for the :udp heartbeat type, and the two ack
  # fields when @require_ack_response is set.
  @usock = nil
  @sock_ack_waiting = @sock_ack_waiting_mutex = nil
end
Instance Attribute Details
#nodes ⇒ Object (readonly)
Returns the value of attribute nodes.
122 123 124 |
# File 'lib/fluent/plugin/out_forward.rb', line 122

# Read-only accessor for the node list.
#
# @return [Array] the object currently stored in @nodes
def nodes
  @nodes
end
#read_interval ⇒ Object (readonly)
Returns the value of attribute read_interval.
131 132 133 |
# File 'lib/fluent/plugin/out_forward.rb', line 131

# Read-only accessor for @read_interval, which #configure derives as
# @read_interval_msec / 1000.0.
#
# @return [Object] the value currently stored in @read_interval
def read_interval
  @read_interval
end
#recover_sample_size ⇒ Object (readonly)
Returns the value of attribute recover_sample_size.
131 132 133 |
# File 'lib/fluent/plugin/out_forward.rb', line 131

# Read-only accessor for @recover_sample_size, which #configure derives as
# @recover_wait / @heartbeat_interval.
#
# @return [Object] the value currently stored in @recover_sample_size
def recover_sample_size
  @recover_sample_size
end
Instance Method Details
#close ⇒ Object
258 259 260 261 262 263 264 |
# File 'lib/fluent/plugin/out_forward.rb', line 258

# Closes the socket held in @usock, if there is one, then defers to the
# parent implementation.
def close
  if @usock
    begin
      @usock.close
    rescue StandardError
      # Deliberately ignored: this socket will not be used anyway.
      nil
    end
  end

  super
end
#configure(conf) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/fluent/plugin/out_forward.rb', line 145

# Validates the plugin settings and builds the list of destination nodes.
#
# @param conf the configuration object; it is passed through
#   compat_parameters_convert and then handed to `super`
# @raise [Fluent::ConfigError] when 'tag' is not a buffer chunk key, when
#   dns_round_robin is combined with the :udp heartbeat type, when a TLS
#   cert path does not exist or is not readable, when no node ends up
#   configured, or when ack_response_timeout is below 1
def configure(conf)
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')

  super

  unless @chunk_key_tag
    raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output"
  end

  @read_interval = @read_interval_msec / 1000.0
  @recover_sample_size = @recover_wait / @heartbeat_interval

  # :tcp is accepted as a deprecated alias and rewritten to :transport.
  if @heartbeat_type == :tcp
    log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
    @heartbeat_type = :transport
  end

  if @dns_round_robin && @heartbeat_type == :udp
    raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
  end

  if @transport == :tls
    if @tls_cert_path && !@tls_cert_path.empty?
      @tls_cert_path.each do |path|
        raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
        raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
      end
    end

    # Insecure mode overrides both verification-related settings.
    if @tls_insecure_mode
      log.warn "TLS transport is configured in insecure way"
      @tls_verify_hostname = false
      @tls_allow_self_signed_cert = true
    end
  end

  @servers.each do |server|
    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
    name = server.name || "#{server.host}:#{server.port}"

    log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
    if @heartbeat_type == :none
      @nodes << NoneHeartbeatNode.new(self, server, failure: failure)
    else
      node = Node.new(self, server, failure: failure)
      begin
        node.validate_host_resolution!
      rescue => e
        # Resolution failures abort configuration unless the user opted in
        # to ignoring them; in that case the node is kept but disabled.
        raise unless @ignore_network_errors_at_startup
        log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
        node.disable!
      end
      @nodes << node
    end
  end

  unless @as_secondary
    if @compress == :gzip && @buffer.compress == :text
      @buffer.compress = :gzip
    elsif @compress == :text && @buffer.compress == :gzip
      # Kept on a single line so the message is logged as one record.
      log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
    end
  end

  if @nodes.empty?
    raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
  end

  raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
end
#create_transfer_socket(host, port, hostname, &block) ⇒ Object
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 |
# File 'lib/fluent/plugin/out_forward.rb', line 316

# Opens a transfer socket using the helper that matches @transport.
#
# @param host     passed to the socket helper as the destination host
# @param port     passed to the socket helper as the destination port
# @param hostname passed as `fqdn:` on the TLS path only
# @param block    forwarded unchanged to the socket helper
# @return whatever socket_create_tls / socket_create_tcp returns
# @raise [RuntimeError] when @transport is neither :tls nor :tcp
def create_transfer_socket(host, port, hostname, &block)
  # Both transports take the same three timeout options.
  timeouts = {
    linger_timeout: @send_timeout,
    send_timeout: @send_timeout,
    recv_timeout: @ack_response_timeout,
  }

  case @transport
  when :tls
    socket_create_tls(
      host, port,
      version: @tls_version,
      ciphers: @tls_ciphers,
      insecure: @tls_insecure_mode,
      verify_fqdn: @tls_verify_hostname,
      fqdn: hostname,
      allow_self_signed_cert: @tls_allow_self_signed_cert,
      cert_paths: @tls_cert_path,
      **timeouts,
      &block
    )
  when :tcp
    socket_create_tcp(host, port, **timeouts, &block)
  else
    raise "BUG: unknown transport protocol #{@transport}"
  end
end
#forward_header ⇒ Object
348 349 350 |
# File 'lib/fluent/plugin/out_forward.rb', line 348

# Exposes the FORWARD_HEADER constant through an instance method.
#
# @return the FORWARD_HEADER constant
def forward_header
  FORWARD_HEADER
end
#multi_workers_ready? ⇒ Boolean
218 219 220 |
# File 'lib/fluent/plugin/out_forward.rb', line 218

# Always answers true.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
#prefer_delayed_commit ⇒ Object
222 223 224 |
# File 'lib/fluent/plugin/out_forward.rb', line 222

# Reports the value of @require_ack_response.
# NOTE(review): presumably this decides whether #try_write is used instead
# of #write — confirm against the Output base class.
#
# @return [Object] the value currently stored in @require_ack_response
def prefer_delayed_commit
  @require_ack_response
end
#select_a_healthy_node ⇒ Object
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/fluent/plugin/out_forward.rb', line 294

# Walks @weight_array round-robin, starting after the current @rr position,
# and yields each available node until one block call succeeds.
#
# @yield [node] an entry of @weight_array whose #available? is truthy
# @return [Array] the block's result and the node that produced it
# @raise the most recent StandardError raised by the block, if every attempt
#   failed
# @raise [NoNodesAvailable] if no node was available at all
def select_a_healthy_node
  last_error = nil
  slots = @weight_array.length

  slots.times do
    @rr = (@rr + 1) % slots
    candidate = @weight_array[@rr]
    next unless candidate.available?

    begin
      result = yield candidate
      return result, candidate
    rescue => e
      # for load balancing during detecting crashed servers
      last_error = e # use the latest error
    end
  end

  raise last_error if last_error

  raise NoNodesAvailable, "no nodes are available"
end
#start ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/fluent/plugin/out_forward.rb', line 226

# Starts the output: adjusts the delayed-commit timeout, prepares the
# round-robin state, and sets up the heartbeat and ack-reader machinery that
# the configuration asks for.
def start
  super

  # Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
  # But it should be overwritten by ack_response_timeout to rollback chunks after timeout
  if @ack_response_timeout && @delayed_commit_timeout != @ack_response_timeout
    log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
    # minimum ack_reader IO.select interval is 1s
    @delayed_commit_timeout = @ack_response_timeout + 2
  end

  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0

  if @heartbeat_type != :none
    if @heartbeat_type == :udp
      first_node = @nodes.first
      @usock = socket_create_udp(first_node.host, first_node.port, nonblock: true)
      server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length) do |payload, peer|
        peer_addr = Socket.pack_sockaddr_in(peer.remote_port, peer.remote_host)
        on_heartbeat(peer_addr, payload)
      end
    end

    timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_timer))
  end

  if @require_ack_response
    @sock_ack_waiting = []
    @sock_ack_waiting_mutex = Mutex.new
    thread_create(:out_forward_receiving_ack, &method(:ack_reader))
  end
end
#try_write(chunk) ⇒ Object
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/fluent/plugin/out_forward.rb', line 278

# Sends a chunk to an available node and registers the socket in
# @sock_ack_waiting. An empty chunk is committed immediately and nothing is
# sent.
#
# @param chunk the buffer chunk to forward; must respond to #unique_id,
#   #empty? and #metadata
# @raise whatever #select_a_healthy_node raises when no node accepts the data
def try_write(chunk)
  log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)

  if chunk.empty?
    commit_write(chunk.unique_id)
    return
  end

  # The tag lives on the chunk's metadata; `chunk..tag` would build a Range.
  tag = chunk.metadata.tag
  sock, node = select_a_healthy_node { |n| n.send_data(tag, chunk) }

  chunk_id_base64 = Base64.encode64(chunk.unique_id)
  current_time = Fluent::Clock.now
  info = ACKWaitingSockInfo.new(sock, chunk.unique_id, chunk_id_base64, node, current_time, @ack_response_timeout)

  @sock_ack_waiting_mutex.synchronize do
    @sock_ack_waiting << info
  end
end
#write(chunk) ⇒ Object
266 267 268 269 270 |
# File 'lib/fluent/plugin/out_forward.rb', line 266

# Sends a chunk to an available node. Empty chunks are skipped.
#
# @param chunk the buffer chunk to forward; must respond to #empty? and
#   #metadata
# @return [nil] when the chunk is empty
# @return whatever #select_a_healthy_node returns otherwise
# @raise whatever #select_a_healthy_node raises when no node accepts the data
def write(chunk)
  return if chunk.empty?

  # The tag lives on the chunk's metadata; `chunk..tag` would build a Range.
  tag = chunk.metadata.tag
  select_a_healthy_node { |node| node.send_data(tag, chunk) }
end