Class: FluentPluginKinesis::OutputFilter
- Inherits:
-
Fluent::BufferedOutput
- Object
- Fluent::BufferedOutput
- FluentPluginKinesis::OutputFilter
- Includes:
- Fluent::DetachMultiProcessMixin, Fluent::SetTagKeyMixin, Fluent::SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_kinesis.rb
Constant Summary collapse
- USER_AGENT_NAME =
'fluent-plugin-kinesis-output-filter'
- PROC_BASE_STR =
'proc {|record| %s }'
- PUT_RECORDS_MAX_COUNT =
500
- PUT_RECORD_MAX_DATA_SIZE =
1024 * 1024
- PUT_RECORDS_MAX_DATA_SIZE =
1024 * 1024 * 5
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/fluent/plugin/out_kinesis.rb', line 73
#
# Applies the plugin configuration and derives the runtime settings from it.
#
# Emits a deprecation warning, delegates to the parent implementation, runs
# +validate_params+, then decides whether the plugin runs in parallel mode,
# compiles the optional key expressions into procs and selects the JSON
# serializer class.
#
# @param conf the configuration object handed to the parent +configure+
def configure(conf)
  log.warn("Deprecated warning: out_kinesis is no longer supported after v1.0.0. Please check out_kinesis_streams out.")
  super
  validate_params

  # Parallel mode is active when the output is detached into its own
  # process or when more than one thread is configured.
  @parallel_mode = (@detach_process || @num_threads > 1) ? true : false
  @use_detach_multi_process_mixin = true if @detach_process

  if @parallel_mode
    # order_events is forced off in parallel mode; warn only when the user
    # had explicitly asked for it.
    log.warn 'You have set "order_events" to true, however this configuration will be ignored due to "detach_process" and/or "num_threads".' if @order_events
    @order_events = false
  end

  # The configured expressions are wrapped in PROC_BASE_STR and evaluated
  # into procs taking a single +record+ argument. eval runs with this
  # method's binding, so the local names below are kept stable.
  if @partition_key_expr
    partition_key_proc_str = sprintf(PROC_BASE_STR, @partition_key_expr)
    @partition_key_proc = eval(partition_key_proc_str)
  end

  if @explicit_hash_key_expr
    explicit_hash_key_proc_str = sprintf(PROC_BASE_STR, @explicit_hash_key_expr)
    @explicit_hash_key_proc = eval(explicit_hash_key_proc_str)
  end

  # Serializer class whose +dump+ is used when formatting records.
  @dump_class =
    if @use_yajl
      Yajl
    else
      JSON
    end
end
#format(tag, time, record) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/fluent/plugin/out_kinesis.rb', line 121
#
# Serializes one event into the msgpack form stored in the buffer chunk.
#
# The packed hash always carries +:data+ (the record dumped through
# +@dump_class+) and +:partition_key+; +:explicit_hash_key+ is added only
# when an explicit hash key or an explicit hash key proc is configured.
#
# @param tag unused by this method
# @param time unused by this method
# @param record the event record to serialize
# @return the result of calling +to_msgpack+ on the assembled hash
def format(tag, time, record)
  payload = {
    data: @dump_class.dump(record),
    partition_key: get_key(:partition_key, record)
  }

  wants_explicit_hash_key = @explicit_hash_key || @explicit_hash_key_proc
  payload[:explicit_hash_key] = get_key(:explicit_hash_key, record) if wants_explicit_hash_key

  payload.to_msgpack
end
#start ⇒ Object
111 112 113 114 115 116 117 118 119 |
# File 'lib/fluent/plugin/out_kinesis.rb', line 111
#
# Starts the plugin inside the +detach_multi_process+ block: runs the parent
# +start+, loads the client, and, when +@ensure_stream_connection+ is set,
# checks the connection to the stream.
def start
  detach_multi_process do
    super
    load_client
    check_connection_to_stream if @ensure_stream_connection
  end
end
#write(chunk) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/fluent/plugin/out_kinesis.rb', line 134
#
# Delivers the contents of a buffer chunk.
#
# Every msgpack entry in the chunk is converted with +build_data_to_put+.
# Entries whose +:data+ exceeds the per-record size limit are logged as
# errors and dropped. The remaining entries are sent one by one through
# +put_record_for_order_events+ when +@order_events+ is set, otherwise they
# are grouped by +build_records_array_to_put+ and each group is sent with
# +put_records_with_retry+.
#
# @param chunk the buffer chunk; must respond to +msgpack_each+
def write(chunk)
  candidates = chunk.to_enum(:msgpack_each).map { |entry| build_data_to_put(entry) }

  data_list = candidates.select do |entry|
    if record_exceeds_max_size?(entry[:data])
      # sprintf is used on purpose: this class defines its own #format.
      log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, entry[:data])
      false
    else
      true
    end
  end

  return put_record_for_order_events(data_list) if @order_events

  build_records_array_to_put(data_list).each do |batch|
    put_records_with_retry(batch)
  end
end