Class: Fluent::ODPSOutput::TableElement

Inherits:
Object
  • Object
show all
Includes:
Configurable
Defined in:
lib/fluent/plugin/out_aliyun_odps.rb

Overview

TODO: Merge SQLInput’s TableElement

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pattern, log) ⇒ TableElement

Returns a new instance of TableElement.



62
63
64
65
66
67
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 62

# Builds a table element bound to one match pattern.
#
# @param pattern value handed to MatchPattern.create (presumably a Fluentd tag
#   pattern string -- confirm against the caller)
# @param log logger used for info/warn messages by the other methods
def initialize(pattern, log)
  super()
  @log = log
  @pattern = MatchPattern.create(pattern)
  # per-thread stream writers; the slots are filled in by #init
  @writer = []
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



57
58
59
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 57

# @return the DataHub stream client assigned by #init (nil until #init runs)
def client; @client; end

#log ⇒ Object (readonly)

Returns the value of attribute log.



60
61
62
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 60

# @return the logger given to #initialize
def log; @log; end

#partitionList ⇒ Object

Returns the value of attribute partitionList.



56
57
58
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 56

# @return [Array<String>] known partition specs such as "k1=v1,k2=v2";
#   loaded by #init and appended to by #import when it creates a partition
def partitionList; @partitionList; end

#pattern ⇒ Object (readonly)

Returns the value of attribute pattern.



59
60
61
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 59

# @return the object built by MatchPattern.create in #initialize
def pattern; @pattern; end

#writer ⇒ Object (readonly)

Returns the value of attribute writer.



58
59
60
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 58

# @return [Array] stream writers indexed by thread id (filled in by #init)
def writer; @writer; end

Instance Method Details

#close ⇒ Object



260
261
262
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 260

# Shutdown hook for the table element.
#
# Currently does nothing and returns nil; the @client.loadShard(0) call below
# is commented out and kept only for reference.
def close
  # @client.loadShard(0)
  nil
end

#configure(conf) ⇒ Object

Initializes data (初始化数据): applies the configuration and builds the record-formatting proc.



70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 70

# Applies configuration and builds the record formatter.
#
# After Configurable#configure has run, @format_proc is set to a lambda that
# turns a record hash into an array of values ordered like the comma-separated
# column names in @fields. A column missing from the record contributes nil
# and logs a warning.
#
# @param conf configuration passed through to Configurable#configure
def configure(conf)
  super
  # split the column list once here rather than once per formatted record
  field_names = @fields.split(',')
  @format_proc = lambda do |record|
    field_names.map do |key|
      unless record.has_key?(key)
        @log.warn "the table  #{@table}'s #{key} field not has match key"
      end
      record[key]
    end
  end
end

#import(chunk) ⇒ Object

Imports the records of one buffered chunk into the table.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 115

# Sends one buffered chunk of records to the ODPS table.
#
# Every record is converted with @format_proc. When @partition is set, records
# are grouped by the partition spec resolved from each record, partitions not
# yet in @partitionList are created through @client, and each group is written
# with its spec; otherwise all records are written without a partition. Each
# group is split into @thread_number contiguous slices, and slice i is written
# by its own thread through @writer[i].
#
# @param chunk buffer chunk; only #msgpack_each, yielding (tag, time, record), is used
# @raise [RuntimeError] "Failed to format the data:..." when a record cannot be
#   formatted or its partition spec cannot be resolved
# @raise [RuntimeError] "write records failed,..." when any write fails; shards
#   are reloaded first if the message mentions ShardNotReady/InvalidShardId, and
#   the error is re-raised so Fluentd can retry the chunk
def import(chunk)
  partitioned = !@partition.blank?
  records = []
  # partition spec => formatted rows, kept in first-seen order
  partitions = Hash.new { |hash, name| hash[name] = [] }

  chunk.msgpack_each do |_tag, _time, data|
    begin
      if partitioned
        name = resolve_partition_name(data)
        partitions[name] << @format_proc.call(data)
      else
        records << @format_proc.call(data)
      end
    rescue => e
      raise "Failed to format the data:" + e.message
    end
  end

  begin
    if partitioned
      partitions.each_key do |name|
        @log.info name
        # create the partition first if it does not exist yet
        next if @partitionList.include?(name)
        @client.addPartition(name)
        @partitionList << name
        @log.info "add partition " + name
      end
      threads = Array.new(@thread_number) do |tid|
        Thread.new do
          partitions.each do |name, rows|
            slice = thread_slice(rows, tid)
            next if slice.empty?
            @writer[tid].write(slice, name)
            @log.info "Successfully  import #{slice.size} data to partition:#{name},table:#{@table} at threadId:#{tid}"
          end
        end
      end
    else
      @log.info records.size.to_s + " records to be sent"
      threads = Array.new(@thread_number) do |tid|
        Thread.new do
          slice = thread_slice(records, tid)
          unless slice.empty?
            @writer[tid].write(slice)
            @log.info "Successfully import #{slice.size} data to table:#{@table} at threadId:#{tid}"
          end
        end
      end
    end
    # Join every thread before failing so no writer is still running while
    # shards are reloaded; the first failure in thread order is reported.
    failure = nil
    threads.each do |thread|
      begin
        thread.join
      rescue => e
        failure ||= e
      end
    end
    raise failure if failure
  rescue => e
    # reload shards when the service reports them unavailable
    if e.message.include?("ShardNotReady") || e.message.include?("InvalidShardId")
      @client.loadShard(@shard_number)
      @client.waitForShardLoad
    end
    # re-raise so the chunk goes through Fluentd's retry
    raise "write records failed," + e.message
  end
