Class: Fluent::Plugin::GCSOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_gcs.rb

Constant Summary collapse

DEFAULT_FORMAT_TYPE =
"out_file"
MAX_HEX_RANDOM_LENGTH =
32

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GCSOutput

Returns a new instance of GCSOutput.



15
16
17
18
19
# File 'lib/fluent/plugin/out_gcs.rb', line 15

# Builds a new plugin instance: runs the parent initializer, loads the
# google-cloud-storage client library, and assigns this plugin's logger to
# Google::Apis.logger.
def initialize
  super
  # The require sits inside the constructor, so the library is loaded when an
  # instance is created rather than when this file is loaded.
  require "google/cloud/storage"
  Google::Apis.logger = log
end

Instance Method Details

#configure(conf) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/fluent/plugin/out_gcs.rb', line 73

# Validates the configuration and prepares the state used by the other
# plugin methods.
#
# After the parent class has processed +conf+, this method:
# * rejects a hex_random_length larger than MAX_HEX_RANDOM_LENGTH,
# * stores the encryption key in @encryption_opts,
# * converts the configured object metadata entries into a key => value hash,
# * creates the formatter and the object creator,
# * builds the time-slice formatter from either the legacy
#   'time_slice_format' setting or the buffer's timekey.
#
# @param conf the configuration element handed to the plugin
# @raise [Fluent::ConfigError] when hex_random_length exceeds
#   MAX_HEX_RANDOM_LENGTH
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)
  super

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less."
  end

  # The customer-supplied, AES-256 encryption key that will be used to encrypt the file.
  @encryption_opts = {
    encryption_key: @encryption_key,
  }

  # NOTE(review): the instance-variable names in this conditional were missing
  # from the listing (it read "if" / "= .map {...}"), which is not valid Ruby.
  # They are restored here as @object_metadata (the configured entries, each
  # responding to #key and #value) and @object_metadata_hash (the resulting
  # hash) -- confirm against lib/fluent/plugin/out_gcs.rb.
  if @object_metadata
    @object_metadata_hash = @object_metadata.map { |m| [m.key, m.value] }.to_h
  end

  @formatter = formatter_create

  @object_creator = Fluent::GCS.discovered_object_creator(@store_as, transcoding: @transcoding)
  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end

#format(tag, time, record) ⇒ Object



112
113
114
115
# File 'lib/fluent/plugin/out_gcs.rb', line 112

# Formats a single event for buffering.
#
# The record is first passed through inject_values_to_record, and the result
# is handed to the formatter created in #configure.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever @formatter.format returns for the injected record
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/fluent/plugin/out_gcs.rb', line 117

# Always returns true.
# NOTE(review): presumably this tells Fluentd the output may run under
# multiple workers -- confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#start ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/out_gcs.rb', line 99

# Creates the Google Cloud Storage client from the configured project,
# keyfile, retry count and timeout, looks up the configured bucket, runs
# ensure_bucket, and then calls the parent class's start.
def start
  client_options = {
    project: @project,
    keyfile: @keyfile,
    retries: @client_retries,
    timeout: @client_timeout,
  }

  @gcs = Google::Cloud::Storage.new(**client_options)
  @gcs_bucket = @gcs.bucket(@bucket)

  ensure_bucket
  super
end

#write(chunk) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/fluent/plugin/out_gcs.rb', line 121

# Uploads one buffer chunk to the configured bucket.
#
# The destination path comes from generate_path(chunk). @object_creator.create
# yields an object whose #path is used as the local source for the upload.
# The upload options combine the object metadata, ACL, storage class, the
# creator's content type and encoding, and the encryption options prepared
# in #configure.
#
# @param chunk the buffer chunk to upload
def write(chunk)
  path = generate_path(chunk)

  @object_creator.create(chunk) do |obj|
    opts = {
      # NOTE(review): the value of this key was missing from the listing
      # ("metadata: ,"), which is not valid Ruby. Restored as
      # @object_metadata_hash -- confirm against lib/fluent/plugin/out_gcs.rb.
      metadata: @object_metadata_hash,
      acl: @acl,
      storage_class: @storage_class,
      content_type: @object_creator.content_type,
      content_encoding: @object_creator.content_encoding,
    }
    opts.merge!(@encryption_opts)

    log.debug { "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{opts}" }
    @gcs_bucket.upload_file(obj.path, path, **opts)
  end
end