Class: Protobuf::Rpc::Connectors::Base

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/protobuf/rpc/connectors/base.rb

Direct Known Subclasses

Socket, Zmq

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

initialize_logger, #log_exception, #logger, #sign_message

Constructor Details

#initialize(options) ⇒ Base

Returns a new instance of Base.



30
31
32
33
# File 'lib/protobuf/rpc/connectors/base.rb', line 30

# Builds a connector from the caller-supplied options.
#
# The options are merged over DEFAULT_OPTIONS and stored in @options, and a
# new Stat object tagged :CLIENT is stored in @stats.
#
# @param options [Hash] values merged on top of DEFAULT_OPTIONS
def initialize(options)
  merged_options = DEFAULT_OPTIONS.merge(options)
  @options = merged_options
  @stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
end

Instance Attribute Details

#complete_cb ⇒ Object

Returns the value of attribute complete_cb.



28
29
30
# File 'lib/protobuf/rpc/connectors/base.rb', line 28

# Reader for the completion callback held in @complete_cb.
#
# When set, #complete invokes it as `call(self)`.
#
# @return [Object, nil] the stored callback, or nil when none has been assigned
def complete_cb
  @complete_cb
end

#error ⇒ Object (readonly)

Returns the value of attribute error.



27
28
29
# File 'lib/protobuf/rpc/connectors/base.rb', line 27

# Reader for the most recent error held in @error.
#
# #failure assigns a new ClientError to @error each time it runs.
#
# @return [Object, nil] the stored error, or nil when no failure has been recorded
def error
  @error
end

#failure_cb ⇒ Object

Returns the value of attribute failure_cb.



28
29
30
# File 'lib/protobuf/rpc/connectors/base.rb', line 28

# Reader for the failure callback held in @failure_cb.
#
# When set, #failure invokes it as `call(@error)`.
#
# @return [Object, nil] the stored callback, or nil when none has been assigned
def failure_cb
  @failure_cb
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



27
28
29
# File 'lib/protobuf/rpc/connectors/base.rb', line 27

# Reader for the connector options held in @options.
#
# The constructor populates @options with DEFAULT_OPTIONS merged with the
# options it was given.
#
# @return [Object] the stored options
def options
  @options
end

#stats ⇒ Object

Returns the value of attribute stats.



28
29
30
# File 'lib/protobuf/rpc/connectors/base.rb', line 28

# Reader for the request statistics object held in @stats.
#
# Both the constructor and #initialize_stats assign a new
# ::Protobuf::Rpc::Stat created with :CLIENT.
#
# @return [Object] the stored stats object
def stats
  @stats
end

#success_cb ⇒ Object

Returns the value of attribute success_cb.



28
29
30
# File 'lib/protobuf/rpc/connectors/base.rb', line 28

# Reader for the success callback held in @success_cb.
#
# When set, #succeed invokes it as `call(response)`.
#
# @return [Object, nil] the stored callback, or nil when none has been assigned
def success_cb
  @success_cb
end

Instance Method Details

#any_callbacks? ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/protobuf/rpc/connectors/base.rb', line 35

# Reports whether at least one of the complete, failure, or success
# callbacks holds a truthy value.
#
# @return [Boolean] true when any of the three callbacks is set
def any_callbacks?
  registered = @complete_cb || @failure_cb || @success_cb
  registered ? true : false
end

#close_connection ⇒ Object



39
40
41
# File 'lib/protobuf/rpc/connectors/base.rb', line 39

# Placeholder for closing the underlying connection.
#
# This base implementation always raises; the message tells connector
# authors that they must provide their own close_connection.
#
# @raise [RuntimeError] always
def close_connection
  raise ::RuntimeError, 'If you inherit a Connector from Base you must implement close_connection'
end

#complete ⇒ Object



43
44
45
46
47
48
49
50
51
52
# File 'lib/protobuf/rpc/connectors/base.rb', line 43

# Finalizes the current request.
#
# Calls @stats.stop, logs the stats at info level, logs a completion notice
# at debug level, and then invokes the completion callback (when one is set)
# with this connector as its only argument.
#
# Any StandardError raised along the way is logged and re-raised unchanged.
#
# @return [Object, nil] the completion callback's return value, or nil when
#   no callback is set
# @raise [StandardError] whatever was raised while completing
def complete
  @stats.stop
  logger.info { @stats.to_s }
  logger.debug { sign_message('Response processing complete') }
  @complete_cb.call(self) unless @complete_cb.nil?
rescue => e
  logger.error { sign_message('Complete callback error encountered') }
  log_exception(e)
  raise
end

