Class: OdpsDatahub::StreamClient

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/stream_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(odpsConfig, project, table) ⇒ StreamClient

Returns a new instance of StreamClient.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/stream_client.rb', line 30

# Builds a stream client for one table and loads that table's schema.
#
# When project is nil or "", the project name is taken from
# odpsConfig.defaultProjectName. The constructor then issues a GET against
# getResource with the $PARAM_QUERY parameter set to "meta" and builds
# @mOdpsTableSchema from the "Schema" entry of the JSON response.
#
# @param odpsConfig configuration object passed on to OdpsTable and HttpConnection
# @param project project name; nil or "" selects the configured default
# @param table table name
# @raise [OdpsDatahubException] when the response code is not "200"
def initialize(odpsConfig, project, table)
  @mOdpsConfig = odpsConfig
  @mProject = project
  @mTable = table
  @mShards = []
  @mProject = @mOdpsConfig.defaultProjectName if @mProject.nil? || @mProject == ""
  @mOdpsTable = OdpsTable.new(@mOdpsConfig, @mProject, @mTable)
  header = {}
  param = { $PARAM_QUERY => "meta" }
  conn = HttpConnection.new(@mOdpsConfig, header, param, getResource, "GET")
  res = conn.getResponse
  jsonTableMeta = JSON.parse(res.body)
  if res.code != "200"
    # Interpolate instead of concatenating: "str" + nil raises TypeError, which
    # would hide the service error whenever the body carries no "Message".
    raise OdpsDatahubException.new(jsonTableMeta["Code"], "initialize failed because #{jsonTableMeta["Message"]}")
  end
  @mOdpsTableSchema = OdpsTableSchema.new(jsonTableMeta["Schema"])
end

Instance Attribute Details

#mOdpsConfigObject (readonly)

Returns the value of attribute mOdpsConfig.



29
30
31
# File 'lib/fluent/plugin/stream_client.rb', line 29

# Read accessor for the configuration object stored in @mOdpsConfig.
def mOdpsConfig
  return @mOdpsConfig
end

#mOdpsTableObject (readonly)

Returns the value of attribute mOdpsTable.



29
30
31
# File 'lib/fluent/plugin/stream_client.rb', line 29

# Read accessor for the table object stored in @mOdpsTable.
def mOdpsTable
  return @mOdpsTable
end

#mOdpsTableSchemaObject (readonly)

Returns the value of attribute mOdpsTableSchema.



29
30
31
# File 'lib/fluent/plugin/stream_client.rb', line 29

# Read accessor for the schema object stored in @mOdpsTableSchema.
def mOdpsTableSchema
  return @mOdpsTableSchema
end

#mProjectObject (readonly)

Returns the value of attribute mProject.



29
30
31
# File 'lib/fluent/plugin/stream_client.rb', line 29

# Read accessor for the project name stored in @mProject.
def mProject
  return @mProject
end

#mTableObject (readonly)

Returns the value of attribute mTable.



29
30
31
# File 'lib/fluent/plugin/stream_client.rb', line 29

# Read accessor for the table name stored in @mTable.
def mTable
  return @mTable
end

Instance Method Details

#addPartition(ptStr) ⇒ Object

ptStr example: ‘dt=20150805,hh=08,mm=24’. Adds the partition if it does not exist.



58
59
60
# File 'lib/fluent/plugin/stream_client.rb', line 58

# Forwards a partition spec to @mOdpsTable.addPartition and returns its result.
#
# @param ptStr partition spec string, e.g. 'dt=20150805,hh=08,mm=24'
def addPartition(ptStr)
  table = @mOdpsTable
  table.addPartition(ptStr)
end

#createStreamArrayWriter(shardId = nil) ⇒ Object



70
71
72
# File 'lib/fluent/plugin/stream_client.rb', line 70

# Creates a StreamWriter for this client's table that is also given the
# table schema held in @mOdpsTableSchema.
#
# @param shardId shard identifier handed to the writer (nil by default)
# @return [StreamWriter]
def createStreamArrayWriter(shardId = nil)
  # Elements are evaluated left to right, same order as a direct call.
  writer_args = [@mOdpsConfig, @mProject, @mTable, getResource, shardId, @mOdpsTableSchema]
  StreamWriter.new(*writer_args)
end

#createStreamWriter(shardId = nil) ⇒ Object



66
67
68
# File 'lib/fluent/plugin/stream_client.rb', line 66

# Creates a StreamWriter for this client's table (no schema is passed).
#
# @param shardId shard identifier handed to the writer (nil by default)
# @return [StreamWriter]
def createStreamWriter(shardId = nil)
  # Elements are evaluated left to right, same order as a direct call.
  writer_args = [@mOdpsConfig, @mProject, @mTable, getResource, shardId]
  StreamWriter.new(*writer_args)
