Class: Fluent::Plugin::GrasslandOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::GrasslandOutput
- Defined in:
- lib/fluent/plugin/out_grassland.rb
Instance Attribute Summary collapse
-
#access_key_id ⇒ Object
Returns the value of attribute access_key_id.
-
#id ⇒ Object
Returns the value of attribute id.
-
#kinesis ⇒ Object
Returns the value of attribute kinesis.
-
#partitionKeys ⇒ Object
Returns the value of attribute partitionKeys.
-
#random ⇒ Object
Returns the value of attribute random.
-
#region ⇒ Object
Returns the value of attribute region.
-
#secret_access_key ⇒ Object
Returns the value of attribute secret_access_key.
-
#sessionToken ⇒ Object
Returns the value of attribute sessionToken.
-
#stream_name ⇒ Object
Returns the value of attribute stream_name.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #get_json(location, limit = 3) ⇒ Object
-
#initialize ⇒ GrasslandOutput
constructor
A new instance of GrasslandOutput.
- #resetAwsCredential ⇒ Object
-
#set_interval(delay) ⇒ Object
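Runs the given block every `delay` seconds on a new thread. (The source comment YARD attached to this method reads: `config_param :resetCredentialTimer, :integer, :default => 20`.)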
- #setCredential ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GrasslandOutput
Returns a new instance of GrasslandOutput.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/fluent/plugin/out_grassland.rb', line 12
# Builds the plugin instance: loads the libraries this plugin uses, creates
# the Random source returned by #random, and writes one startup line to syslog.
#
# @return [GrasslandOutput] a new instance of GrasslandOutput
def initialize
  super
  # require 'aws-sdk-v1'
  require 'aws-sdk'
  require 'base64'
  require 'json'
  require 'logger'
  require 'net/http'
  # Syslog::Logger lives in 'syslog/logger'; requiring 'logger' alone does not
  # define it.
  require 'syslog/logger'
  require 'uri'

  @random = Random.new

  # Named `syslog` rather than `log` so the local does not shadow the `log`
  # method that the other methods of this class call.
  syslog = Syslog::Logger.new 'grasslandplugin'
  syslog.info 'grassland initialize'
  # puts "grassland initialize"
end
Instance Attribute Details
#access_key_id ⇒ Object
Returns the value of attribute access_key_id.
10 11 12 |
# File 'lib/fluent/plugin/out_grassland.rb', line 10
# Reader for the @access_key_id instance variable.
#
# @return [Object] the current value of @access_key_id (nil until assigned)
def access_key_id
  @access_key_id
end
#id ⇒ Object
Returns the value of attribute id.
10 11 12 |
# File 'lib/fluent/plugin/out_grassland.rb', line 10
# Reader for the @id instance variable.
#
# @return [Object] the current value of @id (nil until assigned)
def id
  @id
end
#kinesis ⇒ Object
Returns the value of attribute kinesis.
9 10 11 |
# File 'lib/fluent/plugin/out_grassland.rb', line 9
# Reader for the @kinesis instance variable.
#
# @return [Object] the current value of @kinesis (nil until assigned)
def kinesis
  @kinesis
end
#partitionKeys ⇒ Object
Returns the value of attribute partitionKeys.
10 11 12 |
# File 'lib/fluent/plugin/out_grassland.rb', line 10
# Reader for the @partitionKeys instance variable.
#
# @return [Object] the current value of @partitionKeys (nil until assigned)
def partitionKeys
  @partitionKeys
end
#random ⇒ Object
Returns the value of attribute random.
8 9 10 |
# File 'lib/fluent/plugin/out_grassland.rb', line 8
# Reader for the @random instance variable.
#
# @return [Object] the current value of @random (nil until assigned)
def random
  @random
end
#region ⇒ Object
Returns the value of attribute region.
10 11 12 |
# File 'lib/fluent/plugin/out_grassland.rb', line 10
# Reader for the @region instance variable.
#
# @return [Object] the current value of @region (nil until assigned)
def region
  @region
end
#secret_access_key ⇒ Object
Returns the value of attribute secret_access_key.
10 11 12 |
# File 'lib/fluent/plugin/out_grassland.rb', line 10
# Reader for the @secret_access_key instance variable.
#
# @return [Object] the current value of @secret_access_key (nil until assigned)
def secret_access_key
  @secret_access_key
end
#sessionToken ⇒ Object
Returns the value of attribute sessionToken.
10 11 12 |
# File 'lib/fluent/plugin/out_grassland.rb', line 10
# Reader for the @sessionToken instance variable.
#
# @return [Object] the current value of @sessionToken (nil until assigned)
def sessionToken
  @sessionToken
end
#stream_name ⇒ Object
Returns the value of attribute stream_name.
10 11 12 |
# File 'lib/fluent/plugin/out_grassland.rb', line 10
# Reader for the @stream_name instance variable.
#
# @return [Object] the current value of @stream_name (nil until assigned)
def stream_name
  @stream_name
end
Instance Method Details
#configure(conf) ⇒ Object
44 45 46 47 48 49 50 51 52 |
# File 'lib/fluent/plugin/out_grassland.rb', line 44
# Applies the configuration through the superclass, then checks that every
# required setting ended up in its instance variable.
#
# @param conf [Object] configuration passed through to the superclass
# @raise [ConfigError] when a required setting (currently only `key`) is unset
def configure(conf)
  super

  required_settings = [:key]
  # Report the first required setting whose instance variable is nil/false.
  missing = required_settings.find do |setting|
    !instance_variable_get("@#{setting}")
  end
  raise ConfigError, "'#{missing}' is required" if missing
end
#format(tag, time, record) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/fluent/plugin/out_grassland.rb', line 119
# Serializes one event into the buffer format that #write parses back: the
# record as JSON followed by a comma.
#
# A record without 'dt' or 'd' is rejected: the problem is logged and an
# empty string is returned. Missing 'pt', 'cid' and 'uid' are filled in with
# the event time, @id and '0' respectively, and 'pk' is always overwritten
# with cid followed by dt. The record hash passed in is modified in place.
#
# @param tag [Object] event tag (not used)
# @param time [Object] event time, stored as 'pt' when the record has none
# @param record [Hash] event record keyed by strings
# @return [String] "<json>," or "" when a required key is missing
def format(tag, time, record)
  %w[dt d].each do |key|
    next if record.key?(key)

    log.info "input data error: '#{key}' is required"
    return ''
  end

  record['pt'] = time unless record.key?('pt')
  record['cid'] = @id unless record.key?('cid')
  record['uid'] = '0' unless record.key?('uid')

  # Interpolate rather than use String#+ so a nil cid (@id not loaded yet) or
  # a non-String cid/dt builds a key instead of raising NoMethodError/TypeError.
  record['pk'] = "#{record['cid']}#{record['dt']}"

  "#{record.to_json},"
end
#get_json(location, limit = 3) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/fluent/plugin/out_grassland.rb', line 92
# GETs +location+ over HTTP or HTTPS and returns the parsed JSON body,
# following redirects until +limit+ is used up.
#
# Request failures (non-2xx/3xx status, network errors, invalid JSON, and an
# exhausted redirect budget reached while following a redirect) are logged
# and reported as nil rather than raised.
#
# @param location [String] URL to fetch
# @param limit [Integer] number of requests still allowed in the redirect chain
# @return [Object, nil] parsed JSON on success, nil after a logged failure
# @raise [ArgumentError] when called with limit == 0
def get_json(location, limit = 3)
  raise ArgumentError, 'too many HTTP redirects' if limit == 0

  uri = URI.parse(location)
  begin
    # The timeouts are passed as start options: assigning open_timeout inside
    # the block happens after the connection has already been opened.
    response = Net::HTTP.start(uri.host, uri.port,
                               use_ssl: uri.scheme == 'https',
                               open_timeout: 5,
                               read_timeout: 10) do |http|
      http.get(uri.request_uri)
    end

    case response
    when Net::HTTPSuccess
      JSON.parse(response.body)
    when Net::HTTPRedirection
      location = response['location']
      warn "redirected to #{location}"
      get_json(location, limit - 1)
    else
      # Log the status directly; Net::HTTPResponse#value raises for non-2xx
      # responses instead of returning something printable.
      log.info [uri.to_s, response.code, response.message].join(" : ")
      nil
    end
  rescue => e
    log.info [uri.to_s, e.class, e].join(" : ")
    nil
  end
end
#resetAwsCredential ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/out_grassland.rb', line 66
# Reloads the credential set via setCredential, calls configure_aws, then
# sends one probe record ("test") to the stream and logs success. Any
# StandardError raised along the way is logged instead of propagated.
def resetAwsCredential
  setCredential
  configure_aws

  probe = {
    stream_name: @stream_name,
    data: "test",
    partition_key: random.rand(999).to_s
  }
  @kinesis.put_record(probe)
  log.info "grassland: reset credential"
rescue => failure
  log.info [failure.class, failure].join(" : initialize error.")
end
#set_interval(delay) ⇒ Object
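Runs the given block every `delay` seconds on a new thread. (The source comment YARD attached to this method reads: `config_param :resetCredentialTimer, :integer, :default => 20`.)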
35 36 37 38 39 40 41 42 |
# File 'lib/fluent/plugin/out_grassland.rb', line 35
# Starts a thread that forever sleeps for +delay+ and then yields to the
# block given to this method.
#
# @param delay [Numeric] pause before each call of the block, passed to sleep
# @yield called after every pause, on the new thread
# @return [Thread] the thread running the loop
def set_interval(delay)
  Thread.new(delay) do |pause|
    loop do
      sleep(pause)
      yield # call passed block
    end
  end
end
#setCredential ⇒ Object
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/fluent/plugin/out_grassland.rb', line 81
# Fetches the credential document from "#{@apiuri}?key=#{@key}" with get_json
# and copies its fields into the matching instance variables.
def setCredential
  credential = get_json("#{@apiuri}?key=#{@key}")

  # instance variable => field name in the credential document
  field_for = {
    '@id' => 'id',
    '@stream_name' => 'streamName',
    '@access_key_id' => 'accessKeyId',
    '@secret_access_key' => 'secretAccessKey',
    '@region' => 'region',
    '@sessionToken' => 'SessionToken',
    '@partitionKeys' => 'partitionKeyList'
  }
  field_for.each do |ivar, field|
    instance_variable_set(ivar, credential[field])
  end
end
#shutdown ⇒ Object
62 63 64 |
# File 'lib/fluent/plugin/out_grassland.rb', line 62
# Shuts the plugin down; adds nothing to the superclass implementation.
def shutdown
  super
end
#start ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/fluent/plugin/out_grassland.rb', line 54
# Starts the plugin: schedules resetAwsCredential to run repeatedly via
# set_interval, then runs it once right away.
def start
  super

  # NOTE(review): @resetCredentialTimer is not assigned anywhere in the code
  # shown here - confirm a config_param populates it before this runs.
  set_interval(@resetCredentialTimer) do
    resetAwsCredential
  end

  resetAwsCredential
end
#write(chunk) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/fluent/plugin/out_grassland.rb', line 141
# Sends one buffered chunk to Kinesis.
#
# The chunk body is the concatenation of "<json>," entries; it is parsed back
# into records, which are grouped by their 'pk' value. A group is sent as a
# JSON array as soon as its pending payload reaches 30720 bytes, and whatever
# is left is sent at the end, in order of each group's first record. Each
# put_record uses a partition key picked at random from partitionKeys.
#
# Errors raised while grouping or sending are logged and swallowed, so the
# records not yet sent from this chunk are dropped.
#
# @param chunk [#read] buffer chunk whose #read returns the formatted entries
def write(chunk)
  max_payload_bytes = 30_720
  records = JSON.parse("[#{chunk.read.chop}]")
  pending = {} # 'pk' value (as a String) => "<json>,<json>," payload

  # Sends one group's payload as a JSON array (trailing comma removed).
  send_payload = lambda do |payload|
    @kinesis.put_record({
      :stream_name => @stream_name,
      :data => "[#{payload.chop}]",
      :partition_key => partitionKeys[random.rand(partitionKeys.length)]
      # :partition_key => data['pk']
    })
  end

  begin
    records.each do |data|
      group = data['pk'].to_s
      payload = (pending[group] ||= ''.dup)
      payload << data.to_json << ','
      next if payload.bytesize < max_payload_bytes

      send_payload.call(payload)
      pending.delete(group)
    end

    # Flush the groups that never reached the size threshold.
    records.each do |data|
      payload = pending.delete(data['pk'].to_s)
      send_payload.call(payload) if payload
    end
  rescue => e
    log.info "error: put_record to grassland. maybe too many requests. few data dropped."
    # Keep the cause visible; the message above only guesses at it.
    log.info [e.class, e].join(" : ")
  end
end