Class: Fluent::Plugin::GrasslandOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_grassland.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GrasslandOutput

Returns a new instance of GrasslandOutput.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/fluent/plugin/out_grassland.rb', line 12

# Builds the plugin instance, loads the libraries the plugin uses, seeds the
# random generator exposed through #random, and writes a start-up line to
# syslog under the program name 'grasslandplugin'.
def initialize
  super
  require 'aws-sdk'
  require 'base64'
  require 'json'
  require 'logger'
  require 'net/http'
  require 'uri'
  # Syslog::Logger lives in 'syslog/logger'; requiring 'logger' alone does not
  # define it, so load it explicitly before it is referenced below.
  require 'syslog/logger'
  @random = Random.new

  # Named `syslog` rather than `log` so it does not shadow the `log` method
  # that the other methods of this class call.
  syslog = Syslog::Logger.new 'grasslandplugin'
  syslog.info 'grassland initialize'
end

Instance Attribute Details

#access_key_id ⇒ Object

Returns the value of attribute access_key_id.



10
11
12
# File 'lib/fluent/plugin/out_grassland.rb', line 10

# Reader for @access_key_id, which #setCredential fills from the
# 'accessKeyId' field of the credential response.
#
# @return [Object, nil] the stored value; nil until it has been assigned
def access_key_id
  @access_key_id
end

#id ⇒ Object

Returns the value of attribute id.



10
11
12
# File 'lib/fluent/plugin/out_grassland.rb', line 10

# Reader for @id, which #setCredential fills from the 'id' field of the
# credential response. #format uses @id as the default 'cid' of a record.
#
# @return [Object, nil] the stored value; nil until it has been assigned
def id
  @id
end

#kinesis ⇒ Object

Returns the value of attribute kinesis.



9
10
11
# File 'lib/fluent/plugin/out_grassland.rb', line 9

# Reader for @kinesis, the object that #resetAwsCredential and #write call
# put_record on.
# NOTE(review): the assignment of @kinesis is not visible in this listing;
# presumably it happens in configure_aws — confirm.
#
# @return [Object, nil] the stored value; nil until it has been assigned
def kinesis
  @kinesis
end

#partitionKeys ⇒ Object

Returns the value of attribute partitionKeys.



10
11
12
# File 'lib/fluent/plugin/out_grassland.rb', line 10

# Reader for @partitionKeys, which #setCredential fills from the
# 'partitionKeyList' field of the credential response. #write picks one
# element of it at random as the partition key of each put_record call.
#
# @return [Object, nil] the stored value; nil until it has been assigned
def partitionKeys
  @partitionKeys
end

#random ⇒ Object

Returns the value of attribute random.



8
9
10
# File 'lib/fluent/plugin/out_grassland.rb', line 8

# Reader for @random, the Random instance created in #initialize. It is used
# by #resetAwsCredential and #write to draw partition keys.
#
# @return [Random, nil] the stored generator
def random
  @random
end

#region ⇒ Object

Returns the value of attribute region.



10
11
12
# File 'lib/fluent/plugin/out_grassland.rb', line 10

# Reader for @region, which #setCredential fills from the 'region' field of
# the credential response.
#
# @return [Object, nil] the stored value; nil until it has been assigned
def region
  @region
end

#secret_access_key ⇒ Object

Returns the value of attribute secret_access_key.



10
11
12
# File 'lib/fluent/plugin/out_grassland.rb', line 10

# Reader for @secret_access_key, which #setCredential fills from the
# 'secretAccessKey' field of the credential response.
#
# @return [Object, nil] the stored value; nil until it has been assigned
def secret_access_key
  @secret_access_key
end

#sessionToken ⇒ Object

Returns the value of attribute sessionToken.



10
11
12
# File 'lib/fluent/plugin/out_grassland.rb', line 10

# Reader for @sessionToken, which #setCredential fills from the
# 'SessionToken' field (capital S) of the credential response.
#
# @return [Object, nil] the stored value; nil until it has been assigned
def sessionToken
  @sessionToken
end

#stream_name ⇒ Object

Returns the value of attribute stream_name.



10
11
12
# File 'lib/fluent/plugin/out_grassland.rb', line 10

