Method: Gruf::SynchronizedClient#call

Defined in:
lib/gruf/synchronized_client.rb

#call(request_method, params = {}, metadata = {}, opts = {}, &block) ⇒ Gruf::Response

Call the client’s method with given params. If another call is already active for the same endpoint and the same params, block until the active call is complete. When unblocked, callers will get a copy of the original result.

Raises:

  • (Gruf::Client::Error|GRPC::BadStatus)

    If an error occurs, an exception will be raised according to the error type that was returned.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/gruf/synchronized_client.rb', line 60

##
# Call the client's method with the given params. If another call is already active for the
# same request method and the same params, block until the active call is complete. When
# unblocked, waiting callers receive the result recorded by the caller that made the request.
#
# @param request_method [String, Symbol] The method being requested on the service
#   (converted with #to_sym for the bypass check)
# @param params [Hash] Parameters of the request; `params.hash` is part of the dedup key
# @param metadata [Hash] Forwarded unchanged to the parent implementation via `super`
# @param opts [Hash] Forwarded unchanged to the parent implementation via `super`
# @param block [Proc] Optional block, forwarded unchanged to the parent implementation
# @return [Gruf::Response] The response from the parent implementation, or the recorded
#   result of an identical call that was in flight
# @raise [Gruf::Client::Error|GRPC::BadStatus] Whatever the parent implementation raises
#
def call(request_method, params = {}, metadata = {}, opts = {}, &block)
  # Allow for bypassing extra behavior for selected methods
  return super if unsynchronized_methods.include?(request_method.to_sym)

  # Generate a key based on the method and params. Only `params` participates in the key;
  # `metadata` and `opts` do not, so calls differing only in those share a key.
  key = "#{request_method}.#{params.hash}"

  # Create a lock for this call if we haven't seen it already, then acquire it
  lock = @locks.compute_if_absent(key) { Mutex.new }
  lock.synchronize do
    # Return value from results cache if it exists. This occurs for callers that were
    # waiting on the lock while the first caller was making the actual grpc call.
    # The cache is keyed by the lock object itself, not by `key`.
    response = @results.get(lock)
    if response
      Gruf.logger.debug "Returning cached result for #{key}:#{lock.inspect}"
      next response
    end

    # Make the grpc call and record response for other callers that are blocked
    # on the same lock.
    # NOTE: if `super` raises, nothing below runs: no result is recorded and the lock
    # stays registered under `key`, so waiters will each attempt the call themselves.
    response = super
    @results.put(lock, response)

    # Schedule a task to come in later and clean out result to prevent memory bloat
    Concurrent::ScheduledTask.new(@expiry, args: [@results, lock]) { |h, k| h.delete(k) }.execute

    # Remove the lock from the map. The next caller to come through with the
    # same params will create a new lock and start the process over again.
    # Anyone who was waiting on this call will be using a local reference
    # to the same lock as us, and will fetch the result from the cache.
    @locks.delete(key)

    # Return response
    response
  end
end