Class: Fluent::GoogleCloudStorageOut

Inherits:
TimeSlicedOutput
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/out_google_cloud_storage_out.rb

Constant Summary collapse

# Short alias for the Google::Apis::StorageV1 namespace; the plugin refers to
# it as Storage::StorageService and Storage::AUTH_* when building the client.
Storage =
Google::Apis::StorageV1
# Short alias for Google::Auth::ServiceAccountCredentials, used to create the
# credentials assigned to the storage client.
ServiceAccountCredentials =
Google::Auth::ServiceAccountCredentials
# Lookup table from compression names to the internal compression symbol.
# Both 'gz' and 'gzip' resolve to :gz, the only compression the plugin handles.
# Frozen so the shared table cannot be modified at runtime.
SUPPORTED_COMPRESS = {
  'gz' => :gz,
  'gzip' => :gz,
}.freeze

# Lookup table from strategy names to the symbols that
# #path_format_with_unique_strategy switches on.
# Frozen so the shared table cannot be modified at runtime.
UNIQUE_STRATEGY = {
  'chunk_id' => :chunk_id,
  'increment' => :increment,
  'timestamp' => :timestamp,
}.freeze

# Literal token inside the configured path that is replaced with the
# per-chunk unique value.
UNIQUE_PLACE_HOLDER = '${unique}'.freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GoogleCloudStorageOut

Returns a new instance of GoogleCloudStorageOut.



72
73
74
75
76
77
78
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 72

# Creates the plugin instance and loads the libraries the plugin relies on.
#
# The requires live in the constructor, so the libraries are loaded when the
# plugin is instantiated rather than when the file is read.
def initialize
  super
  require 'zlib'       # Zlib::GzipWriter, used by #send when compressing
  require 'stringio'   # StringIO, used by #send to build the upload body
  require 'net/http'
  require 'time'       # Time.strptime, used by #path_format
  require 'mime-types' # MIME::Types, used by #send to pick a content type
end

Instance Method Details

#chunk_unique_id_to_str(unique_id) ⇒ Object



135
136
137
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 135

# Renders a chunk's unique id as a lowercase hexadecimal string, two hex
# digits per byte.
#
# @param unique_id [String] raw bytes of the chunk id
# @return [String] hexadecimal representation of those bytes
def chunk_unique_id_to_str(unique_id)
  # 'H*' emits every byte as two hex digits, high nibble first.
  hex_digits, = unique_id.unpack('H*')
  hex_digits
end

#configure(conf) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 85

# Applies the plugin configuration.
#
# Before handing +conf+ to the parent class, a 'time_slice_format' entry is
# derived from the finest strftime directive found in conf['path'] (%S, then
# %M, then %H); if none is present the entry is left untouched. After the
# parent has processed the configuration, the formatter and storage client
# are created and the per-path counter is initialised for the :increment
# strategy.
#
# @param conf the configuration element supplied by Fluentd
# @raise [Fluent::ConfigError] when a unique strategy is set but the path
#   does not contain the ${unique} placeholder
def configure(conf)
  path_template = conf['path']
  if path_template
    # Ordered from finest to coarsest; the first directive found wins.
    matched = [
      ['%S', '%Y%m%d%H%M%S'],
      ['%M', '%Y%m%d%H%M'],
      ['%H', '%Y%m%d%H']
    ].find { |directive, _slice_format| path_template.index(directive) }
    conf['time_slice_format'] = matched.last if matched
  end

  super

  placeholder_missing = @path.index(UNIQUE_PLACE_HOLDER).nil?
  if placeholder_missing && @unique_strategy
    raise Fluent::ConfigError, "Path must contain ${unique}, or you set the unique_strategy to nil."
  end

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)
  @path_suffix = ".log"
  prepare_client

  # The counter is only needed when object names are made unique by counting.
  @samepath_counter = 0 if @unique_strategy == :increment
end

#format(tag, time, record) ⇒ Object



131
132
133
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 131

# Serializes a single event by delegating to the formatter that #configure
# created from @format.
#
# NOTE: because this method is named +format+, an unqualified +format(...)+
# call inside this class resolves here rather than to Kernel#format.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever @formatter.format returns for the event
def format(tag, time, record)
  @formatter.format(tag, time, record)
end

#path_format(chunk) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 160

