Class: OdpsDatahub::StreamWriter
- Inherits:
-
Object
- Object
- OdpsDatahub::StreamWriter
- Defined in:
- lib/fluent/plugin/stream_writer.rb
Instance Attribute Summary collapse
-
#mOdpsConfig ⇒ Object
readonly
Returns the value of attribute mOdpsConfig.
-
#mPath ⇒ Object
readonly
Returns the value of attribute mPath.
-
#mProject ⇒ Object
readonly
Returns the value of attribute mProject.
-
#mRecordList ⇒ Object
readonly
Returns the value of attribute mRecordList.
-
#mShardId ⇒ Object
readonly
Returns the value of attribute mShardId.
-
#mTable ⇒ Object
readonly
Returns the value of attribute mTable.
-
#mUpStream ⇒ Object
readonly
Returns the value of attribute mUpStream.
Instance Method Summary collapse
-
#initialize(odpsConfig, project, table, path, shardId = nil, odpsSchema = nil) ⇒ StreamWriter
constructor
A new instance of StreamWriter.
- #reload ⇒ Object
- #write(recordList, partition = "") ⇒ Object
Constructor Details
#initialize(odpsConfig, project, table, path, shardId = nil, odpsSchema = nil) ⇒ StreamWriter
Returns a new instance of StreamWriter.
33 34 35 36 37 38 39 40 41 |
# File 'lib/fluent/plugin/stream_writer.rb', line 33 def initialize(odpsConfig, project, table, path, shardId = nil, odpsSchema = nil) @mOdpsConfig = odpsConfig @mProject = project @mTable = table @mPath = path @mShardId = shardId @mSchema = odpsSchema reload end |
Instance Attribute Details
#mOdpsConfig ⇒ Object (readonly)
Returns the value of attribute mOdpsConfig.
32 33 34 |
# File 'lib/fluent/plugin/stream_writer.rb', line 32 def mOdpsConfig @mOdpsConfig end |
#mPath ⇒ Object (readonly)
Returns the value of attribute mPath.
32 33 34 |
# File 'lib/fluent/plugin/stream_writer.rb', line 32 def mPath @mPath end |
#mProject ⇒ Object (readonly)
Returns the value of attribute mProject.
32 33 34 |
# File 'lib/fluent/plugin/stream_writer.rb', line 32 def mProject @mProject end |
#mRecordList ⇒ Object (readonly)
Returns the value of attribute mRecordList.
32 33 34 |
# File 'lib/fluent/plugin/stream_writer.rb', line 32 def mRecordList @mRecordList end |
#mShardId ⇒ Object (readonly)
Returns the value of attribute mShardId.
32 33 34 |
# File 'lib/fluent/plugin/stream_writer.rb', line 32 def mShardId @mShardId end |
#mTable ⇒ Object (readonly)
Returns the value of attribute mTable.
32 33 34 |
# File 'lib/fluent/plugin/stream_writer.rb', line 32 def mTable @mTable end |
#mUpStream ⇒ Object (readonly)
Returns the value of attribute mUpStream.
32 33 34 |
# File 'lib/fluent/plugin/stream_writer.rb', line 32 def mUpStream @mUpStream end |
Instance Method Details
#reload ⇒ Object
43 44 45 46 47 |
# File 'lib/fluent/plugin/stream_writer.rb', line 43 def reload @mUpStream = ::StringIO.new @mRecordList = Array.new @mUpStream.set_encoding(::Protobuf::Field::BytesField::BYTES_ENCODING) end |
#write(recordList, partition = "") ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/fluent/plugin/stream_writer.rb', line 49 def write(recordList, partition = "") if recordList.is_a?Array recordList.each{ |value| #handle RecordList if value.is_a?OdpsTableRecord @mRecordList.push(value) #handle ArrayList elsif value.is_a?Array and @mSchema != nil and value.size == @mSchema.getColumnCount record = convert2Record(value) @mRecordList.push(record) else raise OdpsDatahubException.new($INVALID_ARGUMENT, "write an error type") end } else raise OdpsDatahubException.new($INVALID_ARGUMENT, "write param must be a array") end serializer = Serializer.new serializer.serialize(@mUpStream, @mRecordList) if @mUpStream.length == 0 raise OdpsDatahubException.new($INVALID_ARGUMENT, "mRecordList is empty") end header = Hash.new param = Hash.new param[$PARAM_CURR_PROJECT] = @mProject #TODO partition format param[$PARAM_PARTITION] = partition param[$PARAM_RECORD_COUNT] = @mRecordList.size.to_s header[$CONTENT_ENCODING] = "deflate" header[$CONTENT_TYPE] = "application/octet-stream" # version 4 pack = OdpsDatahub::XStreamPack.new pack.pack_data = Zlib::Deflate.deflate(@mUpStream.string) pack. = "" upStream = ::StringIO.new pack.serialize_to(upStream) header[$CONTENT_MD5] = Digest::MD5.hexdigest(upStream.string) header[$CONTENT_LENGTH] = upStream.string.length.to_s =begin version 3 upStream = Zlib::Deflate.deflate(@mUpStream.string) header[$CONTENT_MD5] = Digest::MD5.hexdigest(upStream) header[$CONTENT_LENGTH] = upStream.length.to_s =end #MAX_LENGTH 2048*10KB if upStream.length > $MAX_PACK_SIZE raise OdpsDatahubException.new($PACK_SIZE_EXCEED, "pack size:" + upStream.string.length.to_s) end if @mShardId != nil conn = HttpConnection.new(@mOdpsConfig, header, param, @mPath + "/shards/" + @mShardId.to_s, "PUT", upStream.string) else conn = HttpConnection.new(@mOdpsConfig, header, param, @mPath + "/shards", "PUT", upStream.string) end reload res = conn.getResponse json_obj = JSON.parse(res.body) if res.code != "200" raise OdpsDatahubException.new(json_obj["Code"], "write failed because " + json_obj["Message"]) end return json_obj end |