Class: Fluent::Plugin::ProtobufHttpInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_protobuf_http.rb

Overview

Implementation of HTTP input plugin for Protobuf

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ProtobufHttpInput

Returns a new instance of ProtobufHttpInput.



53
54
55
56
57
58
59
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 53

# Creates the plugin instance with empty bookkeeping containers.
def initialize
  super

  # @protos:          list of *.proto files
  # @compiled_protos: list of compiled protos, i.e. *_pb.rb files
  @protos, @compiled_protos = [], []

  # Lookup Hash: { msgtype => msgclass }
  @msgclass_lookup = {}
end

Instance Method Details

#compile_protos ⇒ Object

Raises:

  • (Fluent::ConfigError)


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 61

# Compiles every *.proto file found in @proto_dir with `protoc` and appends the
# paths of the generated *_pb.rb files to @compiled_protos.
#
# @raise [Fluent::ConfigError] if the directory does not exist, holds no
#   *.proto file, `protoc` fails or cannot be run, or an expected compiled
#   file is missing afterwards
def compile_protos
  log.debug("Checking proto_dir [#{@proto_dir}]...")

  path = File.expand_path(@proto_dir)
  raise Fluent::ConfigError, "proto_dir does not exist! [#{path}]" unless Dir.exist?(path)

  @protos = Dir["#{path}/*.proto"]
  raise Fluent::ConfigError, "Empty proto_dir! [#{path}]" unless @protos.any?

  log.info("Compiling .proto files [#{@protos.length}]...")

  # Argument-vector form: no shell is involved, so a proto_dir containing
  # spaces or shell metacharacters reaches protoc verbatim. `system` returns
  # false on a non-zero exit status and nil when protoc cannot be executed;
  # both are treated as a compile failure.
  compiled = system('protoc', "--ruby_out=#{path}", "--proto_path=#{path}", *@protos)
  raise Fluent::ConfigError, 'Could not compile! See error(s) above.' unless compiled

  log.info("Compiled successfully:\n- #{@protos.join("\n- ")}")

  # Appended one at a time so that files already verified stay recorded even
  # if a later one turns out to be missing.
  @protos.each do |proto|
    @compiled_protos.push(get_compiled_proto(proto))
  end

  log.info("Compiled .proto files:\n- #{@compiled_protos.join("\n- ")}")
end

#deserialize_msg(msgtype, serialized_msg) ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 273

# Decodes a serialized payload using the message class registered for +msgtype+.
#
# @param msgtype [String] key into @msgclass_lookup
# @param serialized_msg [String] payload encoded according to @in_mode
# @return [Object, nil] the decoded message; nil when decoding raises or when
#   @in_mode is neither :binary nor :json (every nil case is logged)
def deserialize_msg(msgtype, serialized_msg)
  msgclass = @msgclass_lookup[msgtype]
  log.debug("Deserializing {#{@in_mode}} message of type [#{msgclass}]...")
  begin
    case @in_mode
    when :binary
      msgclass.decode(serialized_msg)
    when :json
      msgclass.decode_json(serialized_msg)
    else
      # Without this branch the `case` evaluates to nil with no trace in the
      # log, which callers cannot tell apart from a decode failure.
      log.error("Unsupported in_mode! [#{@in_mode}]")
      nil
    end
  rescue Google::Protobuf::ParseError => e
    log.error("Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes] #{e}")
    nil
  rescue StandardError => e
    # Also reached when no class is registered for msgtype (msgclass is nil).
    log.error("Deserialization failed! Error: #{e}")
    nil
  end
end

#get_compiled_proto(proto) ⇒ Object

Raises:

  • (Fluent::ConfigError)


84
85
86
87
88
89
90
91
92
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 84

# Maps a .proto path to the path of its compiled Ruby counterpart
# (same location, '.proto' replaced by '_pb.rb').
#
# @param proto [String] path of a .proto file
# @return [String] path of the matching *_pb.rb file
# @raise [Fluent::ConfigError] when that file does not exist
def get_compiled_proto(proto)
  candidate = "#{proto.chomp('.proto')}_pb.rb"
  return candidate if File.file?(candidate)

  raise Fluent::ConfigError, "Compiled proto not found! [#{candidate}]"
end

#get_msg_class(msg_type) ⇒ Object

Raises:

  • (Fluent::ConfigError)


135
136
137
138
139
140
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 135

# Resolves a fully-qualified message type name to its generated message class
# through the protobuf generated descriptor pool.
#
# @param msg_type [String] message type name to look up
# @return [Class] the `msgclass` of the descriptor found
# @raise [Fluent::ConfigError] when the pool has no descriptor for the name
def get_msg_class(msg_type)
  msg = Google::Protobuf::DescriptorPool.generated_pool.lookup(msg_type)
  # The message previously ended with a stray, unbalanced single quote.
  raise Fluent::ConfigError, "Message type ['#{msg_type}'] not registered!" if msg.nil?

  msg.msgclass
end

#get_msg_types(compiled_proto) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 115