# Reader for @stream_name, which #setCredential fills from the 'streamName'
# field of the credential response. It is passed as :stream_name to every
# put_record call made by this class.
#
# @return [Object, nil] the stored value; nil until it has been assigned
def stream_name
  @stream_name
end

Instance Method Details

#configure(conf) ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/out_grassland.rb', line 44

# Lets the parent class apply the configuration, then checks that every
# mandatory setting ended up in its instance variable.
#
# @param conf [Object] configuration handed through to the parent class
# @return [Array<Symbol>] the names of the settings that were checked
# @raise [ConfigError] when @key is nil or false after the parent has run
def configure(conf)
  super

  required = [:key]
  required.each do |name|
    next if instance_variable_get("@#{name}")

    raise ConfigError, "'#{name}' is required"
  end
end

#format(tag, time, record) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/out_grassland.rb', line 119

# Turns one event into a comma-terminated JSON object for the buffer.
#
# The record must contain 'dt' and 'd'. Missing 'pt', 'cid' and 'uid' fields
# are filled in with the event time, @id and '0' respectively, and 'pk' is
# always set to cid + dt. The record passed in is modified in place.
#
# @param tag [Object] event tag (not used)
# @param time [Object] event time, stored as 'pt' when the record has none
# @param record [Hash] event payload
# @return [String] the record as JSON followed by ",", or "" when a required
#   field is missing (the first missing field is logged)
def format(tag, time, record)
  missing = %w[dt d].find { |field| !record.has_key?(field) }
  if missing
    log.info "input data error: '#{missing}' is required"
    return ""
  end

  record['pt'] = time unless record.has_key?('pt')
  record['cid'] = @id unless record.has_key?('cid')
  record['uid'] = '0' unless record.has_key?('uid')
  record['pk'] = record['cid'] + record['dt']

  "#{record.to_json},"
end

#get_json(location, limit = 3) ⇒ Object

Raises:

  • (ArgumentError)


92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/fluent/plugin/out_grassland.rb', line 92

# Performs an HTTP(S) GET and parses the response body as JSON, following
# redirects.
#
# Every failure after the initial limit check (network error, invalid JSON,
# non-2xx status, redirect budget used up while following redirects) is
# logged and reported as nil instead of being raised.
#
# @param location [String] URL to fetch
# @param limit [Integer] how many requests may still be made
# @return [Object, nil] the parsed JSON document, or nil on failure
# @raise [ArgumentError] when called with limit == 0
def get_json(location, limit = 3)
  raise ArgumentError, 'too many HTTP redirects' if limit == 0

  uri = URI.parse(location)
  begin
    # The timeouts have to be handed to Net::HTTP.start: by the time its block
    # runs the connection is already open, so assigning open_timeout inside
    # the block would never take effect.
    response = Net::HTTP.start(uri.host, uri.port,
                               use_ssl: uri.scheme == 'https',
                               open_timeout: 5,
                               read_timeout: 10) do |http|
      http.get(uri.request_uri)
    end

    case response
    when Net::HTTPSuccess
      JSON.parse(response.body)
    when Net::HTTPRedirection
      # Resolve against the current URL so a relative Location header works.
      target = URI.join(uri.to_s, response['location']).to_s
      warn "redirected to #{target}"
      get_json(target, limit - 1)
    else
      # Log the status directly; Net::HTTPResponse#value would raise here.
      log.info [uri.to_s, response.code, response.message].join(" : ")
      nil
    end
  rescue => e
    log.info [uri.to_s, e.class, e].join(" : ")
    nil
  end
end

#resetAwsCredential ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_grassland.rb', line 66

# Reloads the credentials, reconfigures the AWS client and sends a single
# "test" record to the stream with a random partition key.
#
# Any StandardError raised along the way is logged and swallowed.
#
# @return [Object] whatever the final log.info call returns
def resetAwsCredential
  setCredential
  configure_aws

  probe = {
    stream_name: @stream_name,
    data: "test",
    partition_key: "#{random.rand(999)}"
  }
  @kinesis.put_record(probe)

  log.info "grassland: reset credential"
rescue => e
  log.info [e.class, e].join(" : initialize error.")