#data_callback(data) ⇒ Object



54
55
56
57
58
# File 'lib/protobuf/rpc/connectors/base.rb', line 54

# Fallback callback that simply remembers what it was given.
#
# Stores the argument in @data and flags @used_data_callback so that
# #parse_response can hand the stored value back to its caller.
#
# @param data [Object] the value to remember
# @return [Object] the same value that was passed in
def data_callback(data)
  logger.debug { sign_message('Using data_callback') }
  @data = data
  @used_data_callback = true
  data
end

#failure(code, message) ⇒ Object

All failures should be routed through this method.

Parameters:

  • code (Symbol)

    The code we're using (see ::Protobuf::Socketrpc::ErrorReason)

  • message (String)

    The error message



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/protobuf/rpc/connectors/base.rb', line 64

# Records a failed request and notifies the failure callback.
#
# A new ClientError is stored in @error; the value looked up from
# ::Protobuf::Socketrpc::ErrorReason for +code+ is assigned to both the
# error's code and @stats.status, and +message+ becomes the error message.
# The failure callback (when set) is then called with the error.
#
# #complete is always run afterwards, whether or not anything raised.
#
# @param code [Symbol] key passed to ::Protobuf::Socketrpc::ErrorReason.fetch
# @param message [String] the error message
# @return [Object, nil] the failure callback's return value, or nil when no
#   callback is set
# @raise [StandardError] anything raised while recording or notifying is
#   logged and re-raised
def failure(code, message)
  begin
    @error = ClientError.new
    reason = ::Protobuf::Socketrpc::ErrorReason.fetch(code)
    @error.code = reason
    @stats.status = reason
    @error.message = message

    logger.debug { sign_message("Server failed request (invoking on_failure): #{@error.inspect}") }

    @failure_cb.call(@error) unless @failure_cb.nil?
  rescue => callback_error
    logger.error { sign_message("Failure callback error encountered") }
    log_exception(callback_error)
    raise
  ensure
    complete
  end
end

#first_alive_load_balance? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
83
# File 'lib/protobuf/rpc/connectors/base.rb', line 80

# Tells whether "first alive" load balancing has been requested.
#
# The PB_FIRST_ALIVE_LOAD_BALANCE environment variable enables it merely by
# being present (its value is not inspected); otherwise the
# :first_alive_load_balance option decides.
#
# @return [Object] true when the environment variable exists, otherwise the
#   raw value of the :first_alive_load_balance option (possibly nil)
def first_alive_load_balance?
  return true if ENV.key?("PB_FIRST_ALIVE_LOAD_BALANCE")

  options[:first_alive_load_balance]
end

#initialize_stats ⇒ Object



85
86
87
88
89
90
91
92
93
# File 'lib/protobuf/rpc/connectors/base.rb', line 85

# Replaces @stats with a new Stat and fills it in from the options.
#
# The server is given as the pair [port, host] (in that order), the service
# as the :service option's name, and the method name as the :method option
# converted to a string.
#
# If anything raises while doing so, the exception is logged and the request
# is failed with :RPC_ERROR instead of letting the exception propagate.
#
# @return [Object] the method name string on success, otherwise whatever
#   #failure returns
def initialize_stats
  begin
    @stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
    opts = @options
    @stats.server = [opts[:port], opts[:host]]
    @stats.service = opts[:service].name
    @stats.method_name = opts[:method].to_s
  rescue => ex
    log_exception(ex)
    failure(:RPC_ERROR, "Invalid stats configuration. #{ex.message}")
  end
end

#log_signature ⇒ Object



95
96
97
# File 'lib/protobuf/rpc/connectors/base.rb', line 95

# Tag identifying this connector, built from its class name.
#
# The string is built once and cached in @_log_signature.
#
# @return [String] e.g. "[client-SomeConnectorClass]"
def log_signature
  return @_log_signature if @_log_signature

  @_log_signature = "[client-#{self.class}]"
end

#parse_response ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/protobuf/rpc/connectors/base.rb', line 99

