Class: OdpsDatahub::StreamWriter

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/stream_writer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(odpsConfig, project, table, path, shardId = nil, odpsSchema = nil) ⇒ StreamWriter

Returns a new instance of StreamWriter.



33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/stream_writer.rb', line 33

# Builds a writer bound to one table endpoint and prepares empty buffers.
#
# @param odpsConfig [Object] configuration later handed to HttpConnection by #write
# @param project [Object] project identifier sent as a request parameter by #write
# @param table [Object] table identifier (only stored here)
# @param path [String] base resource path; #write appends "/shards..." to it
# @param shardId [Object, nil] target shard, or nil to use the generic shards endpoint
# @param odpsSchema [Object, nil] schema needed by #write to accept plain Array rows
def initialize(odpsConfig, project, table, path, shardId = nil, odpsSchema = nil)
  @mOdpsConfig, @mProject, @mTable, @mPath = odpsConfig, project, table, path
  @mShardId, @mSchema = shardId, odpsSchema
  # Start with a fresh upload stream and an empty record list.
  reload
end

Instance Attribute Details

#mOdpsConfig ⇒ Object (readonly)

Returns the value of attribute mOdpsConfig.



32
33
34
# File 'lib/fluent/plugin/stream_writer.rb', line 32

# Read-only accessor for the configuration object given to the constructor;
# #write passes it to HttpConnection when it builds the upload request.
#
# @return [Object] the current value of @mOdpsConfig
def mOdpsConfig
  return @mOdpsConfig
end

#mPath ⇒ Object (readonly)

Returns the value of attribute mPath.



32
33
34
# File 'lib/fluent/plugin/stream_writer.rb', line 32

# Read-only accessor for the base resource path given to the constructor;
# #write appends the "/shards" suffix to it to form the request path.
#
# @return [Object] the current value of @mPath
def mPath
  return @mPath
end

#mProject ⇒ Object (readonly)

Returns the value of attribute mProject.



32
33
34
# File 'lib/fluent/plugin/stream_writer.rb', line 32

# Read-only accessor for the project given to the constructor; #write sends
# it as the $PARAM_CURR_PROJECT request parameter.
#
# @return [Object] the current value of @mProject
def mProject
  return @mProject
end

#mRecordList ⇒ Object (readonly)

Returns the value of attribute mRecordList.



32
33
34
# File 'lib/fluent/plugin/stream_writer.rb', line 32

# Read-only accessor for the list of records buffered by #write; #reload
# replaces it with a new empty Array.
#
# @return [Object] the current value of @mRecordList
def mRecordList
  return @mRecordList
end

#mShardId ⇒ Object (readonly)

Returns the value of attribute mShardId.



32
33
34
# File 'lib/fluent/plugin/stream_writer.rb', line 32

# Read-only accessor for the shard id given to the constructor. When it is
# nil, #write targets "<path>/shards" instead of "<path>/shards/<id>".
#
# @return [Object, nil] the current value of @mShardId
def mShardId
  return @mShardId
end

#mTable ⇒ Object (readonly)

Returns the value of attribute mTable.



32
33
34
# File 'lib/fluent/plugin/stream_writer.rb', line 32

# Read-only accessor for the table given to the constructor. The methods
# shown alongside it only store this value; they do not read it.
#
# @return [Object] the current value of @mTable
def mTable
  return @mTable
end

#mUpStream ⇒ Object (readonly)

Returns the value of attribute mUpStream.



32
33
34
# File 'lib/fluent/plugin/stream_writer.rb', line 32

# Read-only accessor for the StringIO buffer that #write serializes records
# into; #reload replaces it with a new, empty stream.
#
# @return [Object] the current value of @mUpStream
def mUpStream
  return @mUpStream
end

Instance Method Details

#reload ⇒ Object



43
44
45
46
47
# File 'lib/fluent/plugin/stream_writer.rb', line 43