end

#set_interval(delay) ⇒ Object

config_param :resetCredentialTimer, :integer, :default => 20



35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/out_grassland.rb', line 35

# Starts a background thread that forever sleeps for `delay` and then runs
# the block given to this method.
#
# @param delay [Numeric] value passed to sleep before each run of the block
# @yield invoked after every sleep
# @return [Thread] the thread running the loop
def set_interval(delay)
  tick = proc do
    sleep(delay)
    yield # run the caller's block
  end

  Thread.new { loop(&tick) }
end

#setCredential ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/out_grassland.rb', line 81

# Downloads the credential document from "#{@apiuri}?key=#{@key}" via
# #get_json and copies its fields into the matching instance variables.
#
# @return [Object] the value stored in @partitionKeys
def setCredential
  credential = get_json("#{@apiuri}?key=#{@key}")

  # instance variable => field name in the credential document
  fields = {
    :@id                => 'id',
    :@stream_name       => 'streamName',
    :@access_key_id     => 'accessKeyId',
    :@secret_access_key => 'secretAccessKey',
    :@region            => 'region',
    :@sessionToken      => 'SessionToken',
    :@partitionKeys     => 'partitionKeyList'
  }
  fields.each do |ivar, field|
    instance_variable_set(ivar, credential[field])
  end

  @partitionKeys
end

#shutdown ⇒ Object



62
63
64
# File 'lib/fluent/plugin/out_grassland.rb', line 62

# Shuts the plugin down by delegating to the parent class; nothing
# plugin-specific is done here.
# NOTE(review): the thread that #start creates through #set_interval is not
# stopped in this method — confirm whether that is intended.
def shutdown
  super
end

#start ⇒ Object



54
55
56
57
58
59
60
# File 'lib/fluent/plugin/out_grassland.rb', line 54

# Starts the plugin: runs the parent's start, schedules #resetAwsCredential
# to run repeatedly with @resetCredentialTimer as the delay, and then runs
# it once immediately.
#
# @return [Object] the result of the immediate #resetAwsCredential call
def start
  super
  # Schedule the periodic refresh first, then refresh once right away.
  set_interval(@resetCredentialTimer) { resetAwsCredential }
  resetAwsCredential
end

#write(chunk) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/fluent/plugin/out_grassland.rb', line 141

# Sends a buffered chunk to Kinesis, batching records that share a 'pk'.
#
# The chunk is read as a sequence of comma-terminated JSON objects (the shape
# #format produces). Records are grouped by their 'pk' field and each group
# is sent as one JSON array through @kinesis.put_record, using a partition
# key drawn at random from #partitionKeys. A group is sent early as soon as
# its accumulated payload reaches 30720 bytes; whatever is left is sent once
# all records have been processed.
#
# Sending is best effort: a StandardError raised while sending is logged
# together with its class and message, the remaining records of the chunk
# are dropped, and the error does not propagate to the caller. A chunk that
# is not valid JSON still raises from JSON.parse.
#
# @param chunk [#read] buffer chunk whose #read returns the formatted records
# @return [Object] not meaningful to callers
def write(chunk)
  records = JSON.parse("[#{chunk.read.chop}]")
  max_payload_bytes = 30720
  # 'pk' (as a string) => comma-terminated JSON objects waiting to be sent
  pending = Hash.new { |hash, key| hash[key] = ''.dup }

  # Sends one group as a JSON array; chop drops the trailing comma.
  put_group = lambda do |payload|
    @kinesis.put_record({
      :stream_name   => @stream_name,
      :data          => "[#{payload.chop}]",
      :partition_key => partitionKeys[random.rand(partitionKeys.length)]
    })
  end

  begin
    records.each do |data|
      key = data['pk'].to_s
      payload = pending[key]
      payload << data.to_json << ','
      next if payload.bytesize < max_payload_bytes

      put_group.call(payload)
      pending.delete(key)
    end

    # Flush the groups that never reached the size threshold.
    pending.each_value { |payload| put_group.call(payload) }
  rescue => e
    log.info "error: put_record to grassland. maybe too many requests. few data dropped. (#{e.class}: #{e.message})"
  end
end