Class: LogStash::Inputs::GoogleCloudStorage

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/google_cloud_storage.rb

Overview

Generate a repeating message.

This plugin is intended only as an example.

Defined Under Namespace

Modules: SinceDB

Instance Method Summary collapse

Instance Method Details

#backup_to_bucket(filename) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 136

# Copies the object +filename+ from @bucket into the backup bucket and,
# when @delete is truthy, removes the original object from @bucket.
#
# Does nothing when no backup bucket is configured (@backup_to_bucket is nil).
#
# @param filename [String] name of the object inside @bucket
# @return [Object, nil] whatever the delete request returns when @delete is
#   truthy; nil otherwise
def backup_to_bucket(filename)
  return if @backup_to_bucket.nil?

  # The backup copy is stored under the configured prefix.
  target_key = "#{@backup_add_prefix}#{filename}"
  @logger.debug("GCS input: bck_up object #{target_key}")

  copy_params = {
    destinationBucket: @backup_to_bucket,
    destinationObject: target_key,
    sourceBucket: @bucket,
    sourceObject: filename
  }
  # NOTE(review): the outcome of the copy request is not inspected before the
  # delete below — confirm that a failed copy cannot lead to losing the object.
  @client.execute(api_method: @gcs.objects.copy, parameters: copy_params)

  return unless @delete

  @client.execute(
    api_method: @gcs.objects.delete,
    parameters: { bucket: @bucket, object: filename }
  )
end

#backup_to_dir(filename) ⇒ Object



157
158
159
160
161
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 157

# Copies the local file +filename+ into the configured backup directory.
#
# Does nothing when no backup directory is configured (@backup_to_dir is nil).
#
# @param filename [String] path of the local file to copy
# @return [nil]
def backup_to_dir(filename)
  return if @backup_to_dir.nil?

  FileUtils.cp(filename, @backup_to_dir)
end

#list_new_files ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 104

# Lists the objects in @bucket and selects those that should be processed.
#
# An object is selected when ignore_filename? is false for its name and
# sincedb.newer? is true for its +updated+ value.
#
# @return [Array] names of the selected objects, ordered by ascending
#   +updated+ value
def list_new_files
  @logger.debug("GCS input: Polling new objects from bucket " + @bucket)

  listing = @client.execute(
    api_method: @gcs.objects.list,
    parameters: { bucket: @bucket }
  )

  # Maps object name => its +updated+ value; used as the sort key below.
  updated_at = {}

  listing.data.items.each do |entry|
    next if ignore_filename?(entry.name)
    next unless sincedb.newer?(entry.updated)

    updated_at[entry.name] = entry.updated
    @logger.info("GCS input: Adding to objects[]", name: entry.name)
    @logger.debug("GCS input: objects[] length is: ", length: updated_at.length)
  end

  updated_at.keys.sort { |left, right| updated_at[left] <=> updated_at[right] }
end

#process_files(queue) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 165

# Processes every object returned by list_new_files, in the order given,
# handing each one to process_log. Stops early once stop? becomes true.
#
# @param queue [Object] passed through unchanged to process_log
# @return [Array, nil] the list of object names when all were visited,
#   nil when the loop was interrupted by stop?
def process_files(queue)
  list_new_files.each do |filename|
    break if stop?

    @logger.debug("GCS input: processing ", bucket: @bucket, filename: filename)
    process_log(queue, filename)
  end
end

#register ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 64

# Prepares the plugin for use: loads the libraries it needs, builds an
# authorized Google API client for Cloud Storage, and makes sure the local
# working directories exist.
#
# Reads @keyfile, @key_password, @service_account, @backup_to_dir and
# @temporary_directory; sets @host, @client and @gcs.
#
# @return [Object] the result of the final directory check/creation
def register
  # Loaded lazily so the libraries are only pulled in when the plugin is used.
  require "socket"
  require "fileutils"
  require "digest/md5"
  require "google/api_client"
  require "openssl"

  @host = Socket.gethostname

  @logger.debug("Registering GCS input", :bucket => @bucket, :project => @project, :keyfile => @keyfile)
  # initialize_google_client
  @logger.info("\n===========================GCS INPUT PLUG-IN===========================")
  @client = Google::APIClient.new(:application_name =>
                                      'Logstash Google Cloud Storage input plugin',
                                  :application_version => '0.1')

  # Authorize as the service account, using its PKCS12 key, with the
  # read/write storage scope.
  key = Google::APIClient::PKCS12.load_key(@keyfile, @key_password)
  asserter = Google::APIClient::JWTAsserter.new(@service_account,
                                                'https://www.googleapis.com/auth/devstorage.read_write',
                                                key)
  @client.authorization = asserter.authorize
  @gcs = @client.discovered_api('storage', 'v1')

  # File.exist? is used because File.exists? is no longer available on
  # current Ruby versions.
  unless @backup_to_dir.nil?
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
end

#run(queue) ⇒ Object

def register



94
95
96
97
98
99
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 94

# Main loop of the input: calls process_files(queue) repeatedly through
# Stud.interval, using @interval as the interval argument.
#
# The calling thread is stored in @current_thread so that #stop can refer
# to it later.
#
# @param queue [Object] passed through unchanged to process_files
# @return [Object] whatever Stud.interval returns
def run(queue)
  @current_thread = Thread.current

  Stud.interval(@interval) { process_files(queue) }
end

#stop ⇒ Object



181
182
183
184
185
186
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 181

# Asks Stud to stop the thread recorded by #run.
#
# @return [Object] whatever Stud.stop! returns
def stop
  # #stop is called from a different thread than #run, so the thread that
  # #run stored in @current_thread has to be passed to stop! explicitly.
  run_thread = @current_thread
  Stud.stop!(run_thread)
end