# Discards any buffered state: installs an empty record list and a new
# StringIO upload buffer, then switches that buffer to the protobuf bytes
# encoding.
#
# @return [StringIO] the new upload buffer (the result of set_encoding)
def reload
  @mRecordList = []
  @mUpStream = ::StringIO.new
  @mUpStream.set_encoding(::Protobuf::Field::BytesField::BYTES_ENCODING)
end

#write(recordList, partition = "") ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/fluent/plugin/stream_writer.rb', line 49

# Buffers +recordList+, serializes the buffered records, deflates them into a
# version-4 XStreamPack and uploads the pack with an HTTP PUT to the shards
# endpoint below @mPath.
#
# Once the argument is known to be an Array, the internal buffers
# (@mRecordList / @mUpStream) are reset through #reload on every exit path,
# so a rejected, empty or oversized batch cannot leak into the next call.
#
# @param recordList [Array] elements must be OdpsTableRecord instances, or
#   Arrays whose size equals the schema's column count (these are converted
#   with convert2Record and require a schema to have been supplied)
# @param partition [String] value sent as the $PARAM_PARTITION request parameter
# @return [Object] the response body as parsed by JSON.parse
# @raise [OdpsDatahubException] when recordList is not an Array, contains an
#   unsupported element, serializes to nothing, exceeds $MAX_PACK_SIZE, or the
#   response code is not "200"
def write(recordList, partition = "")
  unless recordList.is_a?(Array)
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "write param must be a array")
  end

  begin
    recordList.each do |value|
      if value.is_a?(OdpsTableRecord)
        # Already a record: buffer it unchanged.
        @mRecordList.push(value)
      elsif value.is_a?(Array) && !@mSchema.nil? && value.size == @mSchema.getColumnCount
        # Plain list of column values: convert it using the schema.
        @mRecordList.push(convert2Record(value))
      else
        raise OdpsDatahubException.new($INVALID_ARGUMENT, "write an error type")
      end
    end

    Serializer.new.serialize(@mUpStream, @mRecordList)
    if @mUpStream.length == 0
      raise OdpsDatahubException.new($INVALID_ARGUMENT, "mRecordList is empty")
    end

    param = Hash.new
    param[$PARAM_CURR_PROJECT] = @mProject
    # TODO: partition format
    param[$PARAM_PARTITION] = partition
    param[$PARAM_RECORD_COUNT] = @mRecordList.size.to_s

    # Version 4 framing: the deflated records travel inside an XStreamPack.
    # (Version 3 sent the raw deflate output without this envelope.)
    pack = OdpsDatahub::XStreamPack.new
    pack.pack_data = Zlib::Deflate.deflate(@mUpStream.string)
    pack.pack_meta = ""
    upStream = ::StringIO.new
    pack.serialize_to(upStream)
    payload = upStream.string

    # Sizes are measured in bytes: String#length counts characters and can
    # under-report when the buffer is not tagged with a binary encoding.
    # MAX_LENGTH 2048*10KB
    if payload.bytesize > $MAX_PACK_SIZE
      raise OdpsDatahubException.new($PACK_SIZE_EXCEED, "pack size:" + payload.bytesize.to_s)
    end

    header = Hash.new
    header[$CONTENT_ENCODING] = "deflate"
    header[$CONTENT_TYPE] = "application/octet-stream"
    header[$CONTENT_MD5] = Digest::MD5.hexdigest(payload)
    header[$CONTENT_LENGTH] = payload.bytesize.to_s

    # Without a shard id the request goes to the generic shards endpoint.
    target = @mPath + "/shards"
    target += "/" + @mShardId.to_s unless @mShardId.nil?
    conn = HttpConnection.new(@mOdpsConfig, header, param, target, "PUT", payload)
  ensure
    # Drop buffered records/bytes whether or not the request could be built.
    reload
  end

  res = conn.getResponse
  json_obj = JSON.parse(res.body)
  if res.code != "200"
    # Interpolation keeps the service error visible even if "Message" is absent.
    raise OdpsDatahubException.new(json_obj["Code"], "write failed because #{json_obj['Message']}")
  end
  return json_obj
end