end

# Returns the contiguous slice of +items+ written by writer thread +thread_id+.
# Each thread gets items.size / @thread_number items and the last thread also
# takes the remainder, so every item is written exactly once. Threads beyond
# the available items get an empty slice.
def thread_slice(items, thread_id)
  per_thread = items.size / @thread_number
  count = per_thread
  count += items.size % @thread_number if thread_id == @thread_number - 1
  items[per_thread * thread_id, count] || []
end

# Resolves @partition into the partition spec for one record. A spec that
# contains no "=${" placeholder is returned verbatim; otherwise each
# comma-separated term is resolved by resolve_partition_term and re-joined.
def resolve_partition_name(data)
  return @partition unless @partition.include?("=${")
  @partition.split(",").map { |term| resolve_partition_term(term, data) }.join(",")
end

# Resolves one "column=..." term of @partition against +data+.
#
# "col=${key}" becomes "col=<data[key]>". "col=${key.strftime('fmt')}" parses
# data[key] as a time (Time.parse, or Time.strptime with time_format when set)
# and formats it with fmt. A term without "${" is returned unchanged.
#
# @raise [RuntimeError] when +data+ has no value for the referenced key
def resolve_partition_term(term, data)
  return term unless term.include?("${")
  column = term[0, term.index("=")]
  brace = term.index("{")
  if term.include?("strftime")
    key = term[brace + 1, term.index(".strftime") - 1 - brace]
    paren = term.index("(")
    # skip the "('" before and the "')" after the format string
    out_format = term[paren + 2, term.index(")") - 3 - paren]
  else
    key = term[brace + 1, term.index("}") - 1 - brace]
  end
  unless data.has_key?(key)
    raise "partition has no corresponding source key or the partition expression is wrong,#{data}"
  end
  value = data[key]
  if out_format
    time = time_format.nil? ? Time.parse(value) : Time.strptime(value, time_format)
    value = time.strftime(out_format)
  end
  "#{column}=#{value}"
end

#init(config) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/fluent/plugin/out_aliyun_odps.rb', line 84

# Connects to ODPS DataHub for @table and prepares the writers.
#
# Loads @shard_number shards and waits for them, stores one stream array
# writer per thread in @writer[0...@thread_number], and caches the table's
# existing partitions in @partitionList as "k1=v1,k2=v2" specs.
#
# @param config [Hash] connection settings read by symbol key: :aliyun_access_id,
#   :aliyun_access_key, :aliyun_odps_endpoint, :aliyun_odps_hub_endpoint, :project
# @raise [RuntimeError] when @shard_number or @thread_number is not positive,
#   or "loadShard failed,..." when connecting or loading shards fails
def init(config)
  odps_config = OdpsDatahub::OdpsConfig.new(config[:aliyun_access_id],
                                            config[:aliyun_access_key],
                                            config[:aliyun_odps_endpoint],
                                            config[:aliyun_odps_hub_endpoint],
                                            config[:project])
  if @shard_number <= 0
    raise "shard number must more than 0"
  end
  # with no writer threads #import would start nothing and silently drop records
  if @thread_number <= 0
    raise "thread number must more than 0"
  end
  begin
    @client = OdpsDatahub::StreamClient.new(odps_config, config[:project], @table)
    @client.loadShard(@shard_number)
    @client.waitForShardLoad
    @thread_number.times { |i| @writer[i] = @client.createStreamArrayWriter }
    # each partition map ({column => value}) becomes a "col=val,col=val" spec
    @partitionList = @client.getPartitionList.map do |spec|
      spec.map { |k, v| "#{k}=#{v}" }.join(",")
    end
  rescue => e
    raise "loadShard failed," + e.message
  end
end