Class: Google::Cloud::Logging::AsyncWriter

Inherits:
Object
  • Object
Includes:
MonitorMixin
Defined in:
lib/google/cloud/logging/async_writer.rb

Overview

AsyncWriter

AsyncWriter buffers, batches, and transmits log entries efficiently. Writing log entries is asynchronous and will not block.

Batches that cannot be delivered immediately are queued. When the queue is full, new batch requests raise errors that can be consumed using the #on_error callback. This provides back pressure when the writer cannot keep up with requests.

This object is thread-safe; it may accept write requests from multiple threads simultaneously, and will serialize them when executing in the background thread.

Examples:

require "google/cloud/logging"

logging = Google::Cloud::Logging.new

async = logging.async_writer

entry1 = logging.entry payload: "Job started."
entry2 = logging.entry payload: "Job completed."

labels = { job_size: "large", job_code: "red" }
resource = logging.resource "gae_app",
                            "module_id" => "1",
                            "version_id" => "20150925t173233"

async.write_entries [entry1, entry2],
                    log_name: "my_app_log",
                    resource: resource,
                    labels: labels
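
The buffering and thread safety described in the overview can be pictured with a small stand-in. This is a minimal sketch, not the library's implementation: `TinyBatcher`, `max_batch`, and `published` are hypothetical names, and the only assumption carried over from this page is that the class includes `MonitorMixin` and serializes work with `synchronize`.

```ruby
require "monitor"

# Hypothetical stand-in, not part of google-cloud-logging.
class TinyBatcher
  include MonitorMixin

  attr_reader :published

  def initialize max_batch: 3
    super() # initializes the monitor provided by MonitorMixin
    @max_batch = max_batch
    @batch = []
    @published = []
  end

  # Adds an item; publishes the batch once it reaches max_batch items.
  def write item
    synchronize do
      @batch << item
      publish_batch! if @batch.size >= @max_batch
    end
    self
  end

  # Publishes whatever is buffered, even if the batch is not full.
  def flush
    synchronize { publish_batch! }
    self
  end

  private

  # Moves the current batch to the published list and starts a new one.
  def publish_batch!
    return if @batch.empty?
    @published << @batch
    @batch = []
  end
end

batcher = TinyBatcher.new max_batch: 3
threads = 4.times.map do |t|
  Thread.new { 5.times { |i| batcher.write "t#{t}-#{i}" } }
end
threads.each(&:join)
batcher.flush

puts batcher.published.flatten.size # => 20
```

Four threads write concurrently, yet every item lands in exactly one batch because each mutation happens inside `synchronize`.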

Instance Method Details

#flush ⇒ AsyncWriter

Forces all entries in the current batch to be published immediately.

Returns:

  • (AsyncWriter)

    returns self so calls can be chained.



# File 'lib/google/cloud/logging/async_writer.rb', line 280

def flush
  synchronize do
    publish_batch!
    @cond.broadcast
  end

  self
end

#last_error ⇒ Exception? Also known as: last_exception

The most recent unhandled error to occur while transmitting log entries.

If an unhandled error has occurred, the writer will attempt to recover from the error and resume buffering, batching, and transmitting log entries.

Examples:

require "google/cloud/logging"

logging = Google::Cloud::Logging.new

resource = logging.resource "gae_app",
                            module_id: "1",
                            version_id: "20150925t173233"

async = logging.async_writer

logger = async.logger "my_app_log", resource, env: :production
logger.info "Job started."

# If an error was raised, it can be retrieved here:
async.last_error #=> nil

Returns:

  • (Exception, nil)

    The most recent error raised, or nil if no error has occurred.



# File 'lib/google/cloud/logging/async_writer.rb', line 371

def last_error
  synchronize { @last_error }
end

#logger(log_name, resource, labels = {}) ⇒ Google::Cloud::Logging::Logger

Creates a logger instance that is API-compatible with Ruby's standard library Logger.

The logger will use AsyncWriter to transmit log entries on a background thread.

Examples:

require "google/cloud/logging"

logging = Google::Cloud::Logging.new

resource = logging.resource "gae_app",
                            module_id: "1",
                            version_id: "20150925t173233"

async = logging.async_writer
logger = async.logger "my_app_log", resource, env: :production
logger.info "Job started."

Parameters:

  • log_name (String)

    A log resource name to be associated with the written log entries.

  • resource (Google::Cloud::Logging::Resource)

    The monitored resource to be associated with written log entries.

  • labels (Hash) (defaults to: {})

    A set of user-defined data to be associated with written log entries.

Returns:

  • (Google::Cloud::Logging::Logger)

    a logger that transmits entries through this AsyncWriter.

# File 'lib/google/cloud/logging/async_writer.rb', line 195

def logger log_name, resource, labels = {}
  Logger.new self, log_name, resource, labels
end

#on_error {|callback| ... } ⇒ Object

Register to be notified of errors when raised.

If an unhandled error has occurred, the writer will attempt to recover from the error and resume buffering, batching, and transmitting log entries.

Multiple error handlers can be added.

Examples:

require "google/cloud/logging"
require "google/cloud/error_reporting"

logging = Google::Cloud::Logging.new

resource = logging.resource "gae_app",
                            module_id: "1",
                            version_id: "20150925t173233"

async = logging.async_writer

# Register to be notified when unhandled errors occur.
async.on_error do |error|
  # error can be an AsyncWriterError or AsyncWriteEntriesError
  Google::Cloud::ErrorReporting.report error
end

logger = async.logger "my_app_log", resource, env: :production
logger.info "Job started."

Yields:

  • (callback)

    The block to be called when an error is raised.

Yield Parameters:

  • error (Exception)

    The error raised.



# File 'lib/google/cloud/logging/async_writer.rb', line 338

def on_error &block
  synchronize do
    @error_callbacks << block
  end
end
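
The callback registration shown above can be tried out without a Logging project. The following is a minimal sketch under stated assumptions: `ErrorNotifier` and its `report` method are hypothetical, and recording `last_error` on every report is an assumption of this sketch rather than documented behavior of the writer.

```ruby
require "monitor"

# Hypothetical stand-in, not part of google-cloud-logging.
class ErrorNotifier
  include MonitorMixin

  def initialize
    super() # initializes the monitor provided by MonitorMixin
    @error_callbacks = []
    @last_error = nil
  end

  # Registers a block; several blocks may be registered.
  def on_error &block
    synchronize { @error_callbacks << block }
    self
  end

  def last_error
    synchronize { @last_error }
  end

  # Records the error, then hands it to every registered callback.
  # Callbacks run outside the lock so a slow handler cannot block writers.
  def report error
    callbacks = synchronize do
      @last_error = error
      @error_callbacks.dup
    end
    callbacks.each { |callback| callback.call error }
  end
end

notifier = ErrorNotifier.new
seen = []
notifier.on_error { |e| seen << "first: #{e.message}" }
notifier.on_error { |e| seen << "second: #{e.message}" }

notifier.report RuntimeError.new("delivery failed")

puts seen.inspect
puts notifier.last_error.message
```

Copying the callback list inside `synchronize` and invoking the blocks afterwards is a design choice of the sketch; it keeps user code from running while the lock is held.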

#started? ⇒ boolean

Whether the writer has been started.

Returns:

  • (boolean)

    true when started, false otherwise.



# File 'lib/google/cloud/logging/async_writer.rb', line 293

def started?
  !stopped?
end

#stop ⇒ AsyncWriter

Begins the process of stopping the writer. Entries already in the queue will be published, but no new entries can be added. Use #wait! to block until the writer is fully stopped and all pending entries have been published.

Returns:

  • (AsyncWriter)

    returns self so calls can be chained.



# File 'lib/google/cloud/logging/async_writer.rb', line 206

def stop
  synchronize do
    break if @stopped

    @stopped = true
    publish_batch!
    @cond.broadcast
    @thread_pool&.shutdown
  end

  self
end

#stop!(timeout = nil, force: nil) ⇒ Symbol Also known as: async_stop!

Stop this asynchronous writer and block until it has been stopped.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    The maximum number of seconds to wait for shutdown to complete. Will wait forever when the value is nil. The default value is nil.

  • force (Boolean) (defaults to: nil)

    If set to true, and the writer hasn't stopped within the given timeout, kill it forcibly by terminating the thread. This should be used with extreme caution, as it can leave RPCs unfinished. Defaults to nil, which is treated as false.

Returns:

  • (Symbol)

    Returns :new if #write_entries has never been called on the AsyncWriter, :stopped if it was already stopped at the time of invocation, :waited if it stopped during the timeout period, :timeout if it is still running after the timeout, or :forced if it was forcibly killed.



# File 'lib/google/cloud/logging/async_writer.rb', line 258

def stop! timeout = nil, force: nil
  return :new unless @thread_pool
  return :stopped if stopped?

  stop
  wait! timeout

  if synchronize { @thread_pool.shutdown? }
    return :waited if timeout
  elsif force
    @thread_pool.kill
    return :forced
  end
  :timeout
end
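
Callers will usually want to branch on the symbol that #stop! returns. The helper below is a hedged sketch: `describe_stop_result` is a hypothetical name, and only the five symbols come from the documentation above.

```ruby
# Hypothetical helper; the symbols are the ones documented for #stop!.
def describe_stop_result result
  case result
  when :new     then "writer never received entries; nothing to stop"
  when :stopped then "writer was already stopped"
  when :waited  then "writer stopped within the timeout"
  when :timeout then "writer still running after the timeout"
  when :forced  then "writer was forcibly terminated"
  else raise ArgumentError, "unexpected result: #{result.inspect}"
  end
end

# With a real writer this would be: describe_stop_result async.stop!(10)
puts describe_stop_result(:waited)
```

Raising on an unknown symbol makes it obvious if a future version adds another result that the caller does not yet handle.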

#stopped? ⇒ boolean

Whether the writer has been stopped.

Returns:

  • (boolean)

    true when stopped, false otherwise.



# File 'lib/google/cloud/logging/async_writer.rb', line 301

def stopped?
  synchronize { @stopped }
end

#wait!(timeout = nil) ⇒ AsyncWriter

Blocks until the writer is fully stopped, all pending entries have been published, and all callbacks have completed. Does not stop the writer. To stop the writer, first call #stop and then call #wait! to block until the writer is stopped.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    The maximum number of seconds to wait for shutdown to complete. Will wait forever when the value is nil. The default value is nil.

Returns:

  • (AsyncWriter)

    returns self so calls can be chained.



# File 'lib/google/cloud/logging/async_writer.rb', line 230

def wait! timeout = nil
  synchronize do
    if @thread_pool
      @thread_pool.shutdown
      @thread_pool.wait_for_termination timeout
    end
  end

  self
end
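
The "stop first, then wait" ordering can be illustrated with Ruby's core `Thread` and `Queue`. This is a sketch only: `TinyWorker` is a hypothetical class, and it uses a single thread and a closed queue in place of the thread pool that the listing above shuts down.

```ruby
# Hypothetical stand-in built on Thread and Queue from Ruby's core library.
class TinyWorker
  attr_reader :done

  def initialize
    @queue = Queue.new
    @done = []
    @thread = Thread.new do
      # Queue#pop returns nil once the queue is closed and drained.
      while (item = @queue.pop)
        @done << item
      end
    end
  end

  # Raises ClosedQueueError if called after #stop.
  def push item
    @queue << item
    self
  end

  # Stops accepting new items; already queued items are still processed.
  def stop
    @queue.close
    self
  end

  # Blocks until the worker thread finishes, or until timeout seconds pass.
  # Does not stop the worker by itself.
  def wait! timeout = nil
    @thread.join timeout
    self
  end
end

worker = TinyWorker.new
worker.push("a").push("b")
worker.stop.wait! 5
puts worker.done.inspect
```

Because both methods return `self`, `stop` and `wait!` chain in the same way the documented return values allow.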

#write_entries(entries, log_name: nil, resource: nil, labels: nil) ⇒ Google::Cloud::Logging::AsyncWriter

Asynchronously write one or more log entries to the Stackdriver Logging service.

Unlike the main write_entries method, this method usually does not block. The actual write RPCs will happen in the background, and may be batched with related calls. However, if the queue is full, this method will block until enough space has cleared out.

Examples:

require "google/cloud/logging"

logging = Google::Cloud::Logging.new
async = logging.async_writer

entry = logging.entry payload: "Job started.",
                      log_name: "my_app_log"
entry.resource.type = "gae_app"
entry.resource.labels[:module_id] = "1"
entry.resource.labels[:version_id] = "20150925t173233"

async.write_entries entry

Parameters:

  • entries (Google::Cloud::Logging::Entry, Array<Google::Cloud::Logging::Entry>)

    One or more entry objects to write. The log entries must have values for all required fields.

  • log_name (String) (defaults to: nil)

    A default log ID for those log entries in entries that do not specify their own log_name. See also Entry#log_name=.

  • resource (Resource) (defaults to: nil)

    A default monitored resource for those log entries in entries that do not specify their own resource. See also Entry#resource.

  • labels (Hash{Symbol,String => String}) (defaults to: nil)

    User-defined key:value items that are added to the labels field of each log entry in entries, except when a log entry specifies its own key:value item with the same key. See also Entry#labels=.

Returns:

  • (Google::Cloud::Logging::AsyncWriter)

    returns self so calls can be chained.

# File 'lib/google/cloud/logging/async_writer.rb', line 132

def write_entries entries, log_name: nil, resource: nil, labels: nil
  synchronize do
    raise "AsyncWriter has been stopped" if @stopped

    Array(entries).each do |entry|
      # Update the entry to have all the data directly on it
      entry.log_name ||= log_name
      if entry.resource.nil? || entry.resource.empty?
        entry.resource = resource
      end
      entry.labels = labels if entry.labels.nil? || entry.labels.empty?

      # Add the entry to the batch
      @batch ||= Batch.new self
      next if @batch.try_add entry

      # If we can't add to the batch, publish and create a new batch
      publish_batch!
      @batch = Batch.new self
      @batch.add entry
    end

    @thread_pool ||= Concurrent::ThreadPoolExecutor.new \
      max_threads: @threads, max_queue: @max_queue
    @thread ||= Thread.new { run_background }

    publish_batch! if @batch&.ready?

    @cond.broadcast
  end
  self
end
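
The way the listing above fills in defaults can be tried on plain Ruby objects. In this sketch `FakeEntry` and `apply_defaults` are hypothetical names, and a Hash stands in for the resource; the three assignments follow the listing, which replaces labels only when the entry has none.

```ruby
# Hypothetical stand-in; the real class is Google::Cloud::Logging::Entry.
FakeEntry = Struct.new :payload, :log_name, :resource, :labels,
                       keyword_init: true

# Applies the same defaulting steps as the listing, on plain objects.
def apply_defaults entry, log_name: nil, resource: nil, labels: nil
  entry.log_name ||= log_name
  entry.resource = resource if entry.resource.nil? || entry.resource.empty?
  entry.labels = labels if entry.labels.nil? || entry.labels.empty?
  entry
end

plain = FakeEntry.new payload: "Job started."
named = FakeEntry.new payload: "Job completed.", log_name: "other_log",
                      labels: { job_code: "blue" }

[plain, named].each do |entry|
  apply_defaults entry, log_name: "my_app_log",
                        resource: { type: "gae_app" },
                        labels: { job_size: "large" }
end

puts plain.log_name       # => my_app_log
puts named.log_name       # => other_log
puts named.labels.inspect
```

An entry that already carries its own `log_name` or labels keeps them; only the missing fields pick up the values passed to the call.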