Class: Fluent::DatahubOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_datahub.rb

Constant Summary collapse

@@file_lock =

写文件锁

Mutex.new

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#shard_cursorObject

该值内部使用,不提供配置。用于分发shard的游标。



64
65
66
# File 'lib/fluent/plugin/out_datahub.rb', line 64

# Reader for the shard round-robin cursor. Internal state used by
# #get_shard_id when distributing records over shards; it is not a
# user-facing configuration option.
def shard_cursor
  @shard_cursor
end

Instance Method Details

#check_paramsObject



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/fluent/plugin/out_datahub.rb', line 90

# Validates the plugin configuration against the topic schema and
# normalizes derived settings.
#
# - Applies @data_encoding to the schema when configured.
# - Ensures every configured column exists in the topic schema.
# - Defaults @source_keys to @column_names, requires both lists to be
#   the same length, and fills @target_source_column_map.
# - Requires at least one active shard, a dirty-data file when
#   @dirty_data_continue is set, and that all @shard_keys exist in the
#   schema.
#
# Raises a RuntimeError with a descriptive message on any violation.
def check_params
    schema = @datahub_topic.record_schema
    schema.setEncoding(@data_encoding) if @data_encoding != nil

    fields = schema.get_fields

    # Every configured column must exist in the topic schema.
    @column_names.each do |column_name|
        if find_column_index(fields, column_name) == -1
            message = "Column: " + column_name + " not found, please check your config"
            @logger.error message
            raise message
        end
    end

    # By default, read each column from a record key of the same name.
    if @source_keys.size == 0
        @source_keys = @column_names
    end

    if @source_keys.size > 0 and @column_names.size != @source_keys.size
        @logger.error "source_keys's size must be equal to column_names's size, please check your config"
        raise "source_keys's size must be equal to column_names's size, please check your config"
    else
        @column_names.each_with_index do |column_name, i|
            @target_source_column_map[column_name] = @source_keys[i]
        end
    end

    if @shard_count < 1
        raise "there must be at least 1 active shard!"
    end

    # When dirty records are to be skipped, a dirty-data file is mandatory.
    if @dirty_data_continue
        if @dirty_data_file.to_s.chomp.length == 0
            raise "Dirty data file path can not be empty"
        end
    end

    # Every shard key must also exist in the topic schema.
    @shard_keys.each do |shard_key|
        if find_column_index(fields, shard_key) == -1
            message = "Shard key: " + shard_key + " not found in schema, please check your config"
            @logger.error message
            raise message
        end
    end
end

#configure(conf) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/out_datahub.rb', line 69

# Fluentd lifecycle hook: reads the plugin configuration, connects to
# DataHub, resolves the target project/topic, caches the active shard
# list, and validates all parameters up front via #check_params.
# Statement order matters: the topic must exist before shards are
# listed, and all of the above before validation runs.
#
# @param conf [Fluent::Config::Element] the plugin configuration
def configure(conf)
    super
    @client = DatahubClient.new(@endpoint, @access_id, @access_key)
    @datahub_project = @client.get_project(@project_name)
    @datahub_topic = @datahub_project.get_topic(@topic_name)

    # Only ACTIVE shards are writable; the count drives shard selection.
    @shards = get_active_shard
    @shard_count = @shards.size

    @logger = log
    @shard_cursor = 0

    # Cap a single DataHub put request at 3000 records.
    @put_data_max_size = 3000

    @target_source_column_map = {}

    # Validate parameters before accepting any data.
    check_params
end

#find_column_index(fields, column_name) ⇒ Object

在topic的schema中查找某列的真实下标;如果没找到,返回-1。



155
156
157
158
159
160
161
162
163
# File 'lib/fluent/plugin/out_datahub.rb', line 155

# Looks up the real index of +column_name+ within the topic schema
# +fields+.
#
# @param fields [Array] schema field objects responding to #get_name
# @param column_name [String] the column to search for
# @return [Integer] the zero-based field index, or -1 when not found
def find_column_index(fields, column_name)
    index = fields.index { |field| field.get_name == column_name }
    index.nil? ? -1 : index
end

#format(tag, time, record) ⇒ Object



173
174
175
# File 'lib/fluent/plugin/out_datahub.rb', line 173

# Serializes one event for buffering.
#
# NOTE(review): #write deserializes chunks with msgpack_each, which
# expects msgpack-framed chunk data; emitting JSON lines here looks
# inconsistent — confirm against the buffer format this plugin is
# actually deployed with.
#
# @return [String] the event as a JSON array followed by a newline
def format(tag, time, record)
    # Double-quoted "\n" is a real newline; the original single-quoted
    # '\n' appended a literal backslash-n.
    [tag, time, record].to_json + "\n"
end

#get_active_shardObject

获取active状态的shard



389
390
391
392
393
394
395
396
397
398
399
# File 'lib/fluent/plugin/out_datahub.rb', line 389

# Returns only the shards of the topic whose state is "ACTIVE"
# (the writable subset used for routing).
def get_active_shard
    @datahub_topic.list_shards.select { |shard| shard.state == "ACTIVE" }
