Class: Google::Gax::Executor

Inherits:
Object
  • Object
Defined in:
lib/google/gax/bundling.rb

Overview

Organizes bundling for an API service that requires it.

Instance Method Summary

Constructor Details

#initialize(options, timer: Timer.new) ⇒ Executor

Returns a new instance of Executor.

Parameters:

  • options (BundleOptions)

    configures the strategy this instance uses when executing bundled functions.

  • timer (Timer) (defaults to: Timer.new)

    the timer that handles thread timing.



# File 'lib/google/gax/bundling.rb', line 256

def initialize(options, timer: Timer.new)
  @options = options
  @tasks = {}
  @timer = timer

  # Use a Monitor in order to have the mutex behave like a reentrant lock.
  @tasks_lock = Monitor.new
end
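
The constructor comment says a Monitor is used so that the lock is reentrant. The following is a minimal sketch of what that means in practice, not code from the library: nested_lock_result is a hypothetical helper that tries to acquire the same lock twice from one thread.

```ruby
require 'monitor'

# Hypothetical helper (not part of Google::Gax): returns :ok if the thread
# that already holds the lock can acquire it again, or :not_reentrant if
# the second acquisition raises ThreadError.
def nested_lock_result(lock)
  lock.synchronize do
    lock.synchronize { :ok }
  end
rescue ThreadError
  :not_reentrant
end

monitor_result = nested_lock_result(Monitor.new) # Monitor allows re-entry
mutex_result = nested_lock_result(Mutex.new)     # Mutex raises ThreadError
puts "Monitor: #{monitor_result}, Mutex: #{mutex_result}"
```

A reentrant lock matters whenever one synchronized method calls another method that synchronizes on the same lock, which a plain Mutex would reject.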

Instance Method Details

#close ⇒ Object

Call this method before the main thread exits to ensure that all pending API calls are made.



# File 'lib/google/gax/bundling.rb', line 359

def close
  @tasks_lock.synchronize do
    @tasks.each do |bundle_id, _|
      run_now(bundle_id)
    end
  end
end
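
The draining behavior of close can be sketched with a simplified stand-in. SketchExecutor below is hypothetical and is not the library class: its schedule takes only an id and an element, and its run_now is an assumption about what the real, unshown run_now does (remove the bundle, then send it). The sketch iterates over a snapshot of the keys, a deliberate difference from the listing above, because this run_now mutates the hash.

```ruby
require 'monitor'

# Hypothetical stand-in for Executor, reduced to the bookkeeping needed to
# show how close drains every pending bundle.
class SketchExecutor
  attr_reader :sent

  def initialize
    @tasks = {}                # bundle_id => pending elements
    @sent = []                 # records each flushed bundle
    @tasks_lock = Monitor.new  # reentrant: close calls run_now under the lock
  end

  def schedule(bundle_id, element)
    @tasks_lock.synchronize { (@tasks[bundle_id] ||= []) << element }
  end

  # Assumed behavior: remove the bundle and "send" it by recording it.
  def run_now(bundle_id)
    @tasks_lock.synchronize do
      elements = @tasks.delete(bundle_id)
      @sent << [bundle_id, elements] if elements
    end
  end

  def close
    @tasks_lock.synchronize do
      # Iterate over a snapshot of the keys because run_now mutates @tasks.
      @tasks.keys.each { |bundle_id| run_now(bundle_id) }
    end
  end

  def pending_count
    @tasks_lock.synchronize { @tasks.size }
  end
end

executor = SketchExecutor.new
executor.schedule('logs', 'a')
executor.schedule('logs', 'b')
executor.schedule('metrics', 'c')
executor.close
puts executor.sent.inspect
```

In an application, one way to make sure this happens is to register the call with Kernel#at_exit, for example `at_exit { executor.close }`.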

#schedule(api_call, bundle_id, bundle_desc, bundling_request) ⇒ Event

Schedules bundle_desc of bundling_request as part of the bundle identified by bundle_id.

Parameters:

  • api_call (Proc)

    used to make an API call when the task is run.

  • bundle_id (String)

    the id of this bundle.

  • bundle_desc (BundleDescriptor)

    describes the structure of the bundled call.

  • bundling_request (Object)

    the request to pass as the argument to api_call.

Returns:

  • (Event)

    an Event that can be used to wait on the response.



# File 'lib/google/gax/bundling.rb', line 275

def schedule(api_call, bundle_id, bundle_desc,
             bundling_request)
  bundle = bundle_for(api_call, bundle_id, bundle_desc,
                      bundling_request)
  elts = bundling_request[bundle_desc.bundled_field.to_s]
  event = bundle.extend(elts)

  count_threshold = @options.element_count_threshold
  if count_threshold > 0 && bundle.element_count >= count_threshold
    run_now(bundle.bundle_id)
  end

  size_threshold = @options.request_byte_threshold
  if size_threshold > 0 && bundle.request_bytesize >= size_threshold
    run_now(bundle.bundle_id)
  end

  # TODO: Implement byte and element count limits.

  event
end
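
The two threshold checks in schedule can be read as a single predicate. The sketch below restates that logic in isolation; SketchOptions and flush_now? are hypothetical names used only for illustration, and the struct carries just the two fields that schedule reads.

```ruby
# Hypothetical stand-in mirroring the two threshold fields read by schedule.
SketchOptions = Struct.new(:element_count_threshold, :request_byte_threshold)

# Returns true when a bundle with the given totals should be sent at once.
# A threshold of 0 (or less) disables the corresponding check.
def flush_now?(options, element_count, request_bytesize)
  count_threshold = options.element_count_threshold
  size_threshold = options.request_byte_threshold
  (count_threshold > 0 && element_count >= count_threshold) ||
    (size_threshold > 0 && request_bytesize >= size_threshold)
end

options = SketchOptions.new(3, 0) # flush at 3 elements; byte check disabled
puts flush_now?(options, 2, 10_000) # below the count threshold
puts flush_now?(options, 3, 10)     # count threshold reached
```

With the byte threshold set to 0, even a very large request does not trigger an immediate send; only the element count does.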