Class: Fluent::GoogleCloudStorageOut
- Inherits:
- TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::GoogleCloudStorageOut
- Extended by:
- GoogleCloudStorageOut
- Includes:
- Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_google_cloud_storage_out.rb
Constant Summary collapse
- Storage = Google::Apis::StorageV1
- ServiceAccountCredentials = Google::Auth::ServiceAccountCredentials
- SUPPORTED_COMPRESS = { 'gz' => :gz, 'gzip' => :gzip, }
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ GoogleCloudStorageOut (constructor): a new instance of GoogleCloudStorageOut.
- #path_format(chunk_key) ⇒ Object
- #prepare_client ⇒ Object
- #send(path, data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GoogleCloudStorageOut
Returns a new instance of GoogleCloudStorageOut.
54 55 56 57 58 59 60 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 54
#
# Builds a new GoogleCloudStorageOut instance.
#
# Runs the parent initializer first, then loads the libraries the plugin
# relies on. The requires happen at instantiation time (not at file load),
# in the order listed below.
def initialize
  super
  %w[zlib net/http time mime-types].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 67
#
# Configures the plugin from +conf+.
#
# When +conf['path']+ contains a time directive, +conf['time_slice_format']+
# is overwritten so that it is at least as fine-grained as the path:
# seconds (%S) take precedence over minutes (%M), which take precedence over
# hours (%H). A path without any of these directives leaves the setting
# untouched.
#
# After delegating to the parent +configure+, the record formatter is
# created and configured, and the storage client is prepared.
#
# @param conf the plugin configuration; read and written with String keys
def configure(conf)
  if conf['path']
    # Ordered from finest to coarsest: the first directive found wins.
    slice_formats = [
      ['%S', '%Y%m%d%H%M%S'],
      ['%M', '%Y%m%d%H%M'],
      ['%H', '%Y%m%d%H']
    ]
    _directive, slice_format = slice_formats.find do |directive, _format|
      conf['path'].index(directive)
    end
    conf['time_slice_format'] = slice_format if slice_format
  end

  super

  # NOTE(review): @format is presumably populated by the parent configure
  # call above - confirm against the plugin's config_param declarations.
  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)

  prepare_client
end
#format(tag, time, record) ⇒ Object
107 108 109 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 107
#
# Serializes a single event by handing it to the formatter created in
# #configure.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever the configured formatter returns for this event
def format(tag, time, record)
  formatter = @formatter
  formatter.format(tag, time, record)
end
#path_format(chunk_key) ⇒ Object
111 112 113 114 115 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 111
#
# Expands the configured path template for one chunk.
#
# The chunk key is parsed back into a Time using @time_slice_format, and
# that time is then rendered through the @path strftime template. The
# resulting path is logged at debug level.
#
# @param chunk_key [String] the chunk's time-slice key
# @return [String] the object path for this chunk
def path_format(chunk_key)
  slice_time = Time.strptime(chunk_key, @time_slice_format)
  slice_time.strftime(@path).tap do |gcs_path|
    log.debug "GCS Path: #{gcs_path}"
  end
end
#prepare_client ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 85
#
# Creates the Cloud Storage service client and stores it in
# @google_api_client.
#
# Credentials are built from the service-account JSON key file at
# @service_account_json_key and attached to the service as its
# +authorization+. The key file is opened in block form so the handle is
# closed as soon as the credentials have been built.
#
# @return the configured Storage::StorageService instance
# @raise [SystemCallError] if the key file cannot be opened
def prepare_client
  storage = Storage::StorageService.new
  scopes = [Storage::AUTH_CLOUD_PLATFORM, Storage::AUTH_DEVSTORAGE_FULL_CONTROL]

  File.open(@service_account_json_key) do |key_io|
    storage.authorization = ServiceAccountCredentials.make_creds(
      json_key_io: key_io,
      scope: scopes
    )
  end

  @google_api_client = storage
end
#send(path, data) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 117
#
# Uploads +data+ to the bucket @bucket_id as the object named +path+.
#
# The content type is looked up from the extension of +path+; when no MIME
# type is known for it, 'application/octet-stream' is used. If @compress is
# one of the SUPPORTED_COMPRESS keys, the payload is gzip-compressed in
# memory before it is uploaded; otherwise it is uploaded unchanged.
#
# NOTE: inside this class the name shadows Object#send, so dynamic dispatch
# must go through __send__ / public_send instead.
#
# @param path [String] destination object name
# @param data [String] payload to upload
# @return the result of the client's insert_object call
def send(path, data)
  mimetype = MIME::Types.type_for(path).first
  # type_for yields nothing for unknown extensions; fall back to a generic type.
  content_type = mimetype ? mimetype.content_type : 'application/octet-stream'

  io =
    if SUPPORTED_COMPRESS.include?(@compress)
      buffer = StringIO.new(String.new)
      writer = Zlib::GzipWriter.new(buffer)
      writer.write(data)
      writer.finish # writes the gzip trailer but leaves the buffer open
      buffer.rewind
      buffer
    else
      StringIO.new(data)
    end

  @google_api_client.insert_object(
    @bucket_id,
    upload_source: io,
    name: path,
    content_type: content_type
  )
end
#shutdown ⇒ Object
103 104 105 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 103
#
# Stops the plugin. No plugin-specific teardown is performed here; the call
# is forwarded unchanged to the parent class.
def shutdown
  super
end
#start ⇒ Object
99 100 101 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 99
#
# Starts the plugin. No plugin-specific startup is performed here; the call
# is forwarded unchanged to the parent class.
def start
  super
end
#write(chunk) ⇒ Object
137 138 139 140 141 142 143 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 137
#
# Flushes one buffered chunk.
#
# The destination path is derived from the chunk's key via #path_format, and
# the chunk's contents are uploaded to it. The +send+ call below is this
# plugin's own #send(path, data) upload method, not Object#send.
#
# @param chunk the buffer chunk to flush; must respond to +key+ and +read+
# @return [String] the path the chunk was uploaded to
def write(chunk)
  path_format(chunk.key).tap do |gcs_path|
    send(gcs_path, chunk.read)
  end
end