Method: Gruf::SynchronizedClient#call
- Defined in:
- lib/gruf/synchronized_client.rb
#call(request_method, params = {}, metadata = {}, opts = {}, &block) ⇒ Gruf::Response
Call the client's method with the given params. If another call is already active for the same endpoint and the same params, block until the active call completes. When unblocked, callers receive a copy of the original call's result.
Raises the error type that was returned if the call fails.
# File 'lib/gruf/synchronized_client.rb', line 60

def call(request_method, params = {}, metadata = {}, opts = {}, &block)
  # Allow for bypassing extra behavior for selected methods
  return super if unsynchronized_methods.include?(request_method.to_sym)

  # Generate a unique key based on the method and params
  key = "#{request_method}.#{params.hash}"

  # Create a lock for this call if we haven't seen it already, then acquire it
  lock = @locks.compute_if_absent(key) { Mutex.new }
  lock.synchronize do
    # Return value from results cache if it exists. This occurs for callers that were
    # waiting on the lock while the first caller was making the actual grpc call.
    response = @results.get(lock)
    if response
      Gruf.logger.debug "Returning cached result for #{key}:#{lock.inspect}"
      next response
    end

    # Make the grpc call and record the response for other callers that are
    # blocked on the same lock
    response = super
    @results.put(lock, response)

    # Schedule a task to come in later and clean out the result to prevent memory bloat
    Concurrent::ScheduledTask.new(@expiry, args: [@results, lock]) { |h, k| h.delete(k) }.execute

    # Remove the lock from the map. The next caller to come through with the
    # same params will create a new lock and start the process over again.
    # Anyone who was waiting on this call will be using a local reference
    # to the same lock as us, and will fetch the result from the cache.
    @locks.delete(key)

    # Return response
    response
  end
end
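The lock-per-key flow above can be sketched with the Ruby standard library alone. This is an illustrative reduction, not gruf's implementation: a Mutex-guarded Hash stands in for `Concurrent::Map#compute_if_absent`, the `DedupCaller` class and its block-based "call" are hypothetical names, and the `Concurrent::ScheduledTask` expiry step is omitted, so cached results in this sketch never expire.

```ruby
# Stdlib-only sketch of lock-per-key call deduplication (assumption: this
# mirrors the structure of SynchronizedClient#call, minus result expiry).
class DedupCaller
  def initialize
    @locks = {}              # key => Mutex, one lock per in-flight call
    @locks_guard = Mutex.new # guards @locks, emulating compute_if_absent
    @results = {}            # lock => result of the first caller's work
  end

  # Runs the block at most once per unique key while a call is in flight;
  # concurrent callers with the same key block on the shared lock and then
  # read the first caller's result from the cache.
  def call(key)
    lock = @locks_guard.synchronize { @locks[key] ||= Mutex.new }
    lock.synchronize do
      cached = @results[lock]
      next cached if cached # a waiting caller: reuse the first result

      result = yield # the "real" call (e.g. the grpc request)
      @results[lock] = result

      # Drop the lock from the map; late arrivals start a fresh cycle,
      # while anyone already waiting holds a reference to this lock.
      @locks_guard.synchronize { @locks.delete(key) }
      result
    end
  end
end

calls = 0
dedup = DedupCaller.new
threads = 10.times.map do
  Thread.new do
    dedup.call("GetThing.123") do
      sleep 0.2 # simulate a slow RPC so all threads pile onto one lock
      calls += 1
      "response"
    end
  end
end
values = threads.map(&:value)
puts values.uniq.size # 1: every caller got the same response
puts calls            # the slow block ran once for all ten callers
```

The key design point this demonstrates is that waiters keep a local reference to the Mutex even after it is deleted from the map, so the Mutex object itself can double as the cache key for the result.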