Class: Fluent::Plugin::RedshiftOutputV2
- Inherits: BufferedOutput
- Inheritance chain: Object → BufferedOutput → Fluent::Plugin::RedshiftOutputV2
- Defined in:
- lib/fluent/plugin/out_redshift_v2.rb
Instance Attribute Summary
- #last_gz_path ⇒ Object (readonly): Returns the value of attribute last_gz_path.
- #last_sql ⇒ Object (readonly): Returns the value of attribute last_sql.
Instance Method Summary
- #configure(conf) ⇒ Object
- #copy_sql(s3_uri) ⇒ Object
- #create_gz_file(chunk) ⇒ Object
- #exec_copy(s3_uri) ⇒ Object
- #format(_tag, _time, record) ⇒ Object
- #format_log(message) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
- #initialize ⇒ RedshiftOutputV2 (constructor): A new instance of RedshiftOutputV2.
- #insert_logs(chunk) ⇒ Object
- #next_gz_path ⇒ Object
- #s3_uri(path) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #try_write(chunk) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ RedshiftOutputV2
Returns a new instance of RedshiftOutputV2.
40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 40
#
# Builds the plugin instance and loads the libraries the plugin relies on.
# The requires sit inside the constructor rather than at file scope, so the
# libraries are loaded only once an instance is actually created.
#
# @return [RedshiftOutputV2] a new instance of RedshiftOutputV2
def initialize
  super
  require 'aws-sdk'
  require 'zlib'
  require 'time'
  require 'tempfile'
  require 'pg'
  require 'json'
  require 'csv'
end
Instance Attribute Details
#last_gz_path ⇒ Object (readonly)
Returns the value of attribute last_gz_path.
4 5 6 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 4
#
# Read-only accessor for the object key that #create_gz_file stored after its
# most recent upload.
#
# @return [Object] the value of attribute last_gz_path
def last_gz_path
  @last_gz_path
end
#last_sql ⇒ Object (readonly)
Returns the value of attribute last_sql.
4 5 6 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 4
#
# Read-only accessor for the statement most recently built by #copy_sql.
#
# @return [Object] the value of attribute last_sql
def last_sql
  @last_sql
end
Instance Method Details
#configure(conf) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 52
#
# Validates the configuration and derives the values the other methods use:
# the normalised key prefix (@path), the connection settings (@db_conf), the
# field delimiter, the schema-qualified table name, the optional COPY column
# list and the COPY statement template.
#
# @param conf [Object] plugin configuration; only conf['utc'] is read here directly
# @raise [ConfigError] when check_credentials reports that no usable credentials are set
def configure(conf)
  super

  unless check_credentials
    raise ConfigError, "aws_key_id and aws_sec_key is required. or, use aws_iam_role instead."
  end

  # Key prefix must end with '/' and must not start with one.
  @path = "#{@path}/" unless @path.end_with?('/')
  @path = @path[1..-1] if @path.start_with?('/')
  @utc = true if conf['utc']

  @db_conf = {
    host: @redshift_host,
    port: @redshift_port,
    dbname: @redshift_dbname,
    user: @redshift_user,
    password: @redshift_password,
    connect_timeout: @redshift_connect_timeout,
    # The host name is resolved once, here, at configure time.
    hostaddr: IPSocket.getaddress(@redshift_host)
  }

  # Fall back to a delimiter derived from the file type when none was configured.
  @delimiter = determine_delimiter(@file_type) if @delimiter.nil? || @delimiter.empty?
  $log.debug format_log("redshift file_type:#{@file_type} delimiter:'#{@delimiter}'")

  @table_name_with_schema = [@redshift_schemaname, @redshift_tablename].compact.join('.')

  # A blank setting means "no explicit column list"; otherwise split on commas/whitespace.
  @redshift_copy_columns =
    if @redshift_copy_columns.to_s.empty?
      nil
    else
      @redshift_copy_columns.split(/[,\s]+/)
    end

  @copy_sql_template = build_redshift_copy_sql_template
  @s3_server_side_encryption = @s3_server_side_encryption.to_sym if @s3_server_side_encryption
