Class: Fluent::GoogleCloudStorageOut

Inherits:
TimeSlicedOutput
  • Object
show all
Extended by:
GoogleCloudStorageOut
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/out_google_cloud_storage_out.rb

Constant Summary collapse

Storage =
Google::Apis::StorageV1
ServiceAccountCredentials =
Google::Auth::ServiceAccountCredentials
SUPPORTED_COMPRESS =
{
  'gz' => :gz,
  'gzip' => :gzip,
}

Instance Method Summary collapse

Constructor Details

#initializeGoogleCloudStorageOut

Returns a new instance of GoogleCloudStorageOut.



54
55
56
57
58
59
60
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 54

def initialize
  super
  require 'zlib'
  require 'net/http'
  require 'time'
  require 'mime-types'
end

Instance Method Details

#configure(conf) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 67

def configure(conf)
  if conf['path']
    if conf['path'].index('%S')
      conf['time_slice_format'] = '%Y%m%d%H%M%S'
    elsif conf['path'].index('%M')
      conf['time_slice_format'] = '%Y%m%d%H%M'
    elsif conf['path'].index('%H')
      conf['time_slice_format'] = '%Y%m%d%H'
    end
  end

  super

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)
  prepare_client()
end

#format(tag, time, record) ⇒ Object



107
108
109
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 107

def format(tag, time, record)
  @formatter.format(tag, time, record)
end

#path_format(chunk_key) ⇒ Object



111
112
113
114
115
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 111

def path_format(chunk_key)
  path = Time.strptime(chunk_key, @time_slice_format).strftime(@path)
  log.debug "GCS Path: #{path}"
  path
end

#prepare_clientObject



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 85

def prepare_client

  storage = Storage::StorageService.new
  scopes = [Storage::AUTH_CLOUD_PLATFORM, Storage::AUTH_DEVSTORAGE_FULL_CONTROL]
  storage.authorization = ServiceAccountCredentials.make_creds(
    {
      :json_key_io => File.open(),
      :scope => scopes
    }
  )

  @google_api_client = storage
end

#send(path, data) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 117

def send(path, data)
  mimetype = MIME::Types.type_for(path).first

  io = nil
  if SUPPORTED_COMPRESS.include?(@compress)
    io = StringIO.new("")
    writer = Zlib::GzipWriter.new(io)
    writer.write(data)
    writer.finish
    io.rewind
  else
    io = StringIO.new(data)
  end

  media = Google::APIClient::UploadIO.new(io, mimetype.content_type, File.basename(path))

  @gogle_api_client.insert_object(@bucket_id, upload_source: io, name: path, content_type:mimetype_content_type)

end

#shutdownObject



103
104
105
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 103

def shutdown
  super
end

#startObject



99
100
101
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 99

def start
  super
end

#write(chunk) ⇒ Object



137
138
139
140
141
142
143
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 137

def write(chunk)
  gcs_path = path_format(chunk.key)

  send(gcs_path, chunk.read)

  gcs_path
end