Class: Fluent::Plugin::RedshiftOutputV2

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_redshift_v2.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RedshiftOutputV2

Returns a new instance of RedshiftOutputV2.



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 40

# Builds the output plugin and loads the libraries it uses at runtime
# (AWS SDK, zlib, time, tempfile, pg, json, csv), in that order.
# NOTE(review): requiring inside the constructor presumably defers loading these
# gems until the plugin is actually instantiated — confirm this is intentional.
def initialize
  super

  %w[aws-sdk zlib time tempfile pg json csv].each { |library| require library }
end

Instance Attribute Details

#last_gz_path ⇒ Object (readonly)

Returns the value of attribute last_gz_path.



4
5
6
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 4

# S3 key of the most recent successful upload made by #create_gz_file.
# @return [String, nil] nil until the first upload has happened
def last_gz_path
  @last_gz_path
end

#last_sql ⇒ Object (readonly)

Returns the value of attribute last_sql.



4
5
6
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 4

# The COPY statement most recently rendered by #copy_sql.
# @return [String, nil] nil until #copy_sql has been called
def last_sql
  @last_sql
end

Instance Method Details

#configure(conf) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 52

# Validates credentials and normalizes the plugin configuration after Fluentd has
# populated the config parameters (via +super+).
#
# - Normalizes @path into an S3 key prefix: trailing "/" ensured, leading "/" removed.
# - Builds @db_conf for the Redshift connection opened in #start.
# - Falls back to determine_delimiter(@file_type) when no delimiter was configured.
# - Parses redshift_copy_columns into an Array of column names (nil when none).
# - Pre-renders the COPY SQL template consumed by #copy_sql.
#
# @param conf the Fluentd configuration element for this plugin
# @raise [ConfigError] when check_credentials reports missing credentials
def configure(conf)
  super
  unless check_credentials
    raise ConfigError, "aws_key_id and aws_sec_key is required. or, use aws_iam_role instead."
  end
  # Uploaded keys are "#{@path}<timestamp>_NN.gz", so the prefix must end with "/"
  # and must not start with one.
  @path = "#{@path}/" unless @path.end_with?('/')
  @path = @path[1..-1] if @path.start_with?('/')
  @utc = true if conf['utc']
  @db_conf = {
    host: @redshift_host,
    port: @redshift_port,
    dbname: @redshift_dbname,
    user: @redshift_user,
    password: @redshift_password,
    connect_timeout: @redshift_connect_timeout,
    # NOTE(review): the host is resolved once here, so the IP is pinned for the life
    # of the process; IPSocket relies on the 'socket' library being loaded (presumably by Fluentd).
    hostaddr: IPSocket.getaddress(@redshift_host)
  }
  @delimiter = determine_delimiter(@file_type) if @delimiter.nil? || @delimiter.empty?
  $log.debug format_log("redshift file_type:#{@file_type} delimiter:'#{@delimiter}'")
  @table_name_with_schema = [@redshift_schemaname, @redshift_tablename].compact.join('.')
  # Split on commas/whitespace; drop the empty name a leading separator produces
  # (" id, name".split(...) would otherwise yield ["", "id", "name"]).
  columns = @redshift_copy_columns.to_s.split(/[,\s]+/).reject(&:empty?)
  @redshift_copy_columns = columns.empty? ? nil : columns
  @copy_sql_template = build_redshift_copy_sql_template
  @s3_server_side_encryption = @s3_server_side_encryption.to_sym if @s3_server_side_encryption
end

#copy_sql(s3_uri) ⇒ Object



185
186
187
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 185

# Renders the COPY statement for +s3_uri+ from @copy_sql_template (built in #configure)
# and records it in @last_sql.
# @param s3_uri [String] the s3:// location to load from
# @return [String] the rendered SQL
def copy_sql(s3_uri)
  rendered = @copy_sql_template % s3_uri
  @last_sql = rendered
end

#create_gz_file(chunk) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 126

# Gzips the contents of +chunk+ into a temporary file and uploads it to S3 under a
# fresh key obtained from #next_gz_path.
#
# JSON and msgpack file types go through create_gz_file_from_structured_data; every
# other type goes through create_gz_file_from_flat_data. A falsy helper result is
# treated as "the chunk contained no valid data".
#
# The temporary file is always removed — also when there was no data, or when the
# upload raises. Previously it leaked in both cases.
#
# @param chunk the buffer chunk to upload
# @return [String, false] the uploaded S3 key (also stored in @last_gz_path), or
#   false when the chunk produced no valid data
def create_gz_file(chunk)
  tmp = Tempfile.new("s3-")
  begin
    gz_file =
      if json? || msgpack?
        create_gz_file_from_structured_data(tmp, chunk)
      else
        create_gz_file_from_flat_data(tmp, chunk)
      end

    unless gz_file
      $log.debug format_log("received no valid data. ")
      return false
    end

    key = next_gz_path
    @bucket.put_object(
      server_side_encryption: @s3_server_side_encryption,
      body: gz_file,
      key: key
    )
    @last_gz_path = key
  ensure
    tmp.close!
    # Defensive: if a helper handed back a different file object, clean that one up too.
    gz_file.close! if gz_file && !gz_file.equal?(tmp) && gz_file.respond_to?(:close!)
  end
end

#exec_copy(s3_uri) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 166

