Class: Google::Gax::Executor

Inherits:
Object
Defined in:
lib/google/gax/bundling.rb

Overview

Organizes bundling for an API service that requires it.

Constructor Details

#initialize(options, timer: Timer.new) ⇒ Executor

Returns a new instance of Executor.

Parameters:

  • options (BundleOptions)

    configures the strategy this instance uses when executing bundled functions.

  • timer (Timer) (defaults to: Timer.new)

    the timer used to handle the timing of threads.



# File 'lib/google/gax/bundling.rb', line 259

def initialize(options, timer: Timer.new)
  @options = options
  @tasks = {}
  @timer = timer

  # Use a Monitor in order to have the mutex behave like a reentrant lock.
  @tasks_lock = Monitor.new
end
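
The comment in the constructor refers to reentrancy: a thread that already holds a Monitor can enter it again, whereas a plain Mutex raises ThreadError on a nested lock from the same thread. A minimal sketch of that difference follows. ReentrantDemo and its methods are hypothetical stand-ins and not part of Google::Gax.

```ruby
require 'monitor'

# Hypothetical stand-in for a class guarding shared state with a Monitor.
class ReentrantDemo
  attr_reader :log

  def initialize
    @lock = Monitor.new
    @log = []
  end

  def outer
    @lock.synchronize do
      @log << :outer
      inner # re-enters the same lock on the same thread
    end
  end

  def inner
    @lock.synchronize { @log << :inner }
  end
end

demo = ReentrantDemo.new
demo.outer

# The same nesting with a plain Mutex raises ThreadError.
mutex = Mutex.new
mutex_error = begin
  mutex.synchronize { mutex.synchronize { :unreachable } }
  nil
rescue ThreadError => e
  e
end
```

This presumably matters here because a method that holds @tasks_lock may call another method that takes the same lock.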

Instance Method Details

#close ⇒ Object

Call this method before the main thread exits to ensure that all pending API calls are made.



# File 'lib/google/gax/bundling.rb', line 362

def close
  @tasks_lock.synchronize do
    @tasks.each do |bundle_id, _|
      run_now(bundle_id)
    end
  end
end
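
One way to guarantee the flush is to call close from an ensure clause. The sketch below models only the flush-on-close idea with a hypothetical PendingCalls class. It does not use the real Executor, and enqueue and sent are invented for illustration.

```ruby
# Hypothetical stand-in for an executor; only flush-on-close is modeled.
class PendingCalls
  attr_reader :sent

  def initialize
    @pending = {} # bundle_id => queued elements
    @sent = []
  end

  def enqueue(bundle_id, element)
    (@pending[bundle_id] ||= []) << element
  end

  # Sends every bundle that is still waiting.
  def close
    @pending.keys.each do |bundle_id|
      @sent << [bundle_id, @pending.delete(bundle_id)]
    end
  end
end

calls = PendingCalls.new
begin
  calls.enqueue('logs', 'a')
  calls.enqueue('logs', 'b')
  calls.enqueue('metrics', 1)
ensure
  calls.close # runs even if the block above raises
end
```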

#schedule(api_call, bundle_id, bundle_desc, bundling_request) ⇒ Event

Schedules the bundled field of bundling_request, as described by bundle_desc, as part of the bundle identified by bundle_id.

Parameters:

  • api_call (Proc)

    used to make an API call when the task is run.

  • bundle_id (String)

    the id of this bundle.

  • bundle_desc (BundleDescriptor)

    describes the structure of the bundled call.

  • bundling_request (Object)

    the request to pass as the argument to api_call.

Returns:

  • (Event)

    an Event that can be used to wait on the response.



# File 'lib/google/gax/bundling.rb', line 278

def schedule(api_call, bundle_id, bundle_desc,
             bundling_request)
  bundle = bundle_for(api_call, bundle_id, bundle_desc,
                      bundling_request)
  elts = bundling_request[bundle_desc.bundled_field.to_s]
  event = bundle.extend(elts)

  count_threshold = @options.element_count_threshold
  if count_threshold > 0 && bundle.element_count >= count_threshold
    run_now(bundle.bundle_id)
  end

  size_threshold = @options.request_byte_threshold
  if size_threshold > 0 && bundle.request_bytesize >= size_threshold
    run_now(bundle.bundle_id)
  end

  # TODO: Implement byte and element count limits.

  event
end
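
The two threshold checks in schedule can be illustrated in isolation. The sketch below is a simplified model, not the library's implementation: Thresholds, ToyBundle, add, and flush are hypothetical names, and the bundle is flushed at most once per call. As in the source above, a threshold of 0 disables the corresponding check.

```ruby
# Hypothetical names throughout; this models only the two threshold checks.
Thresholds = Struct.new(:element_count_threshold, :request_byte_threshold)

class ToyBundle
  attr_reader :flushed

  def initialize(thresholds)
    @thresholds = thresholds
    @elements = []
    @flushed = []
  end

  # Queues elements and flushes once either threshold is reached.
  def add(elements)
    @elements.concat(elements)
    flush if over_count? || over_bytes?
  end

  def flush
    return if @elements.empty?
    @flushed << @elements
    @elements = []
  end

  private

  def over_count?
    limit = @thresholds.element_count_threshold
    limit > 0 && @elements.size >= limit
  end

  def over_bytes?
    limit = @thresholds.request_byte_threshold
    limit > 0 && @elements.sum(&:bytesize) >= limit
  end
end

by_count = ToyBundle.new(Thresholds.new(3, 0)) # 0 disables the byte check
by_count.add(%w[a b]) # 2 elements, below the threshold
by_count.add(%w[c])   # 3 elements, flushed
by_count.add(%w[d])   # stays queued

by_bytes = ToyBundle.new(Thresholds.new(0, 4)) # 0 disables the count check
by_bytes.add(%w[ab])  # 2 bytes, below the threshold
by_bytes.add(%w[cd])  # 4 bytes, flushed
```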