Class: Fluent::GoogleCloudStorageOutput
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::GoogleCloudStorageOutput
- Includes:
- Mixin::ConfigPlaceholders, Mixin::PlainTextFormatter
- Defined in:
- lib/fluent/plugin/out_google_cloud_storage.rb
Constant Summary collapse
- CHUNK_ID_PLACE_HOLDER =
'${chunk_id}'
Instance Method Summary collapse
- #call_google_api(params) ⇒ Object
- #chunk_unique_id_to_str(unique_id) ⇒ Object
- #configure(conf) ⇒ Object
-
#initialize ⇒ GoogleCloudStorageOutput
constructor
A new instance of GoogleCloudStorageOutput.
- #path_format(chunk_key) ⇒ Object
- #prepare_client ⇒ Object
- #send_data(path, data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GoogleCloudStorageOutput
Returns a new instance of GoogleCloudStorageOutput.
41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 41
#
# Creates the output plugin instance.
#
# After delegating to the parent constructor, loads the libraries the other
# methods of this plugin use (Zlib, Net::HTTP, Time extensions, the Google
# API client and Signet's OAuth2 client). The requires are issued here, so
# those libraries are only loaded once an instance is built.
def initialize
  super
  require 'zlib'
  require 'net/http'
  require 'time'
  require 'google/api_client'
  require 'signet/oauth_2/client'
end
Instance Method Details
#call_google_api(params) ⇒ Object
55 56 57 58 59 60 61 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 55
#
# Executes a request through the Google API client, refreshing the access
# token first when the current authorization reports itself as expired.
#
# @param params [Hash] request description handed unchanged to
#   `@google_api_client.execute`
# @return [Object] whatever `@google_api_client.execute` returns
def call_google_api(params)
  # Refresh auth: an expired token must be renewed before the request is sent.
  authorization = @google_api_client.authorization
  authorization.fetch_access_token! if authorization.expired?
  @google_api_client.execute(params)
end
#chunk_unique_id_to_str(unique_id) ⇒ Object
116 117 118 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 116
#
# Renders a chunk's unique id as a lowercase hexadecimal string, two hex
# digits per byte.
#
# @param unique_id [String] raw bytes of the chunk id
# @return [String] hex representation ("" for an empty id)
def chunk_unique_id_to_str(unique_id)
  # 'H*' hex-encodes every byte, high nibble first, which matches formatting
  # each byte with to_s(16) and zero-padding it to two characters.
  unique_id.unpack('H*').first
end
#configure(conf) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 63
#
# Applies the plugin configuration.
#
# When `conf['path']` contains a sub-day time directive, `time_slice_format`
# is overwritten in `conf` with the matching granularity (seconds, minutes
# or hours) before the parent `configure` runs. Afterwards the configured
# path is checked for the `${chunk_id}` placeholder and the Google API
# client is prepared.
#
# @param conf [Hash-like] plugin configuration; `conf['time_slice_format']`
#   may be modified in place
# @raise [Fluent::ConfigError] if the path lacks the `${chunk_id}` placeholder
#   (and for any error raised by `prepare_client`)
def configure(conf)
  path = conf['path']
  if path
    # Finest directive wins: check seconds, then minutes, then hours.
    if path.include?('%S')
      conf['time_slice_format'] = '%Y%m%d%H%M%S'
    elsif path.include?('%M')
      conf['time_slice_format'] = '%Y%m%d%H%M'
    elsif path.include?('%H')
      conf['time_slice_format'] = '%Y%m%d%H'
    end
  end

  super

  # Validate the path before preparing the client so that a misconfigured
  # path is reported without first authenticating against Google.
  unless @path.include?(CHUNK_ID_PLACE_HOLDER)
    raise Fluent::ConfigError, "path must contain ${chunk_id}, which is the placeholder for chunk_id, when append is set to false."
  end

  @client = prepare_client
end
#path_format(chunk_key) ⇒ Object
112 113 114 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 112
#
# Expands the configured path template for a given chunk key.
#
# The chunk key is parsed as a time using `@time_slice_format`, and that
# time is then formatted with `@path` as the strftime pattern.
#
# @param chunk_key [String] time-slice key in `@time_slice_format` form
# @return [String] `@path` with its time directives filled in
def path_format(chunk_key)
  slice_time = Time.strptime(chunk_key, @time_slice_format)
  slice_time.strftime(@path)