# Scans a compiled *_pb.rb file for message-class registrations and returns
# the message type names found.
#
# A line counts when, after stripping whitespace, it contains the generated
# pool lookup call and ends with '.msgclass'; the name is the first
# double-quoted string on that line.
#
# @param compiled_proto [String] path of the compiled proto file
# @return [Array<String>] frozen message type names, in file order
def get_msg_types(compiled_proto)
  log.debug("Extracting message types [#{compiled_proto}]...")

  lookup_call = '::Google::Protobuf::DescriptorPool.generated_pool.lookup'
  found = File.foreach(compiled_proto).map do |raw|
    stmt = raw.strip
    next unless stmt.include?(lookup_call) && stmt.end_with?('.msgclass')

    stmt[/"([^"]*)"/, 1].freeze
  end.compact

  if found.empty?
    log.warn("No message types found! [#{compiled_proto}]")
  else
    log.info("Total [#{found.size}] message types in [#{compiled_proto}]:\n- #{found.join("\n- ")}")
  end

  found
end

#get_query_params(query_string) ⇒ Object



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 257

# Pulls the 'msgtype' and 'batch' parameters out of a request query string.
# A warning is logged for each of the two that is absent.
#
# @param query_string [String, nil] raw query string of the request
# @return [Array, nil] `[msgtype, batch]` (either may be nil), or nil when
#   there is no query string at all
def get_query_params(query_string)
  if query_string.nil?
    log.warn("Empty query string! 'msgtype' is required!")
    return nil
  end

  params = WEBrick::HTTPUtils.parse_query(query_string)

  %w[msgtype batch].map do |key|
    value = params[key]
    log.warn("'#{key}' not found in 'query_string' [#{query_string}]") if value.nil?
    value
  end
end

#populate_msgclass_lookup ⇒ Object

Raises:

  • (Fluent::ConfigError)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 94

# Fills @msgclass_lookup ({ msgtype => msgclass }) from the compiled protos.
#
# For each file in @compiled_protos the message type names are extracted
# first; files that yield none are skipped without being loaded. The rest are
# `require`d and each name is resolved to its message class.
#
# @raise [Fluent::ConfigError] when a compiled file cannot be loaded, a type
#   cannot be resolved, or no message type was registered at all
def populate_msgclass_lookup
  @compiled_protos.each do |pb_file|
    names = get_msg_types(pb_file)
    next if names.none?

    begin
      require pb_file
    rescue LoadError => e
      raise Fluent::ConfigError, "Possible 'import' issue! Use a single self-contianed .proto file! #{e}"
    end

    names.each { |name| @msgclass_lookup[name] = get_msg_class(name) }
  end

  if @msgclass_lookup.empty?
    raise Fluent::ConfigError, "No message types found! Check proto_dir [#{@proto_dir}]!"
  end

  registered = @msgclass_lookup.keys
  log.info("Registered messages [#{registered.length}]:\n- #{registered.join("\n- ")}")
end

#serialize_msg(msgtype, deserialized_msg) ⇒ Object



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 292

# Encodes a message object using the message class registered for +msgtype+.
#
# @param msgtype [String] key into @msgclass_lookup
# @param deserialized_msg [Object] message instance to encode
# @return [String, nil] the encoded message in @out_mode format; nil when
#   encoding raises or when @out_mode is neither :binary nor :json (every nil
#   case is logged)
def serialize_msg(msgtype, deserialized_msg)
  msgclass = @msgclass_lookup[msgtype]
  log.debug("Serializing [#{@in_mode} > #{@out_mode}]...")
  begin
    case @out_mode
    when :binary
      msgclass.encode(deserialized_msg)
    when :json
      msgclass.encode_json(deserialized_msg)
    else
      # Without this branch the `case` evaluates to nil with no trace in the
      # log, which callers cannot tell apart from an encode failure.
      log.error("Unsupported out_mode! [#{@out_mode}]")
      nil
    end
  rescue StandardError => e
    # Also reached when no class is registered for msgtype (msgclass is nil).
    log.error("Serialization failed! [msgtype: #{msgtype}, msg: #{deserialized_msg}] Error: #{e}")
    nil
  end
end

#shutdown ⇒ Object



308
309
310
311
312
313
314
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 308

# Removes the compiled *_pb.rb files recorded in @compiled_protos, then runs
# the parent class's shutdown.
#
# Deletion is best-effort: a file that is already gone or cannot be removed
# is logged and skipped, so the remaining files are still cleaned up and
# `super` is always reached.
def shutdown
  @compiled_protos.each do |compiled_proto|
    begin
      File.delete(compiled_proto)
    rescue SystemCallError => e
      log.warn("Could not delete compiled proto! [#{compiled_proto}] Error: #{e}")
    end
  end

  super
end

#start ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 142

