Class: FluentPluginFirehose::FirehoseBufferedOutput
- Inherits: Fluent::BufferedOutput
  (Object → Fluent::BufferedOutput → FluentPluginFirehose::FirehoseBufferedOutput)
- Includes: Fluent::DetachMultiProcessMixin, Fluent::SetTagKeyMixin, Fluent::SetTimeKeyMixin
- Defined in: lib/fluent/plugin/out_firehose.rb
Constant Summary
- USER_AGENT_NAME = 'fluent-plugin-firehose-output'
- PUT_RECORD_MAX_DATA_SIZE = 1000 * 1024 (1000 KB)
- PUT_RECORD_BATCH_MAX_COUNT = 500
- PUT_RECORD_BATCH_MAX_DATA_SIZE = 4 * 1024 * 1024 (4 MB)
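These values mirror the Amazon Kinesis Firehose API limits at the time of writing: at most 1,000 KB per record, and at most 500 records or 4 MB of data per PutRecordBatch request.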
Instance Method Summary
- #build_data_to_put(data) ⇒ Object
- #build_records_array_to_put(data_list) ⇒ Object
- #calculate_sleep_duration(current_retry) ⇒ Object
- #check_connection_to_stream ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #load_client ⇒ Object
- #put_record_batch_with_retry(records, retry_count = 0) ⇒ Object
- #put_record_for_order_events(data_list) ⇒ Object
- #record_exceeds_max_size?(record_string) ⇒ Boolean
- #scaling_factor ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#build_data_to_put(data) ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 178

def build_data_to_put(data)
  if @zlib_compression
    Hash[data.map{|k, v| [k.to_sym, k=="data" ? Zlib::Deflate.deflate(v) : v] }]
  else
    Hash[data.map{|k, v| [k.to_sym, v] }]
  end
end
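A minimal sketch of the transformation (the input hash shape follows #format below; the payload value is hypothetical):

require 'zlib'

record = { "data" => '{"message":"hello"}' }

# With zlib_compression disabled, keys are merely symbolized:
#   => { data: '{"message":"hello"}' }

# With zlib_compression enabled, the "data" value is deflated,
# and the consumer must inflate it to recover the payload:
compressed = Zlib::Deflate.deflate(record["data"])
Zlib::Inflate.inflate(compressed) == record["data"]  # => true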
#build_records_array_to_put(data_list) ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 202

def build_records_array_to_put(data_list)
  records_array = []
  records = []
  records_payload_length = 0
  data_list.each{|data_to_put|
    payload = data_to_put[:data]
    if records.length >= PUT_RECORD_BATCH_MAX_COUNT or
        (records_payload_length + payload.length) >= PUT_RECORD_BATCH_MAX_DATA_SIZE
      records_array.push(records)
      records = []
      records_payload_length = 0
    end
    records.push(data_to_put)
    records_payload_length += payload.length
  }
  records_array.push(records) unless records.empty?
  records_array
end
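This splits data_list into PutRecordBatch-sized chunks: a new batch begins whenever the current one would reach PUT_RECORD_BATCH_MAX_COUNT records or PUT_RECORD_BATCH_MAX_DATA_SIZE bytes of payload. A standalone sketch of the same rule, with hypothetical tiny limits so the split is visible:

MAX_COUNT = 2   # stands in for PUT_RECORD_BATCH_MAX_COUNT (500)
MAX_BYTES = 10  # stands in for PUT_RECORD_BATCH_MAX_DATA_SIZE (4 MB)

def chunk(data_list)
  batches, batch, bytes = [], [], 0
  data_list.each do |item|
    if batch.length >= MAX_COUNT || (bytes + item[:data].length) >= MAX_BYTES
      batches << batch
      batch, bytes = [], 0
    end
    batch << item
    bytes += item[:data].length
  end
  batches << batch unless batch.empty?
  batches
end

chunk([{ data: "aaaa" }, { data: "bbbb" }, { data: "cccc" }])
# => [[{:data=>"aaaa"}, {:data=>"bbbb"}], [{:data=>"cccc"}]]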
#calculate_sleep_duration(current_retry) ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 260

def calculate_sleep_duration(current_retry)
  Array.new(@retries_on_putrecordbatch){|n| ((2 ** n) * scaling_factor)}[current_retry]
end
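This is exponential backoff with jitter: retry n sleeps about (2 ** n) * scaling_factor seconds, with scaling_factor (see below) drawn from [0.5, 0.6). Illustrative durations at the jitter midpoint:

(0..4).map { |n| (2 ** n) * 0.55 }
# => first five sleeps of roughly 0.55, 1.1, 2.2, 4.4 and 8.8 seconds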
#check_connection_to_stream ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 174

def check_connection_to_stream
  @client.describe_delivery_stream(delivery_stream_name: @delivery_stream_name)
end
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 63

def configure(conf)
  super

  if @detach_process or (@num_threads > 1)
    @parallel_mode = true
    if @detach_process
      @use_detach_multi_process_mixin = true
    end
  else
    @parallel_mode = false
  end

  if @parallel_mode
    if @order_events
      log.warn 'You have set "order_events" to true, however this configuration will be ignored due to "detach_process" and/or "num_threads".'
    end
    @order_events = false
  end

  @dump_class = @use_yajl ? Yajl : MultiJson
end
#format(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 95

def format(tag, time, record)
  data = {
    data: @dump_class.dump(record) + if @append_newline then "\n" else "" end
  }

  data.to_msgpack
end
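Each event therefore becomes a one-key hash whose value is the JSON-serialized record (plus an optional trailing newline), msgpack-encoded into the buffer chunk. A minimal sketch, substituting the stdlib JSON for the configured MultiJson/Yajl dumper:

require 'json'
require 'msgpack'

record = { "message" => "hello" }          # hypothetical event
data = { data: JSON.dump(record) + "\n" }  # as with append_newline enabled
packed = data.to_msgpack                   # bytes written to the chunk
MessagePack.unpack(packed)                 # => {"data"=>"{\"message\":\"hello\"}\n"}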
#load_client ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 125

def load_client
  user_agent_suffix = "#{USER_AGENT_NAME}/#{FluentPluginFirehose::VERSION}"

  options = {
    user_agent_suffix: user_agent_suffix
  }

  if @region
    options[:region] = @region
  end

  if @aws_key_id && @aws_sec_key
    options.update(
      access_key_id: @aws_key_id,
      secret_access_key: @aws_sec_key,
    )
  elsif @profile
    credentials_opts = {:profile_name => @profile}
    credentials_opts[:path] = @credentials_path if @credentials_path
    credentials = Aws::SharedCredentials.new(credentials_opts)
    options[:credentials] = credentials
  elsif @role_arn
    credentials = Aws::AssumeRoleCredentials.new(
      client: Aws::STS::Client.new(),
      role_arn: @role_arn,
      role_session_name: "aws-fluent-plugin-firehose",
      external_id: @external_id,
      duration_seconds: 60 * 60
    )
    options[:credentials] = credentials
  end

  if @debug
    options.update(
      logger: Logger.new(log.out),
      log_level: :debug
    )
    # XXX: Add the following options, if necessary
    # :http_wire_trace => true
  end

  if @http_proxy
    options[:http_proxy] = @http_proxy
  end

  @client = Aws::Firehose::Client.new(options)
end
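Credential sources are tried in order: an explicit key pair, then a shared-credentials profile, then an assumed IAM role. A sketch of the client this produces when only a region and key pair are configured (all values hypothetical; aws-sdk v2 supplies Aws::Firehose::Client):

require 'aws-sdk'

client = Aws::Firehose::Client.new(
  region: 'us-east-1',
  access_key_id: 'AKIA...',
  secret_access_key: '...',
  user_agent_suffix: 'fluent-plugin-firehose-output/x.y.z'  # USER_AGENT_NAME/VERSION
)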
#put_record_batch_with_retry(records, retry_count = 0) ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 220

def put_record_batch_with_retry(records, retry_count=0)
  response = @client.put_record_batch(
    records: records,
    delivery_stream_name: @delivery_stream_name
  )
  log.info sprintf('Put record batch, Sent %d records and failed %d records',
    records.length,
    response[:failed_put_count].nil? ? 0 : response[:failed_put_count]
  )

  if response[:failed_put_count] && response[:failed_put_count] > 0
    failed_records = []
    response[:request_responses].each_with_index{|record, index|
      if record[:error_code]
        failed_records.push({error_code: record[:error_code], error_message: record[:error_message], body: records[index]})
      end
    }

    if(retry_count < @retries_on_putrecordbatch)
      failed_records.each{|record|
        log.info sprintf('Put record attempt failed, error_code: %s, error_message: %s, record: %s',
          record[:error_code], record[:error_message], @dump_class.dump(record[:body]))
      }
      sleep(calculate_sleep_duration(retry_count))
      retry_count += 1
      log.warn sprintf('Retrying to put %d records, Retry count: %d', failed_records.length, retry_count)
      put_record_batch_with_retry(
        failed_records.map{|record| record[:body]},
        retry_count
      )
    else
      failed_records.each{|record|
        log.error sprintf('Could not put record, error_code: %s, error_message: %s, record: %s',
          record[:error_code], record[:error_message], @dump_class.dump(record[:body]))
      }
    end
  end
end
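Because failed records are re-batched and retried recursively, the cumulative sleep before giving up is bounded by a geometric sum; a back-of-the-envelope figure at the jitter midpoint, assuming a hypothetical retries_on_putrecordbatch of 3:

retries = 3
(0...retries).sum { |n| (2 ** n) * 0.55 }
# => 3.85 seconds of sleep in total before remaining failures are logged as errors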
#put_record_for_order_events(data_list) ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 186

def put_record_for_order_events(data_list)
  sequence_number_for_ordering = nil
  data_list.each do |data_to_put|
    if sequence_number_for_ordering
      data_to_put.update(
        sequence_number_for_ordering: sequence_number_for_ordering
      )
    end
    data_to_put.update(
      delivery_stream_name: @delivery_stream_name
    )
    result = @client.put_record(data_to_put)
    sequence_number_for_ordering = result[:sequence_number]
  end
end
#record_exceeds_max_size?(record_string) ⇒ Boolean
# File 'lib/fluent/plugin/out_firehose.rb', line 268

def record_exceeds_max_size?(record_string)
  return record_string.length > PUT_RECORD_MAX_DATA_SIZE
end
#scaling_factor ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 264

def scaling_factor
  0.5 + Kernel.rand * 0.1
end
#start ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 85

def start
  detach_multi_process do
    super
    load_client
    if @ensure_stream_connection
      check_connection_to_stream
    end
  end
end
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_firehose.rb', line 103

def write(chunk)
  data_list = chunk.to_enum(:msgpack_each).map{|record|
    build_data_to_put(record)
  }.find_all{|record|
    unless record_exceeds_max_size?(record[:data])
      true
    else
      log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s',
        PUT_RECORD_MAX_DATA_SIZE / 1024.0, record[:data])
      false
    end
  }

  if @order_events
    put_record_for_order_events(data_list)
  else
    records_array = build_records_array_to_put(data_list)
    records_array.each{|records|
      put_record_batch_with_retry(records)
    }
  end
end
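Taken together, #write is the per-chunk entry point: records are decoded from msgpack, transformed by #build_data_to_put, dropped (with an error log) if they exceed the per-record limit, then either sent one at a time via #put_record_for_order_events or batched and sent via #put_record_batch_with_retry.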