Class: Fluent::GoogleCloudStorageOut
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::GoogleCloudStorageOut
- Includes:
- Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_google_cloud_storage_out.rb
Constant Summary collapse
- Storage =
Google::Apis::StorageV1
- ServiceAccountCredentials =
Google::Auth::ServiceAccountCredentials
- SUPPORTED_COMPRESS =
{ 'gz' => :gz, 'gzip' => :gz, }
- UNIQUE_STRATEGY =
{ 'chunk_id' => :chunk_id, 'increment' => :increment, 'timestamp' => :timestamp, }
- UNIQUE_PLACE_HOLDER =
'${unique}'
Instance Method Summary collapse
- #chunk_unique_id_to_str(unique_id) ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ GoogleCloudStorageOut
constructor
A new instance of GoogleCloudStorageOut.
- #path_format(chunk) ⇒ Object
- #path_format_with_unique_strategy(path, strategy, chunk_key, chunk_unique) ⇒ Object
- #prepare_client ⇒ Object
- #send(path, data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GoogleCloudStorageOut
Returns a new instance of GoogleCloudStorageOut.
72 73 74 75 76 77 78 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 72 def initialize super require 'zlib' require 'net/http' require 'time' require 'mime-types' end |
Instance Method Details
#chunk_unique_id_to_str(unique_id) ⇒ Object
135 136 137 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 135 def chunk_unique_id_to_str(unique_id) unique_id.unpack('C*').map{|x| x.to_s(16).rjust(2,'0')}.join('') end |
#configure(conf) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 85 def configure(conf) if conf['path'] if conf['path'].index('%S') conf['time_slice_format'] = '%Y%m%d%H%M%S' elsif conf['path'].index('%M') conf['time_slice_format'] = '%Y%m%d%H%M' elsif conf['path'].index('%H') conf['time_slice_format'] = '%Y%m%d%H' end end super if @path.index(UNIQUE_PLACE_HOLDER).nil? && @unique_strategy raise Fluent::ConfigError, "Path must contain ${unique}, or you set the unique_strategy to nil." end @formatter = Plugin.new_formatter(@format) @formatter.configure(conf) @path_suffix = ".log" prepare_client() if @unique_strategy == :increment @samepath_counter = 0 end end |
#format(tag, time, record) ⇒ Object
131 132 133 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 131 def format(tag, time, record) @formatter.format(tag, time, record) end |
#path_format(chunk) ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 160 def path_format(chunk) # format from chunk key path = Time.strptime(chunk.key, @time_slice_format).strftime(@path) # format for make unique path = path_format_with_unique_strategy(path, @unique_strategy, chunk.key, chunk.unique_id) # append .log unless path.include?(".log") path.concat(@path_suffix) end # append .gz case @compress when nil path when :gz "#{path}.gz" end end |
#path_format_with_unique_strategy(path, strategy, chunk_key, chunk_unique) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 139 def path_format_with_unique_strategy(path, strategy, chunk_key, chunk_unique) case strategy when nil path when :chunk_id path.gsub(UNIQUE_PLACE_HOLDER, chunk_unique_id_to_str(chunk_unique)) when :increment if @before_chunk_key if @before_chunk_key == chunk_key @samepath_counter += 1 else @samepath_counter = 0 end end @before_chunk_key = chunk_key path.gsub(UNIQUE_PLACE_HOLDER, "#{@samepath_counter}") when :timestamp path.gsub(UNIQUE_PLACE_HOLDER, Time.now.strftime(@unique_format)) end end |
#prepare_client ⇒ Object
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 112 def prepare_client @storage = Storage::StorageService.new scopes = [Storage::AUTH_CLOUD_PLATFORM, Storage::AUTH_DEVSTORAGE_FULL_CONTROL] @storage. = ServiceAccountCredentials.make_creds( { :json_key_io => File.open(@service_account_json_key_path), :scope => scopes } ) end |
#send(path, data) ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 181 def send(path, data) mimetype = MIME::Types.type_for(path).first io = nil if @compress io = StringIO.new("") writer = Zlib::GzipWriter.new(io) writer.write(data) writer.finish io.rewind else io = StringIO.new(data) end @storage.insert_object(@bucket_id, {name: path}, upload_source: io, content_type:mimetype.content_type) end |
#shutdown ⇒ Object
127 128 129 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 127 def shutdown super end |
#start ⇒ Object
123 124 125 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 123 def start super end |
#write(chunk) ⇒ Object
197 198 199 200 201 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 197 def write(chunk) gcs_path = path_format(chunk) send(gcs_path, chunk.read) gcs_path end |