# Compiles the protos, builds the message-class lookup and starts the
# HTTP(S) server with a single POST route at "/<tag>".
#
# Per request the handler validates the Content-Type header and the 'msgtype'
# query parameter, deserializes the body, and then either
# * emits one record { 'message' => <re-serialized message> } (default), or
# * when the query has batch=true, emits one record per element of the
#   message's `batch` field, serialized with the type named by its `type`
#   field.
# Rejected requests get a JSON body { 'status' => <reason> } with status 400.
# If re-serializing a single message fails, nothing is emitted and the
# response is a 500 with the same JSON shape.
def start
  super

  compile_protos
  populate_msgclass_lookup

  # TLS check
  proto = :tcp
  tls_opts = nil
  if @transport_config && @transport_config.protocol == :tls
    proto = :tls
    tls_opts = @transport_config.to_h
  end

  log.info("Starting protobuf #{proto == :tcp ? 'HTTP' : 'HTTPS'} server [#{@bind}:#{@port}]...")
  log.debug("TLS configuration:\n#{tls_opts}") if tls_opts

  # Logs a rejection and builds the JSON error response for it. A fresh
  # header hash is created per call so responses never share mutable state.
  reject = lambda do |peeraddr, status, code = 400|
    log.warn("[X] Message rejected! [#{peeraddr}] #{status}")
    [code, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json]
  end

  http_server_create_http_server(:protobuf_server, addr: @bind, port: @port, logger: log, proto: proto, tls_opts: tls_opts) do |server|
    server.post("/#{tag}") do |req|
      peeraddr = "#{req.peeraddr[2]}:#{req.peeraddr[1]}" # ip:port
      serialized_msg = req.body

      log.info("[R] {#{@in_mode}} [#{peeraddr}, size: #{serialized_msg.length} bytes]")
      log.debug("Dumping serialized message [#{serialized_msg.length} bytes]:\n#{serialized_msg}")

      # A request without a Content-Type header must be rejected below, not
      # blow up on nil[0].
      content_type_hdr = req.header['content-type']
      content_type = content_type_hdr && content_type_hdr[0]

      unless valid_content_type?(content_type)
        next reject.call(peeraddr, "Invalid 'Content-Type' header! [#{content_type}]")
      end

      log.debug("[>] Content-Type: #{content_type}")

      msgtype, batch = get_query_params(req.query_string)
      unless @msgclass_lookup.key?(msgtype)
        next reject.call(peeraddr, "Invalid 'msgtype' in 'query_string'! [#{msgtype}]")
      end

      log.debug("[>] Query parameters: [msgtype: #{msgtype}, batch: #{batch}]")

      deserialized_msg = deserialize_msg(msgtype, serialized_msg)

      if deserialized_msg.nil?
        next reject.call(peeraddr, "Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]")
      end

      is_batch = !batch.nil? && batch == 'true'
      log.debug("[>] Message validated! [msgtype: #{msgtype}, is_batch: #{is_batch}]")

      # Log single message

      unless is_batch
        log.info("[S] {#{@in_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]")

        time = Fluent::Engine.now
        event_msg = serialize_msg(msgtype, deserialized_msg)

        # serialize_msg returns nil on failure; do not emit a nil record and
        # do not call #length on nil below.
        if event_msg.nil?
          next reject.call(peeraddr, "Serialization failed! [msgtype: #{msgtype}]", 500)
        end

        record = { 'message' => event_msg }
        router.emit(@tag, time, record)

        log.info("[S] {#{@out_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{event_msg.length} bytes]")
        next [200, { 'Content-Type' => 'text/plain' }, nil]
      end

      # Log batch messages

      log.info("[B] {#{@in_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]")

      if deserialized_msg.type.nil? || deserialized_msg.batch.nil? || deserialized_msg.batch.empty?
        next reject.call(peeraddr, "Invalid 'batch' message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]")
      end

      batch_type = deserialized_msg.type
      batch_msgs = deserialized_msg.batch
      batch_size = batch_msgs.length

      log.info("[B] Emitting message stream/batch [batch_size: #{batch_size} messages]...")

      stream = MultiEventStream.new
      batch_msgs.each do |batch_msg|
        time = Fluent::Engine.now
        record = { 'message' => serialize_msg(batch_type, batch_msg) }
        stream.add(time, record)
      end

      router.emit_stream(@tag, stream)

      status = "Batch received! [batch_type: #{batch_type}, batch_size: #{batch_size} messages]"
      log.info("[B] {#{@out_mode}} [#{peeraddr}, msgtype: #{msgtype}] #{status}")
      [200, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json]
    end
  end
end

#valid_content_type?(content_type) ⇒ Boolean

Returns:

  • (Boolean)


243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/fluent/plugin/in_protobuf_http.rb', line 243

# Tells whether a Content-Type header value is acceptable for the configured
# @in_mode.
#
# @param content_type [String, nil] header value to check
# @return [Boolean, nil] true/false for the modes :binary, :json and
#   :binary_and_json; nil for any other @in_mode
def valid_content_type?(content_type)
  accepted_by_mode = {
    binary: %w[application/octet-stream],
    json: %w[application/json],
    binary_and_json: %w[application/octet-stream application/json]
  }

  accepted = accepted_by_mode[@in_mode]
  accepted&.include?(content_type)
end