# Builds the destination object name for +chunk+.
#
# The chunk key is parsed with @time_slice_format and used to expand the
# strftime directives in @path. The unique placeholder is then resolved, the
# @path_suffix is appended unless ".log" already occurs somewhere in the
# result, and ".gz" is added when @compress is :gz.
#
# @param chunk an object responding to #key and #unique_id
# @return [String, nil] the object name; nil when @compress is neither nil
#   nor :gz
def path_format(chunk)
  slice_time = Time.strptime(chunk.key, @time_slice_format)
  object_name = path_format_with_unique_strategy(
    slice_time.strftime(@path), @unique_strategy, chunk.key, chunk.unique_id
  )

  # Only add the suffix when ".log" does not appear anywhere in the name.
  object_name << @path_suffix unless object_name.include?(".log")

  return object_name if @compress.nil?

  "#{object_name}.gz" if @compress == :gz
end

#path_format_with_unique_strategy(path, strategy, chunk_key, chunk_unique) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 139

# Replaces every ${unique} placeholder in +path+ according to +strategy+.
#
# * nil        - +path+ is returned unchanged
# * :chunk_id  - the hexadecimal form of +chunk_unique+
# * :increment - a counter that grows while consecutive calls share the same
#                +chunk_key+ and restarts at 0 when the key changes
# * :timestamp - the current time rendered with @unique_format
#
# @param path [String] object name that may contain the placeholder
# @param strategy [Symbol, nil] one of the strategies listed above
# @param chunk_key the key of the chunk being written
# @param chunk_unique the unique id of the chunk being written
# @return [String, nil] the resolved path; nil for an unrecognised strategy
def path_format_with_unique_strategy(path, strategy, chunk_key, chunk_unique)
  return path if strategy.nil?

  unique_part =
    case strategy
    when :chunk_id
      chunk_unique_id_to_str(chunk_unique)
    when :increment
      # On the very first call there is no previous key, so the counter keeps
      # the value it already has.
      if @before_chunk_key
        @samepath_counter = @before_chunk_key == chunk_key ? @samepath_counter + 1 : 0
      end
      @before_chunk_key = chunk_key
      @samepath_counter.to_s
    when :timestamp
      Time.now.strftime(@unique_format)
    end

  path.gsub(UNIQUE_PLACE_HOLDER, unique_part) if unique_part
end

#prepare_client ⇒ Object



112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 112

# Creates the Cloud Storage API client, stores it in @storage and assigns it
# service-account credentials requested for the AUTH_CLOUD_PLATFORM and
# AUTH_DEVSTORAGE_FULL_CONTROL scopes.
#
# FIXME(review): File.open() is called without a path below, which raises
# ArgumentError as written. The key file location presumably comes from a
# plugin configuration parameter that is not visible here - confirm which
# attribute holds it and pass it in.
# NOTE(review): the opened key file handle is never closed explicitly.
def prepare_client
  @storage = Storage::StorageService.new
  scopes = [Storage::AUTH_CLOUD_PLATFORM, Storage::AUTH_DEVSTORAGE_FULL_CONTROL]
  @storage.authorization = ServiceAccountCredentials.make_creds(
    {
      :json_key_io => File.open(),
      :scope => scopes
    }
  )
end

#send(path, data) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 181

# Uploads +data+ to the bucket identified by @bucket_id under the object name
# +path+.
#
# NOTE: this method shadows Object#send for instances of this class; use
# #__send__ or #public_send for dynamic dispatch on the plugin.
#
# The content type is inferred from +path+ through the MIME registry. When
# the registry has no entry for the path, 'application/octet-stream' is used
# instead of failing on the nil lookup result.
#
# @param path [String] destination object name
# @param data [String] payload; gzip-compressed first when @compress is set
# @return the result of @storage.insert_object
def send(path, data)
  mimetype = MIME::Types.type_for(path).first
  content_type = mimetype ? mimetype.content_type : 'application/octet-stream'

  io =
    if @compress
      buffer = StringIO.new("")
      writer = Zlib::GzipWriter.new(buffer)
      writer.write(data)
      # finish writes the gzip trailer without closing the underlying buffer.
      writer.finish
      buffer.rewind
      buffer
    else
      StringIO.new(data)
    end

  @storage.insert_object(@bucket_id, {name: path}, upload_source: io, content_type: content_type)
end

#shutdown ⇒ Object



127
128
129
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 127

# Shutdown hook. Adds no behaviour of its own and defers entirely to the
# parent class implementation.
def shutdown
  super
end

#start ⇒ Object



123
124
125
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 123

# Start hook. Adds no behaviour of its own and defers entirely to the parent
# class implementation.
def start
  super
end

#write(chunk) ⇒ Object



197
198
199
200
201
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 197

# Uploads the contents of a buffered chunk.
#
# The destination name is computed with #path_format, the chunk body is
# handed to #send, and the destination name is returned.
#
# @param chunk an object responding to #read, passed on to #path_format
# @return the object name the chunk was uploaded under
def write(chunk)
  path_format(chunk).tap { |destination| send(destination, chunk.read) }
end