end

#get_shard_id(record) ⇒ Object

产生写入的shard_id



354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# File 'lib/fluent/plugin/out_datahub.rb', line 354

# Picks the shard a record should be written to.
#
# Priority: a fixed @shard_id from the config; otherwise hash routing
# over the configured @shard_keys values; otherwise simple round-robin
# across the cached active shard list.
def get_shard_id(record)
    # 1) An explicitly configured shard wins.
    return @shard_id if @shard_id != nil and !@shard_id.empty?

    if @shard_keys != nil and @shard_keys.size > 0
        # 2) Hash routing: concatenate the record's shard-key values
        #    (resolved through the column/source mapping) and bucket the
        #    Java-style hash over the active shard count.
        joined = ""
        @shard_keys.each do |shard_key|
            value = record[@target_source_column_map[shard_key]]
            joined << value.to_s << "," unless value.nil?
        end
        @shards[hash_code(joined) % @shard_count].shard_id
    else
        # 3) Round-robin over the active shards.
        position = @shard_cursor % @shard_count
        @shard_cursor = position + 1
        @shards[position].shard_id
    end
end

#hash_code(str) ⇒ Object

产生和java 一样的hashcode



382
383
384
385
386
# File 'lib/fluent/plugin/out_datahub.rb', line 382

# Java String#hashCode equivalent: h = 31 * h + char for each character,
# wrapped into the signed 32-bit range at every step (matching Java's
# int overflow semantics).
def hash_code(str)
    hash = 0
    str.each_char do |char|
        # (hash << 5) - hash == 31 * hash; mask to 32 bits, then map the
        # unsigned value back onto the signed range.
        wrapped = (hash * 31 + char.ord) & 0xFFFFFFFF
        hash = wrapped >= 0x80000000 ? wrapped - 0x100000000 : wrapped
    end
    hash
end

#record_to_entity(entity, record) ⇒ Object

将record转化为entity



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/fluent/plugin/out_datahub.rb', line 292

# Copies the mapped fields of +record+ into +entity+ according to the
# entity's schema, dispatching on each field's declared type. Source
# keys absent from the record are silently skipped.
#
# On a conversion error the record is either appended to the dirty-data
# file (when @dirty_data_continue is set) or the process is aborted.
#
# @return [Boolean] true when every column converted, false when the
#   record was diverted to the dirty-data file
def record_to_entity(entity, record)
    schema = entity.get_schema
    @column_names.each do |column|
        source_key = nil
        begin
            source_key = @target_source_column_map[column]
            next unless record.has_key?(source_key)

            field = schema.get_field(column)
            raise "Unknown column name of data" if field.nil?

            value = record[source_key]
            case field.get_type
            when "BIGINT"    then entity.setBigInt(column, value)
            when "DOUBLE"    then entity.setDouble(column, value)
            when "BOOLEAN"   then entity.setBoolean(column, value)
            when "STRING"    then entity.setString(column, value)
            when "TIMESTAMP" then entity.setTimeStamp(column, value)
            else
                raise "Unknown schema type of data"
            end
        rescue => e
            @logger.error "Parse data: #{column}[#{record[source_key]}] failed, #{e.message}"
            if !@dirty_data_continue
                @logger.error "Dirty data found, exit process now."
                puts "Dirty data found, exit process now."
                raise "try to exit!"
            else
                # Skipped bad records go straight to the dirty-data file.
                write_as_dirty_data(record)
            end
            return false
        end
    end
    true
end

#shutdownObject



169
170
171
# File 'lib/fluent/plugin/out_datahub.rb', line 169

# Fluentd lifecycle hook: no plugin-specific teardown is needed, so
# this only delegates to BufferedOutput#shutdown.
def shutdown
    super
end

#startObject



165
166
167
# File 'lib/fluent/plugin/out_datahub.rb', line 165

# Fluentd lifecycle hook: no plugin-specific startup is needed, so
# this only delegates to BufferedOutput#start.
def start
    super
end

#write(chunk) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/fluent/plugin/out_datahub.rb', line 181

# Fluentd BufferedOutput hook: converts each buffered event into a
# DataHub record entity, assigns it a target shard, and ships batches
# to DataHub via #write_data_with_retry.
#
# A batch is flushed once it reaches @put_data_batch_size or the hard
# per-request cap @put_data_max_size (whichever is smaller — the two
# original branches had identical bodies and are merged here); a final
# partial batch is flushed after the chunk is exhausted.
def write(chunk)
    record_entities = []
    schema = @datahub_topic.record_schema

    chunk.msgpack_each do |tag, time, record|
        entity = RecordEntity.new(schema)
        # Conversion failures are handled (dirty file / abort) inside
        # record_to_entity; only successful records get a shard and are
        # shipped. (Previously failed records still advanced the
        # round-robin shard cursor for no reason.)
        if record_to_entity(entity, record)
            entity.set_shard_id(get_shard_id(record))
            record_entities.push(entity)
        end

        if record_entities.size >= @put_data_max_size ||
                record_entities.size >= @put_data_batch_size
            write_data_with_retry(record_entities)
            record_entities.clear
        end
    end

    # Flush the trailing partial batch.
    write_data_with_retry(record_entities) if record_entities.size > 0