end
#copy_sql(s3_uri) ⇒ Object
185 186 187 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 185
#
# Fills the COPY statement template with the given S3 URI and remembers the
# result in @last_sql.
#
# String#% is used on purpose: this class defines its own #format, so a bare
# `format(...)` call would not reach Kernel#format.
#
# @param s3_uri [String] value substituted into @copy_sql_template
# @return [String] the rendered statement
def copy_sql(s3_uri)
  @last_sql = @copy_sql_template % s3_uri
end
#create_gz_file(chunk) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 126
#
# Writes the chunk into a temporary file, uploads that file to the bucket under
# the next free key and records the key in @last_gz_path.
#
# The temporary file is released in every outcome: after a successful upload,
# when the chunk produced no data, and when the upload raises.
#
# @param chunk [Object] buffer chunk handed to the create_gz_file_from_* helper
# @return [String, false] the uploaded object key, or false when the helper
#   returned nothing to upload
def create_gz_file(chunk)
  scratch = Tempfile.new("s3-")
  gz =
    if json? || msgpack?
      create_gz_file_from_structured_data(scratch, chunk)
    else
      create_gz_file_from_flat_data(scratch, chunk)
    end

  unless gz
    $log.debug format_log("received no valid data. ")
    return false
  end

  key = next_gz_path
  @bucket.put_object({
    server_side_encryption: @s3_server_side_encryption,
    body: gz,
    key: key
  })
  @last_gz_path = key
ensure
  # Close and unlink whatever the helper returned, plus the scratch file itself
  # when the helper returned nothing or a different object.
  gz.close! if gz
  scratch.close! if scratch && !scratch.equal?(gz)
end
#exec_copy(s3_uri) ⇒ Object
166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 166
#
# Runs the COPY statement for the given S3 URI on the Redshift connection.
#
# A RedshiftError whose message starts a line with
# "ERROR: Load into table '<name>' failed." is logged and reported as false
# instead of being raised. Any other RedshiftError is re-raised unchanged.
#
# @param s3_uri [String] URI of the uploaded object
# @return [Boolean] true when the statement ran, false on a load error
# @raise [RedshiftError] for every failure that is not a load error
def exec_copy(s3_uri)
  $log.debug format_log("start copying. s3_uri=#{s3_uri}")
  @redshift_connection.exec copy_sql(s3_uri)
  $log.info format_log("completed copying to redshift. s3_uri=#{s3_uri}")
  true
rescue RedshiftError => e
  # Bare raise re-raises the exception currently being handled.
  raise unless e.to_s =~ /^ERROR: Load into table '[^']+' failed\./

  $log.error format_log("failed to copy data into redshift due to load error. s3_uri=#{s3_uri}"), error: e.to_s
  false
end
#format(_tag, _time, record) ⇒ Object
102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 102
#
# Serialises one record for the buffer.
#
# * json?    - the record itself, as msgpack
# * msgpack? - the record wrapped under @record_log_tag, as msgpack
# * otherwise - the value stored under @record_log_tag followed by a newline
#
# @param _tag [Object] unused
# @param _time [Object] unused
# @param record [Hash] event record
# @return [String] serialised record
def format(_tag, _time, record)
  if json?
    record.to_msgpack
  elsif msgpack?
    { @record_log_tag => record }.to_msgpack
  else
    "#{record[@record_log_tag]}\n"
  end
end
#format_log(message) ⇒ Object
189 190 191 192 193 194 195 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 189
#
# Decorates a log message with the configured suffix.
#
# @param message [String] text to log
# @return [String] "message suffix" when @log_suffix is set and non-empty,
#   otherwise the message unchanged
def format_log(message)
  if @log_suffix && !@log_suffix.empty?
    "#{message} #{@log_suffix}"
  else
    message
  end