# Closes the connection, decodes the raw response held in @response_data and
# routes the outcome to #failure or #succeed.
#
# Flow, in order:
# 1. #close_connection is called.
# 2. The response size is recorded on @stats (when there is response data)
#    and the data is decoded as a ::Protobuf::Socketrpc::Response wrapper.
# 3. If the wrapper carries an :error_reason, #failure is called with the
#    wrapper's reason and error text.
# 4. Otherwise the wrapper's payload is decoded with the :response_type
#    option; a nil result fails the request with :BAD_RESPONSE_PROTO, while
#    anything else goes through #verify_callbacks and #succeed.
#
# @return [Object, nil] @data when the data_callback fallback was used on the
#   success path; nil on the success path otherwise; on the failure paths,
#   whatever #failure returns
def parse_response
  # Close up the connection as we no longer need it
  close_connection

  logger.debug { sign_message("Parsing response from server (connection closed)") }

  # Parse out the raw response
  # NOTE(review): the size is skipped for nil data, but decode is still
  # called with nil below -- confirm that decode tolerates nil.
  @stats.response_size = @response_data.size unless @response_data.nil?
  response_wrapper = ::Protobuf::Socketrpc::Response.decode(@response_data)
  # Prefer the server identity reported in the response, when present
  @stats.server = response_wrapper.server if response_wrapper.field?(:server)

  # Determine success or failure based on parsed data
  if response_wrapper.field?(:error_reason)
    logger.debug { sign_message("Error response parsed") }

    # fail the call if we already know the client is failed
    # (don't try to parse out the response payload)
    failure(response_wrapper.error_reason, response_wrapper.error)
  else
    logger.debug { sign_message("Successful response parsed") }

    # Decode the payload into an instance of the expected response type
    parsed = @options[:response_type].decode(response_wrapper.response_proto.to_s)

    # NOTE(review): this branch is only reached when :error_reason is absent,
    # so the second half of the condition looks redundant -- confirm.
    if parsed.nil? && !response_wrapper.field?(:error_reason)
      failure(:BAD_RESPONSE_PROTO, 'Unable to parse response from server')
    else
      # Install the data_callback fallback when no callbacks were registered
      verify_callbacks
      succeed(parsed)
      return @data if @used_data_callback
    end
  end
end

#ping_port ⇒ Object



133
134
135
# File 'lib/protobuf/rpc/connectors/base.rb', line 133

# Ping port taken from the PB_RPC_PING_PORT environment variable.
#
# A truthy value is cached in @ping_port; while the variable is unset the
# environment is consulted again on every call.
#
# @return [String, nil] the raw environment value, or nil when it is not set
def ping_port
  return @ping_port if @ping_port

  @ping_port = ENV["PB_RPC_PING_PORT"]
end

#ping_port_enabled? ⇒ Boolean

Returns:

  • (Boolean)


137
138
139
# File 'lib/protobuf/rpc/connectors/base.rb', line 137

# Tells whether the PB_RPC_PING_PORT environment variable is present.
# Only presence is checked; the value itself (even an empty one) is ignored.
#
# @return [Boolean] true when the variable exists in the environment
def ping_port_enabled?
  ENV.include?("PB_RPC_PING_PORT")
end

#request_bytes ⇒ Object



141
142
143
144
145
146
# File 'lib/protobuf/rpc/connectors/base.rb', line 141

# Produces the encoded request wrapper for this call.
#
# Runs #validate_request_type! and then encodes #request_fields with
# ::Protobuf::Socketrpc::Request. Any StandardError raised by either step
# fails the request with :INVALID_REQUEST_PROTO instead of propagating.
#
# @return [Object] the encoded request on success, otherwise whatever
#   #failure returns
def request_bytes
  begin
    validate_request_type!
    fields = request_fields
    ::Protobuf::Socketrpc::Request.encode(fields)
  rescue => e
    failure(:INVALID_REQUEST_PROTO, "Could not set request proto: #{e.message}")
  end
end

#request_caller ⇒ Object



148
149
150
# File 'lib/protobuf/rpc/connectors/base.rb', line 148

# Identifies the caller to report in the request.
#
# A truthy :client_host option takes precedence; otherwise the value of
# ::Protobuf.client_host is used.
#
# @return [Object] the :client_host option or the library-wide client host
def request_caller
  configured_host = @options[:client_host]
  return configured_host if configured_host

  ::Protobuf.client_host
end

#request_fields ⇒ Object



152
153
154
155
156
157
# File 'lib/protobuf/rpc/connectors/base.rb', line 152

def request_fields
  { :service_name => @options[:service].name,
    :method_name => @options[:method].to_s,
    :request_proto => @options[:request],
    :caller => request_caller }
end

#send_request ⇒ Object



159
160
161
# File 'lib/protobuf/rpc/connectors/base.rb', line 159

# Placeholder for sending the request over the wire.
#
# This base implementation always raises; the message tells connector
# authors that they must provide their own send_request.
#
# @raise [RuntimeError] always
def send_request
  raise ::RuntimeError, 'If you inherit a Connector from Base you must implement send_request'
end

#setup_connection ⇒ Object



163
164
165
166
167
# File 'lib/protobuf/rpc/connectors/base.rb', line 163