end

#getOdpsTableSchemaObject



62
63
64
# File 'lib/fluent/plugin/stream_client.rb', line 62

# Returns the schema object stored in @mOdpsTableSchema.
def getOdpsTableSchema
  @mOdpsTableSchema
end

#getPartitionListObject

Gets the partitions and returns an array like: [“place”=>“china2”,“place”=>“china”]



52
53
54
# File 'lib/fluent/plugin/stream_client.rb', line 52

# Returns whatever @mOdpsTable.getPartitionList returns.
def getPartitionList
  table = @mOdpsTable
  table.getPartitionList
end

#getShardStatusObject

Returns JSON like [“0”,“State”: “loaded”,“1”,“State”: “loaded”]



75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/stream_client.rb', line 75

# Fetches the shard status of the table.
#
# Issues a GET on getResource + "/shards" with the current project and the
# shard-status query parameter, then returns the "ShardStatus" entry of the
# parsed JSON response.
#
# @return the value stored under "ShardStatus" in the response body
# @raise [OdpsDatahubException] when the response code is not "200"
def getShardStatus
  header = {}
  param = {}
  param[$PARAM_CURR_PROJECT] = @mProject
  param[$PARAM_SHARD_STATUS] = ""

  conn = HttpConnection.new(@mOdpsConfig, header, param, getResource + "/shards", "GET")
  res = conn.getResponse
  json_obj = JSON.parse(res.body)
  if res.code != "200"
    # Interpolate instead of concatenating: "str" + nil raises TypeError, which
    # would hide the service error whenever the body carries no "Message".
    raise OdpsDatahubException.new(json_obj["Code"], "getShardStatus failed because #{json_obj["Message"]}")
  end
  json_obj["ShardStatus"]
end

#loadShard(idx) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/stream_client.rb', line 108

# Sends a POST to getResource + "/shards" carrying idx as the
# $PARAM_SHARD_NUMBER parameter.
# NOTE(review): the parameter name and the error text suggest idx is a shard
# count rather than an index - confirm against the service API.
#
# @param idx non-negative number sent as the shard-number parameter
# @return [nil]
# @raise [OdpsDatahubException] when idx is nil or negative, or when the
#   response code is not "200"
def loadShard(idx)
  # nil is rejected here too; otherwise `nil < 0` fails with NoMethodError
  # instead of the intended invalid-argument error.
  if idx.nil? || idx < 0
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "loadShard num invalid")
  end
  header = {}
  param = {}
  param[$PARAM_CURR_PROJECT] = @mProject
  param[$PARAM_SHARD_NUMBER] = idx
  conn = HttpConnection.new(@mOdpsConfig, header, param, getResource + "/shards", "POST")
  res = conn.getResponse
  return nil if res.code == "200"

  # The body is only parsed on failure, to extract the error details.
  json_obj = JSON.parse(res.body)
  # Interpolate instead of concatenating: "str" + nil raises TypeError, which
  # would hide the service error whenever the body carries no "Message".
  raise OdpsDatahubException.new(json_obj["Code"], "loadShard failed because #{json_obj["Message"]}")
end

#waitForShardLoad(timeOut = 120) ⇒ Object

The timeOut argument is given in seconds.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/stream_client.rb', line 90

# Polls isShardLoadCompleted until it reports true or the timeout elapses.
#
# Only the time spent sleeping between polls counts toward the timeout. The
# pause between polls is 5 seconds, shortened on the last round so the total
# sleep never exceeds timeOut. One final poll is made after the budget is
# used up.
#
# @param timeOut maximum number of seconds to sleep in total (default 120)
# @return [nil] once the shards are reported as loaded
# @raise [OdpsDatahubException] with $INVALID_ARGUMENT when timeOut is
#   negative, or with $LOADSHARD_TIMEOUT when loading did not complete in time
def waitForShardLoad(timeOut = 120) #seconds
  if timeOut < 0
    raise OdpsDatahubException.new($INVALID_ARGUMENT, "waitForShardLoad param invalid")
  end
  poll_interval = 5
  waited = 0
  while waited < timeOut
    return if isShardLoadCompleted
    # Clamp the pause so a timeout that is not a multiple of the interval
    # is not overshot (e.g. timeOut = 12 sleeps 5 + 5 + 2, not 15).
    pause = [poll_interval, timeOut - waited].min
    sleep(pause)
    waited += pause
  end
  return if isShardLoadCompleted
  raise OdpsDatahubException.new($LOADSHARD_TIMEOUT, "waitForShardLoad timeOut")
end