end
#formatted_to_msgpack_binary ⇒ Object
197 198 199 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 197
#
# Always answers true.
# NOTE(review): presumably a hook the buffering base class queries to learn
# that #format emits msgpack - confirm against the Fluentd output API.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end
#insert_logs(chunk) ⇒ Object
121 122 123 124 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 121
#
# Uploads the chunk as a gz object and copies that object into Redshift.
#
# #create_gz_file returns false when the chunk holds no usable data. In that
# case the copy is skipped; previously the false was interpolated into the S3
# URI and a COPY was issued for a non-existent ".../false" object.
#
# @param chunk [Object] buffer chunk to load
# @return [Boolean] result of #exec_copy, or false when there was nothing to upload
def insert_logs(chunk)
  $log.debug format_log("start creating gz.")
  gz_path = create_gz_file(chunk)
  return false unless gz_path

  exec_copy s3_uri(gz_path)
end
#next_gz_path ⇒ Object
151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 151
#
# Finds the first object key of the form "<@path><timestamp>_<NN>.gz" that does
# not exist in the bucket yet, counting NN up from 00.
#
# NOTE(review): the strftime argument was lost in this listing;
# @timestamp_key_format is assumed - confirm against the plugin's config params.
#
# @return [String] an unused object key
def next_gz_path
  # Take the clock once so both branches format the same instant.
  now = @utc ? Time.now.utc : Time.now
  timestamp_key = now.strftime(@timestamp_key_format)

  seq = 0
  loop do
    path = "#{@path}#{timestamp_key}_#{'%02d' % seq}.gz"
    return path unless @bucket.object(path).exists?

    seq += 1
  end
end
#s3_uri(path) ⇒ Object
181 182 183 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 181
#
# Builds the s3:// URI of an object in the configured bucket.
#
# @param path [String] object key inside @s3_bucket
# @return [String] "s3://<bucket>/<path>"
def s3_uri(path)
  "s3://#{@s3_bucket}/#{path}"
end
#shutdown ⇒ Object
99 100 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 99
#
# Lifecycle hook invoked when the plugin stops. It has no teardown of its own,
# but delegates to the parent so the base class's shutdown work still runs,
# matching #initialize, #configure and #start, which all call super.
def shutdown
  super
end
#start ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 81
#
# Creates the S3 client, the bucket handle and the Redshift connection used by
# the write path.
#
# Static credentials, path-style addressing and the region are passed to the
# client only when both @aws_key_id and @aws_sec_key are set. A custom endpoint
# is added whenever @s3_endpoint is set.
# NOTE(review): without static keys no region is handed to the client - confirm
# that this is intended for the IAM-role setup.
def start
  super

  options = {}
  if @aws_key_id && @aws_sec_key
    options = {
      access_key_id: @aws_key_id,
      secret_access_key: @aws_sec_key,
      force_path_style: true,
      region: @s3_region
    }
  end
  options[:endpoint] = @s3_endpoint if @s3_endpoint

  @client = Aws::S3::Client.new(options)
  @bucket = Aws::S3::Bucket.new @s3_bucket, client: @client
  @redshift_connection = RedshiftConnection.new(@db_conf)
end
#try_write(chunk) ⇒ Object
116 117 118 119 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 116
#
# Loads the chunk and then commits it by its unique id.
# The commit is issued regardless of what #insert_logs returns; only an
# exception raised by #insert_logs prevents it.
#
# @param chunk [Object] buffer chunk; must respond to #unique_id
def try_write(chunk)
  insert_logs(chunk)
  commit_write(chunk.unique_id)
end
#write(chunk) ⇒ Object
112 113 114 |
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 112
#
# Write entry point: loads the chunk through #insert_logs.
#
# @param chunk [Object] buffer chunk to load
# @return [Object] whatever #insert_logs returns
def write(chunk)
  insert_logs(chunk)
end