# Prepares the connector for sending.
#
# Re-initializes the stats, stores the result of #request_bytes in
# @request_data, and records its size on the stats.
#
# @return [Object] the size recorded as the request size
def setup_connection
  initialize_stats
  payload = request_bytes
  @request_data = payload
  @stats.request_size = payload.size
end

#succeed(response) ⇒ Object



169
170
171
172
173
174
175
176
177
178
# File 'lib/protobuf/rpc/connectors/base.rb', line 169

# Delivers a successful response to the success callback.
#
# If the callback (or anything else in the main body) raises a StandardError,
# the exception is logged and the request is routed to #failure with
# :RPC_ERROR instead.
#
# #complete runs exactly once per call. On the normal path it is invoked
# here. On the error path #failure already finishes by invoking #complete,
# so it is deliberately not invoked a second time; otherwise the completion
# callback would fire twice for one request.
#
# @param response [Object] the parsed response handed to the success callback
# @return [Object, nil] the success callback's return value (nil when no
#   callback is set), or whatever #failure returns on the error path
def succeed(response)
  routed_to_failure = false
  logger.debug { sign_message("Server succeeded request (invoking on_success)") }
  @success_cb.call(response) unless @success_cb.nil?
rescue => e
  logger.error { sign_message("Success callback error encountered") }
  log_exception(e)
  # Set the flag only once failure is about to run: failure always calls
  # complete itself, even when it raises.
  routed_to_failure = true
  failure(:RPC_ERROR, "An exception occurred while calling on_success: #{e.message}")
ensure
  complete unless routed_to_failure
end

#timeout ⇒ Object



180
181
182
183
184
185
186
# File 'lib/protobuf/rpc/connectors/base.rb', line 180

# Timeout, in seconds, applied by #timeout_wrap.
#
# @return [Object] the :timeout option when it is truthy, otherwise 300
def timeout
  options[:timeout] || 300 # seconds
end

#timeout_wrap(&block) ⇒ Object

Wrap the given block in a timeout of the configured number of seconds.



190
191
192
193
194
# File 'lib/protobuf/rpc/connectors/base.rb', line 190

# Runs the given block under ::Timeout.timeout using #timeout seconds.
#
# A ::Timeout::Error is not propagated; it fails the request with
# :RPC_FAILED instead.
#
# @yield the work to run under the time limit
# @return [Object] the block's value, or whatever #failure returns when the
#   time limit is exceeded
def timeout_wrap(&block)
  begin
    ::Timeout.timeout(timeout, &block)
  rescue ::Timeout::Error
    failure(:RPC_FAILED, "The server took longer than #{timeout} seconds to respond")
  end
end

#validate_request_type! ⇒ Object



196
197
198
199
200
201
202
# File 'lib/protobuf/rpc/connectors/base.rb', line 196

# Checks that the :request option is an instance of exactly the class given
# in the :request_type option (subclasses do not match).
#
# On a mismatch the request is failed with :INVALID_REQUEST_PROTO. Note that
# this method reports the problem through #failure; it does not raise.
#
# @return [Object, nil] nil when the types match, otherwise whatever
#   #failure returns
def validate_request_type!
  actual_class = @options[:request].class
  expected_class = @options[:request_type]
  return if actual_class == expected_class

  expected = expected_class.name
  actual = actual_class.name
  failure(:INVALID_REQUEST_PROTO, "Expected request type to be type of #{expected}, got #{actual} instead")
end

#verify_callbacks ⇒ Object



204
205
206
207
208
209
# File 'lib/protobuf/rpc/connectors/base.rb', line 204

# Installs #data_callback as both the success and the failure callback when
# no callback of any kind has been registered.
#
# @return [Method, nil] the installed fallback, or nil when callbacks were
#   already present
def verify_callbacks
  return if any_callbacks?

  logger.debug { sign_message("No callbacks set, using data_callback") }
  fallback = method(:data_callback)
  @failure_cb = fallback
  @success_cb = fallback
end

#verify_options! ⇒ Object



211
212
213
214
215
216
# File 'lib/protobuf/rpc/connectors/base.rb', line 211

# Checks that every required option is present.
#
# Each of :service, :method, :host and :port whose value is nil triggers its
# own #failure call with :RPC_ERROR; checking continues with the remaining
# options afterwards.
#
# @return [Array<Symbol>] the list of required option names
def verify_options!
  required_options = [:service, :method, :host, :port]
  required_options.each do |opt|
    next unless @options[opt].nil?

    failure(:RPC_ERROR, "Invalid client connection configuration. #{opt} must be a defined option.")
  end
end