Class: Gruf::SynchronizedClient

Inherits:
Client
  • Object
show all
Defined in:
lib/gruf/synchronized_client.rb

Overview

Ensures that we only have one active call to a given endpoint with a given set of params. This can be useful to mitigate thundering herds.

Instance Attribute Summary collapse

Attributes inherited from Client

#base_klass, #opts, #service_klass

Instance Method Summary collapse

Methods inherited from Client

#timeout

Methods included from Loggable

#logger

Constructor Details

#initialize(service:, options: {}, client_options: {}) ⇒ SynchronizedClient

Initialize the client and setup the stub

Parameters:

  • service (Module)

    The namespace of the client Stub that is desired to load

  • options (Hash) (defaults to: {})

    A hash of options for the client

  • client_options (Hash) (defaults to: {})

    A hash of options to pass to the gRPC client stub

Options Hash (options:):

  • :unsynchronized_methods (Array)

    A list of methods (as symbols) that should be excluded from synchronization

  • :internal_cache_expiry (Integer)

    The length of time to keep results around for other threads to fetch (in seconds)



41
42
43
44
45
46
47
# File 'lib/gruf/synchronized_client.rb', line 41

def initialize(service:, options: {}, client_options: {})
  @unsynchronized_methods = options.delete(:unsynchronized_methods) { [] }
  @expiry = options.delete(:internal_cache_expiry) { Gruf.synchronized_client_internal_cache_expiry }
  @locks = Concurrent::Map.new
  @results = Concurrent::Map.new
  super
end

Instance Attribute Details

#unsynchronized_methodsObject (readonly)

Returns the value of attribute unsynchronized_methods.



28
29
30
# File 'lib/gruf/synchronized_client.rb', line 28

def unsynchronized_methods
  @unsynchronized_methods
end

Instance Method Details

#call(request_method, params = {}, metadata = {}, opts = {}, &block) ⇒ Gruf::Response

Call the client's method with given params. If another call is already active for the same endpoint and the same params, block until the active call is complete. When unblocked, callers will get a copy of the original result.

error type that was returned

Parameters:

  • request_method (String|Symbol)

    The method that is being requested on the service

  • params (Hash) (defaults to: {})

    (Optional) A hash of parameters that will be inserted into the gRPC request message that is required for the given above call

  • metadata (Hash) (defaults to: {})

    (Optional) A hash of metadata key/values that are transported with the client request

  • opts (Hash) (defaults to: {})

    (Optional) A hash of options to send to the gRPC request_response method

Returns:

Raises:

  • (Gruf::Client::Error|GRPC::BadStatus)

    If an error occurs, an exception will be raised according to the



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/gruf/synchronized_client.rb', line 62

def call(request_method, params = {},  = {}, opts = {}, &block)
  # Allow for bypassing extra behavior for selected methods
  return super if unsynchronized_methods.include?(request_method.to_sym)

  # Generate a unique key based on the method and params
  key = "#{request_method}.#{params.hash}"

  # Create a lock for this call if we haven't seen it already, then acquire it
  lock = @locks.compute_if_absent(key) { Mutex.new }
  lock.synchronize do
    # Return value from results cache if it exists. This occurs for callers that were
    # waiting on the lock while the first caller was making the actual grpc call.
    response = @results.get(lock)
    if response
      Gruf.logger.debug "Returning cached result for #{key}:#{lock.inspect}"
      next response
    end

    # Make the grpc call and record response for other callers that are blocked
    # on the same lock
    response = super
    @results.put(lock, response)

    # Schedule a task to come in later and clean out result to prevent memory bloat
    Concurrent::ScheduledTask.new(@expiry, args: [@results, lock]) { |h, k| h.delete(k) }.execute

    # Remove the lock from the map. The next caller to come through with the
    # same params will create a new lock and start the process over again.
    # Anyone who was waiting on this call will be using a local reference
    # to the same lock as us, and will fetch the result from the cache.
    @locks.delete(key)

    # Return response
    response
  end
end