# Runs the Redshift COPY for +s3_uri+ over @redshift_connection.
#
# Only a "Load into table ... failed." RedshiftError counts as a data problem: it
# is logged and reported as false. Any other RedshiftError propagates to the caller.
#
# @param s3_uri [String] the s3:// location of the uploaded gzip file
# @return [Boolean] true on success, false on a load error
# @raise [RedshiftError] for any non-load failure
def exec_copy(s3_uri)
  $log.debug format_log("start copying. s3_uri=#{s3_uri}")
  @redshift_connection.exec(copy_sql(s3_uri))
  $log.info format_log("completed copying to redshift. s3_uri=#{s3_uri}")
  true
rescue RedshiftError => e
  raise unless e.to_s =~ /^ERROR:  Load into table '[^']+' failed\./

  $log.error format_log("failed to copy data into redshift due to load error. s3_uri=#{s3_uri}"), error: e.to_s
  false
end

#format(_tag, _time, record) ⇒ Object



102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 102

# Serializes one record for the buffer.
# - json file type: the record itself, msgpack-encoded
# - msgpack file type: the record wrapped as { @record_log_tag => record }, msgpack-encoded
# - anything else: the value under @record_log_tag as one text line
# @return [String]
def format(_tag, _time, record)
  return record.to_msgpack if json?
  return { @record_log_tag => record }.to_msgpack if msgpack?

  "#{record[@record_log_tag]}\n"
end

#format_log(message) ⇒ Object



189
190
191
192
193
194
195
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 189

# Appends the configured @log_suffix to +message+, separated by a space.
# A nil/false or empty suffix leaves the message unchanged.
# @return [String]
def format_log(message)
  return message if !@log_suffix || @log_suffix.empty?

  "#{message} #{@log_suffix}"
end

#formatted_to_msgpack_binary ⇒ Object



197
198
199
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 197

# Always answers true: buffered data is declared as msgpack-formatted.
# NOTE(review): #format returns plain text lines for non-json/msgpack file types,
# yet this still returns true — confirm the flat-data path never msgpack-decodes chunks.
def formatted_to_msgpack_binary
  true
end

#insert_logs(chunk) ⇒ Object



121
122
123
124
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 121

# Uploads +chunk+ to S3 as a gzip file and loads it into Redshift with COPY.
#
# create_gz_file returns false when the chunk held no valid data. In that case
# nothing is copied. Previously the false value was turned into "s3://bucket/false",
# and the resulting COPY failure made the chunk retry forever.
#
# @param chunk the buffer chunk to load
# @return [Boolean] the result of #exec_copy, or false when there was nothing to copy
def insert_logs(chunk)
  $log.debug format_log("start creating gz.")
  gz_path = create_gz_file(chunk)
  return false unless gz_path

  exec_copy s3_uri(gz_path)
end

#next_gz_path ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 151

# Chooses the first unused S3 key of the form "<@path><timestamp>_NN.gz".
# The timestamp uses @timestamp_key_format in UTC when @utc is set, local time otherwise.
# NN starts at 00 and is bumped while @bucket reports an existing object for the key.
# NOTE(review): this is check-then-use, so concurrent writers could pick the same key.
# @return [String] the S3 key to upload to
def next_gz_path
  clock = @utc ? Time.now.utc : Time.now
  stamp = clock.strftime(@timestamp_key_format)
  # String#% rather than format(): this class defines its own #format(tag, time, record).
  key_for = ->(seq) { "#{@path}#{stamp}_#{'%02d' % seq}.gz" }
  seq = 0
  seq += 1 while @bucket.object(key_for.call(seq)).exists?
  key_for.call(seq)
end

#s3_uri(path) ⇒ Object



181
182
183
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 181

# Builds the s3:// URI for +path+ inside the configured @s3_bucket.
# @param path [String] object key within the bucket
# @return [String]
def s3_uri(path)
  bucket_root = "s3://#{@s3_bucket}"
  "#{bucket_root}/#{path}"
end

#shutdown ⇒ Object



99
100
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 99

# Stops the plugin.
#
# Calls +super+ first, as #initialize, #configure and #start already do, so the
# buffered-output base class can perform its own teardown. That teardown presumably
# includes flushing and stopping buffer threads; it may still call #write, so the
# connection is kept open until it returns. Afterwards the Redshift connection
# opened in #start is closed, when it exposes #close.
def shutdown
  super
  @redshift_connection.close if @redshift_connection.respond_to?(:close)
end

#start ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 81

# Starts the plugin: creates the S3 client/bucket handles and opens the Redshift
# connection described by @db_conf (built in #configure).
#
# Static credentials and force_path_style are passed only when both aws_key_id and
# aws_sec_key are set. Otherwise the SDK's default credential lookup is used.
# The configured region is now passed in every case. Previously it was dropped
# whenever static keys were absent (e.g. aws_iam_role setups).
def start
  super

  options = {}
  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
    options[:force_path_style] = true
  end
  options[:region] = @s3_region if @s3_region
  options[:endpoint] = @s3_endpoint if @s3_endpoint
  @client = Aws::S3::Client.new(options)
  @bucket = Aws::S3::Bucket.new(@s3_bucket, client: @client)
  @redshift_connection = RedshiftConnection.new(@db_conf)
end

#try_write(chunk) ⇒ Object



116
117
118
119
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 116

# Delayed-commit write path: loads +chunk+ via #insert_logs and then commits it by
# its unique id.
# @return whatever commit_write returns
def try_write(chunk)
  insert_logs(chunk)
  chunk_id = chunk.unique_id
  commit_write(chunk_id)
end

#write(chunk) ⇒ Object



112
113
114
# File 'lib/fluent/plugin/out_redshift_v2.rb', line 112

# Synchronous write path: hands the whole chunk to #insert_logs and returns its result.
def write(chunk)
  insert_logs(chunk)
end