Class: Google::Gax::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/google/gax/bundling.rb

Overview

Organizes bundling for an api service that requires it.

Instance Method Summary collapse

Constructor Details

#initialize(options, timer: Timer.new) ⇒ Executor

Returns a new instance of Executor.

Parameters:

  • options (BundleOptions)

    configures strategy this instance uses when executing bundled functions.

  • timer (Timer) (defaults to: Timer.new)

    the timer is used to handle the functionality of timing threads.



261
262
263
264
265
266
267
268
# File 'lib/google/gax/bundling.rb', line 261

def initialize(options, timer: Timer.new)
  @options = options
  @tasks = {}
  @timer = timer

  # Use a Monitor in order to have the mutex behave like a reentrant lock.
  @tasks_lock = Monitor.new
end

Instance Method Details

#closeObject

This function should be called before the main thread exits in order to ensure that all api calls are made.



364
365
366
367
368
369
370
# File 'lib/google/gax/bundling.rb', line 364

def close
  @tasks_lock.synchronize do
    @tasks.each do |bundle_id, _|
      run_now(bundle_id)
    end
  end
end

#schedule(api_call, bundle_id, bundle_desc, bundling_request) ⇒ Event

Schedules bundle_desc of bundling_request as part of bundle id.

Parameters:

  • api_call (Proc)

    used to make an api call when the task is run.

  • bundle_id (String)

    the id of this bundle.

  • bundle_desc (BundleDescriptor)

    describes the structure of the bundled call.

  • bundling_request (Object)

    the request to pass as the arg to the api_call.

Returns:

  • (Event)

    an Event that can be used to wait on the response.



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/google/gax/bundling.rb', line 280

def schedule(api_call, bundle_id, bundle_desc,
             bundling_request)
  bundle = bundle_for(api_call, bundle_id, bundle_desc,
                      bundling_request)
  elts = bundling_request[bundle_desc.bundled_field.to_s]
  event = bundle.extend(elts)

  count_threshold = @options.element_count_threshold
  if count_threshold > 0 && bundle.element_count >= count_threshold
    run_now(bundle.bundle_id)
  end

  size_threshold = @options.request_byte_threshold
  if size_threshold > 0 && bundle.request_bytesize >= size_threshold
    run_now(bundle.bundle_id)
  end

  # TODO: Implement byte and element count limits.

  event
end