end
#prepare_client ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 83
#
# Builds and authenticates the Google API client used for uploads.
#
# Creates the client, loads the service account's PKCS12 key from
# `@service_pkcs12_path`, installs a Signet OAuth2 client (issuer
# `@service_email`, read/write Cloud Storage scope) as the client's
# authorization, and fetches an access token. Finally discovers the
# "storage" v1 API and stores it in `@storage_api`.
#
# @return [Google::APIClient] the authenticated client (also kept in
#   `@google_api_client`)
# @raise [Fluent::ConfigError] when Signet reports an authorization error
def prepare_client
  @google_api_client = Google::APIClient.new(
    :application_name => "fluent-plugin-google-cloud-storage",
    :application_version => "0.3.1")

  begin
    key = Google::APIClient::KeyUtils.load_from_pkcs12(
      @service_pkcs12_path, @service_pkcs12_password)

    @google_api_client.authorization = Signet::OAuth2::Client.new(
      token_credential_uri: "https://accounts.google.com/o/oauth2/token",
      audience: "https://accounts.google.com/o/oauth2/token",
      issuer: @service_email,
      scope: "https://www.googleapis.com/auth/devstorage.read_write",
      signing_key: key)
    @google_api_client.authorization.fetch_access_token!
  rescue Signet::AuthorizationError
    # Surface authentication failures as a configuration problem.
    raise Fluent::ConfigError, "Error occurred authenticating with Google"
  end

  @storage_api = @google_api_client.discovered_api("storage", "v1")
  @google_api_client
end
#send_data(path, data) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 120
#
# Uploads `data` to the configured bucket as the object named `path`.
#
# When `@compress` is "gz" or "gzip" the payload is gzip-compressed in
# memory first; otherwise it is sent as-is. The upload is a multipart
# `objects.insert` call issued through `call_google_api`.
#
# @param path [String] object name inside `@bucket_id`
# @param data [String] payload to upload
# @return [Object] the result returned by `call_google_api`
# @raise [RuntimeError] when the API result reports an error, so the caller
#   sees the failure instead of it being silently dropped
def send_data(path, data)
  content_type = "application/json"

  if ["gz", "gzip"].include?(@compress)
    io = StringIO.new("")
    writer = Zlib::GzipWriter.new(io)
    writer.write(data)
    # finish (not close) flushes the gzip trailer but keeps the StringIO open.
    writer.finish
    io.rewind
  else
    io = StringIO.new(data)
  end

  media = Google::APIClient::UploadIO.new(io, content_type, File.basename(path))

  result = call_google_api(api_method: @storage_api.objects.insert,
                           parameters: {
                             uploadType: "multipart",
                             project: @project_id,
                             bucket: @bucket_id,
                             name: path
                           },
                           body_object: { contentType: media.content_type },
                           media: media)

  # NOTE(review): assumes the client returns a result object exposing
  # `error?` / `error_message` (google-api-client 0.x Result) — confirm
  # against the pinned gem version. Guarded so other return types pass through.
  if result.respond_to?(:error?) && result.error?
    detail = result.respond_to?(:error_message) ? result.error_message : result.inspect
    raise "failed to upload #{path} to Google Cloud Storage: #{detail}"
  end

  result
end
#shutdown ⇒ Object
108 109 110 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 108
#
# Shuts the plugin down. Adds nothing of its own; it only delegates to the
# parent class's `shutdown`.
def shutdown
  super
end
#start ⇒ Object
104 105 106 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 104
#
# Starts the plugin. Adds nothing of its own; it only delegates to the
# parent class's `start`.
def start
  super
end
#write(chunk) ⇒ Object
147 148 149 150 151 152 153 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 147
#
# Uploads one buffered chunk.
#
# The destination is derived from the chunk key via `path_format`, with
# every `${chunk_id}` placeholder replaced by the hex form of the chunk's
# unique id. The chunk's content is then handed to `send_data`.
#
# @param chunk [#key, #unique_id, #read] buffered chunk to upload
# @return [String] the object path the chunk was sent to
def write(chunk)
  path_template = path_format(chunk.key)
  chunk_id = chunk_unique_id_to_str(chunk.unique_id)
  object_path = path_template.gsub(CHUNK_ID_PLACE_HOLDER, chunk_id)

  send_data(object_path, chunk.read)
  object_path
end