Class: OdpsDatahub::StreamClient
- Inherits:
-
Object
- Object
- OdpsDatahub::StreamClient
- Defined in:
- lib/fluent/plugin/stream_client.rb
Instance Attribute Summary collapse
-
#mOdpsConfig ⇒ Object
readonly
Returns the value of attribute mOdpsConfig.
-
#mOdpsTable ⇒ Object
readonly
Returns the value of attribute mOdpsTable.
-
#mOdpsTableSchema ⇒ Object
readonly
Returns the value of attribute mOdpsTableSchema.
-
#mProject ⇒ Object
readonly
Returns the value of attribute mProject.
-
#mTable ⇒ Object
readonly
Returns the value of attribute mTable.
Instance Method Summary collapse
-
#addPartition(ptStr) ⇒ Object
ptStr example: ‘dt=20150805,hh=08,mm=24’. Calls add partition if it does not exist.
- #createStreamArrayWriter(shardId = nil) ⇒ Object
- #createStreamWriter(shardId = nil) ⇒ Object
- #getOdpsTableSchema ⇒ Object
-
#getPartitionList ⇒ Object
Gets the partitions and returns an array like: [{“place”=>“china2”}, {“place”=>“china”}].
-
#getShardStatus ⇒ Object
return json like [“0”,“State”: “loaded”,“1”,“State”: “loaded”].
-
#initialize(odpsConfig, project, table) ⇒ StreamClient
constructor
A new instance of StreamClient.
- #loadShard(idx) ⇒ Object
-
#waitForShardLoad(timeOut = 120) ⇒ Object
timeOut is given in seconds.
Constructor Details
#initialize(odpsConfig, project, table) ⇒ StreamClient
Returns a new instance of StreamClient.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fluent/plugin/stream_client.rb', line 30
#
# Builds a StreamClient for one table and fetches that table's schema.
#
# odpsConfig - configuration object; must respond to defaultProjectName.
# project    - project name; when nil or "" the config's default project
#              name is used instead.
# table      - table name.
#
# Issues a GET against getResource with the $PARAM_QUERY parameter set to
# "meta", parses the response body as JSON and stores the "Schema" entry
# wrapped in an OdpsTableSchema.
#
# Raises OdpsDatahubException when the response code is not "200".
def initialize(odpsConfig, project, table)
  @mOdpsConfig = odpsConfig
  @mTable = table
  @mShards = []

  # Fall back to the configured default when no project was supplied.
  project_missing = project.nil? || project == ""
  @mProject = project_missing ? @mOdpsConfig.defaultProjectName : project

  @mOdpsTable = OdpsTable.new(@mOdpsConfig, @mProject, @mTable)

  query = { $PARAM_QUERY => "meta" }
  response = HttpConnection.new(@mOdpsConfig, {}, query, getResource, "GET").getResponse

  # The body is parsed before the status check because the error branch
  # needs the "Code" and "Message" fields from it.
  table_meta = JSON.parse(response.body)
  unless response.code == "200"
    raise OdpsDatahubException.new(table_meta["Code"], "initialize failed because " + table_meta["Message"])
  end

  @mOdpsTableSchema = OdpsTableSchema.new(table_meta["Schema"])
end
Instance Attribute Details
#mOdpsConfig ⇒ Object (readonly)
Returns the value of attribute mOdpsConfig.
29 30 31 |
# File 'lib/fluent/plugin/stream_client.rb', line 29
#
# Read-only accessor for the @mOdpsConfig instance variable.
def mOdpsConfig
  return @mOdpsConfig
end
#mOdpsTable ⇒ Object (readonly)
Returns the value of attribute mOdpsTable.
29 30 31 |
# File 'lib/fluent/plugin/stream_client.rb', line 29
#
# Read-only accessor for the @mOdpsTable instance variable.
def mOdpsTable
  return @mOdpsTable
end
#mOdpsTableSchema ⇒ Object (readonly)
Returns the value of attribute mOdpsTableSchema.
29 30 31 |
# File 'lib/fluent/plugin/stream_client.rb', line 29
#
# Read-only accessor for the @mOdpsTableSchema instance variable.
def mOdpsTableSchema
  return @mOdpsTableSchema
end
#mProject ⇒ Object (readonly)
Returns the value of attribute mProject.
29 30 31 |
# File 'lib/fluent/plugin/stream_client.rb', line 29
#
# Read-only accessor for the @mProject instance variable.
def mProject
  return @mProject
end
#mTable ⇒ Object (readonly)
Returns the value of attribute mTable.
29 30 31 |
# File 'lib/fluent/plugin/stream_client.rb', line 29
#
# Read-only accessor for the @mTable instance variable.
def mTable
  return @mTable
end
Instance Method Details
#addPartition(ptStr) ⇒ Object
ptStr example: ‘dt=20150805,hh=08,mm=24’. Calls add partition if it does not exist.
58 59 60 |
# File 'lib/fluent/plugin/stream_client.rb', line 58
#
# Forwards ptStr to @mOdpsTable.addPartition and returns whatever that
# call returns.
#
# ptStr - partition spec string, e.g. 'dt=20150805,hh=08,mm=24'.
def addPartition(ptStr)
  table = @mOdpsTable
  table.addPartition(ptStr)