end

#write_as_dirty_data(record) ⇒ Object

脏数据文件处理



336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/fluent/plugin/out_datahub.rb', line 336

# Appends a bad record (as one JSON line) to the dirty-data file.
#
# New records always go to "<file>.part2"; once part2 outgrows half of
# @dirty_data_file_max_size it is rotated onto "<file>.part1"
# (replacing the previous part1), keeping total disk usage bounded at
# roughly @dirty_data_file_max_size.
#
# Guarded by the class-wide @@file_lock so concurrent flush threads do
# not interleave writes or race the rotation.
def write_as_dirty_data(record)
    dirty_file_part1_name = @dirty_data_file + ".part1"
    dirty_file_part2_name = @dirty_data_file + ".part2"

    @@file_lock.synchronize {
        # Block form guarantees the handle is closed even when the write
        # raises; the previous explicit open/close leaked it on error.
        File.open(dirty_file_part2_name, "a+") do |dirty_file_part2|
            dirty_file_part2.puts(record.to_json)
        end
        if File.size(dirty_file_part2_name) > @dirty_data_file_max_size / 2
            # Rotate: old data moves to .part1, fresh data restarts .part2.
            FileUtils.mv(dirty_file_part2_name, dirty_file_part1_name)
        end
    }
end

#write_data_with_retry(record_entities) ⇒ Object

根据@retry_times 重试写入datahub数据



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/fluent/plugin/out_datahub.rb', line 212

# Writes +record_entities+ to DataHub, retrying up to @retry_times
# times (sleeping @retry_interval between attempts) on either a raised
# request error or a partial failure reported in the put result.
#
# When retries are exhausted: aborts the process unless
# @dirty_data_continue is set, in which case the failed records are
# appended to the dirty-data file instead.
def write_data_with_retry(record_entities)
    tmp_retry_times = @retry_times
    put_result = nil
    while true
        begin
            put_result = @datahub_topic.write_data(record_entities)
        rescue => e
            @logger.warn "Put " + record_entities.size.to_s + " records to datahub failed, total " + record_entities.size.to_s + ", message = " + e.message
            if tmp_retry_times > 0
                sleep @retry_interval
                @logger.warn "Now retry(" + (@retry_times - tmp_retry_times + 1).to_s + ")..."
                tmp_retry_times -= 1
                next
            elsif !@dirty_data_continue
                @logger.error "Dirty data found, exit process now."
                puts "Dirty data found, exit process now."
                raise "try to exit!"
            else
                # Retries exhausted on a raised error: divert the whole
                # batch to the dirty file. NOTE: put_result may be nil
                # (every attempt raised) or stale here, so log the
                # exception instead of indexing into put_result — the
                # original crashed on put_result.failed_record_error[i].
                record_entities.each do |record_entity|
                    @logger.error "Put record: " + record_entity.get_columns_map.to_s + " failed, " + e.message
                    write_as_dirty_data(record_entity.get_columns_map)
                end
                break
            end
        end

        if put_result != nil and put_result.failed_record_count > 0
            if tmp_retry_times > 0
                # Retry only the records the service rejected.
                @logger.warn "Put " + put_result.failed_record_count.to_s + " records to datahub failed, total " + record_entities.size.to_s
                sleep @retry_interval
                @logger.warn "Now retry(" + (@retry_times - tmp_retry_times + 1).to_s + ")..."
                tmp_retry_times -= 1
                record_entities = put_result.failed_record_list

                # Round-robin mode only: when a shard went inactive
                # (error_code "InvalidShardOperation"), refresh the
                # active shard list once and re-route affected records.
                # Nil-safe @shard_id check, consistent with #get_shard_id.
                fresh_shard_flag = false
                if (@shard_id == nil || @shard_id.empty?) and @shard_keys.size == 0
                    for i in 0...put_result.failed_record_count
                        error_entity = put_result.failed_record_error[i]
                        if error_entity["error_code"] == "InvalidShardOperation"
                            unless fresh_shard_flag
                                @shards = get_active_shard
                                @shard_count = @shards.size
                                fresh_shard_flag = true
                            end
                            record_entities[i].set_shard_id(get_shard_id(record_entities[i]))
                        end
                    end
                end
            elsif !@dirty_data_continue
                @logger.error "Dirty data found, exit process now."
                puts "Dirty data found, exit process now."
                raise "try to exit!"
            else
                # Retries exhausted on partial failure: only the rejected
                # records go to the dirty file.
                for i in 0...put_result.failed_record_count
                    record_entity = put_result.failed_record_list[i]
                    @logger.error "Put record: " + record_entity.get_columns_map.to_s + " failed, " + put_result.failed_record_error[i].to_s
                    write_as_dirty_data(record_entity.get_columns_map)
                end
                break
            end
        else
            @logger.info "Put data to datahub success, total " + record_entities.size.to_s
            break
        end
    end
end