Class: Fluent::Plugin::TcpInput

Inherits:
Input show all
Defined in:
lib/fluent/plugin/in_tcp.rb

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Input

#emit_records, #emit_size, #initialize, #metric_callback, #statistics

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #initialize, #terminate

Methods included from Fluent::PluginId

#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #initialize, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Plugin::Input

Instance Method Details

#configure(conf) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/in_tcp.rb', line 60

def configure(conf)
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  unless parser_config
    raise Fluent::ConfigError, "<parse> section is required."
  end
  super
  @_event_loop_blocking_timeout = @blocking_timeout
  @source_hostname_key ||= @source_host_key if @source_host_key

  @nodes = nil
  if @security
    @nodes = []
    @security.clients.each do |client|
      if client.host && client.network
        raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
      end
      if !client.host && !client.network
        raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
      end
      source = nil
      if client.host
        begin
          source = IPSocket.getaddress(client.host)
        rescue SocketError
          raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
        end
      end
      source_addr = begin
                      IPAddr.new(source || client.network)
                    rescue ArgumentError
                      raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
                    end
      @nodes.push(source_addr)
    end
  end

  @parser = parser_create(conf: parser_config)
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/fluent/plugin/in_tcp.rb', line 100

def multi_workers_ready?
  true
end

#startObject



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/fluent/plugin/in_tcp.rb', line 104

def start
  super

  log.info "listening tcp socket", bind: @bind, port: @port
  del_size = @delimiter.length
  discard_till_next_delimiter = false
  if @_extract_enabled && @_extract_tag_key
    server_create(:in_tcp_server_single_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn|
      unless check_client(conn)
        conn.close
        next
      end

      conn.buffer << data
      buf = conn.buffer
      pos = 0
      while i = buf.index(@delimiter, pos)
        msg = buf[pos...i]
        pos = i + del_size

        if discard_till_next_delimiter
          discard_till_next_delimiter = false
          next
        end

        if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
          log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
          next
        end

        @parser.parse(msg) do |time, record|
          unless time && record
            log.warn "pattern not matched", message: msg
            next
          end

          tag = extract_tag_from_record(record)
          tag ||= @tag
          time ||= extract_time_from_record(record) || Fluent::EventTime.now
          record[@source_address_key] = conn.remote_addr if @source_address_key
          record[@source_hostname_key] = conn.remote_host if @source_hostname_key
          router.emit(tag, time, record)
        end
      end
      buf.slice!(0, pos) if pos > 0
      # If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
      # So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
      if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
        log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
        buf.clear
        # We should discard the subsequent data until the next delimiter comes.
        discard_till_next_delimiter = true
        next
      end
    end
  else
    server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn|
      unless check_client(conn)
        conn.close
        next
      end

      conn.buffer << data
      buf = conn.buffer
      pos = 0
      es = Fluent::MultiEventStream.new
      while i = buf.index(@delimiter, pos)
        msg = buf[pos...i]
        pos = i + del_size

        if discard_till_next_delimiter
          discard_till_next_delimiter = false
          next
        end

        if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
          log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
          next
        end

        @parser.parse(msg) do |time, record|
          unless time && record
            log.warn "pattern not matched", message: msg
            next
          end

          time ||= extract_time_from_record(record) || Fluent::EventTime.now
          record[@source_address_key] = conn.remote_addr if @source_address_key
          record[@source_hostname_key] = conn.remote_host if @source_hostname_key
          es.add(time, record)
        end
      end
      router.emit_stream(@tag, es)
      buf.slice!(0, pos) if pos > 0
      # If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
      # So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
      if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
        log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
        buf.clear
        # We should discard the subsequent data until the next delimiter comes.
        discard_till_next_delimiter = true
        next
      end
    end
  end
end