end
#createStreamArrayWriter(shardId = nil) ⇒ Object
70 71 72 |
# File 'lib/fluent/plugin/stream_client.rb', line 70
#
# Creates a StreamWriter for this client's config, project, table and
# resource, additionally passing the table schema held in
# @mOdpsTableSchema.
#
# shardId - optional shard id handed to the writer (defaults to nil).
#
# Returns the new StreamWriter.
def createStreamArrayWriter(shardId = nil)
  StreamWriter.new(
    @mOdpsConfig,
    @mProject,
    @mTable,
    getResource,
    shardId,
    @mOdpsTableSchema
  )
end
#createStreamWriter(shardId = nil) ⇒ Object
66 67 68 |
# File 'lib/fluent/plugin/stream_client.rb', line 66
#
# Creates a StreamWriter for this client's config, project, table and
# resource. No schema argument is passed to the writer here.
#
# shardId - optional shard id handed to the writer (defaults to nil).
#
# Returns the new StreamWriter.
def createStreamWriter(shardId = nil)
  StreamWriter.new(
    @mOdpsConfig,
    @mProject,
    @mTable,
    getResource,
    shardId
  )
end
#getOdpsTableSchema ⇒ Object
62 63 64 |
# File 'lib/fluent/plugin/stream_client.rb', line 62
#
# Returns the table schema stored in @mOdpsTableSchema.
def getOdpsTableSchema
  @mOdpsTableSchema
end
#getPartitionList ⇒ Object
Gets the partitions and returns an array like: [{“place”=>“china2”}, {“place”=>“china”}]
52 53 54 |
# File 'lib/fluent/plugin/stream_client.rb', line 52
#
# Returns the result of @mOdpsTable.getPartitionList unchanged.
def getPartitionList
  table = @mOdpsTable
  table.getPartitionList
end
#getShardStatus ⇒ Object
return json like [“0”,“State”: “loaded”,“1”,“State”: “loaded”]
75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/fluent/plugin/stream_client.rb', line 75
#
# Queries the shard status of the table.
#
# Sends a GET to getResource + "/shards" with the current project and an
# empty $PARAM_SHARD_STATUS parameter, then parses the response body as
# JSON.
#
# Returns the "ShardStatus" entry of the parsed response.
# Raises OdpsDatahubException when the response code is not "200".
def getShardStatus
  query = {
    $PARAM_CURR_PROJECT => @mProject,
    $PARAM_SHARD_STATUS => ""
  }
  response = HttpConnection.new(@mOdpsConfig, {}, query, getResource + "/shards", "GET").getResponse

  # Parsed up front: the error branch reads "Code" and "Message" from it.
  payload = JSON.parse(response.body)
  unless response.code == "200"
    raise OdpsDatahubException.new(payload["Code"], "getShardStatus failed because " + payload["Message"])
  end

  payload["ShardStatus"]
end
#loadShard(idx) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/fluent/plugin/stream_client.rb', line 108
#
# Posts a shard-load request for the table.
#
# idx - value sent as the $PARAM_SHARD_NUMBER parameter; must not be
#       negative.
#
# Sends a POST to getResource + "/shards". The response body is parsed
# only when the request failed, to extract "Code" and "Message".
#
# Raises OdpsDatahubException when idx is negative or when the response
# code is not "200".
def loadShard(idx)
  raise OdpsDatahubException.new($INVALID_ARGUMENT, "loadShard num invalid") if idx < 0

  query = {
    $PARAM_CURR_PROJECT => @mProject,
    $PARAM_SHARD_NUMBER => idx
  }
  response = HttpConnection.new(@mOdpsConfig, {}, query, getResource + "/shards", "POST").getResponse

  unless response.code == "200"
    failure = JSON.parse(response.body)
    raise OdpsDatahubException.new(failure["Code"], "loadShard failed because " + failure["Message"])
  end
end
#waitForShardLoad(timeOut = 120) ⇒ Object
timeOut is given in seconds
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/stream_client.rb', line 90
#
# Blocks until isShardLoadCompleted reports true or the timeout elapses.
#
# timeOut - maximum time to wait, in seconds (default 120). Must not be
#           negative.
#
# isShardLoadCompleted is polled at most every 5 seconds. Each pause is
# capped at the time still remaining, so the total time slept never
# exceeds timeOut (a fixed 5-second sleep would block for 5 s on
# timeOut = 3). One last check is made after the waiting budget is used up.
#
# Returns nil once the shards are loaded.
# Raises OdpsDatahubException with $INVALID_ARGUMENT when timeOut is
# negative, and with $LOADSHARD_TIMEOUT when loading has not completed in
# time.
def waitForShardLoad(timeOut = 120)
  if timeOut < 0
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "waitForShardLoad param invalid")
  end

  poll_interval = 5
  waited = 0
  while waited < timeOut
    return if isShardLoadCompleted

    # Never sleep past the deadline.
    pause = [poll_interval, timeOut - waited].min
    sleep(pause)
    waited += pause
  end

  # Final check covers loads that finished during the last pause
  # (and the timeOut == 0 case, where the loop body never runs).
  return if isShardLoadCompleted

  raise OdpsDatahubException.new($LOADSHARD_TIMEOUT, "waitForShardLoad timeOut")
end