Class: Protobuf::Rpc::Connectors::Base

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/protobuf/rpc/connectors/base.rb

Direct Known Subclasses

Http, Socket, Zmq

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

initialize_logger, #log_exception, #logger, #sign_message

Constructor Details

#initialize(options) ⇒ Base

Returns a new instance of Base.



30
31
32
33
# File 'lib/protobuf/rpc/connectors/base.rb', line 30

# Builds a connector from the caller's options layered over DEFAULT_OPTIONS
# and starts with a fresh client-mode stat tracker.
#
# @param options [Hash] connection options; entries override DEFAULT_OPTIONS
def initialize(options)
  merged_options = DEFAULT_OPTIONS.merge(options)
  @options = merged_options
  @stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
end

Instance Attribute Details

#complete_cb ⇒ Object

Returns the value of attribute complete_cb.



28
29
30
# File 'lib/protobuf/rpc/connectors/base.rb', line 28

# Reader for the completion callback. When set, #complete invokes it with
# this connector as its only argument.
#
# @return [Object] the value of attribute complete_cb
def complete_cb
  @complete_cb
end

#error ⇒ Object (readonly)

Returns the value of attribute error.



27
28
29
# File 'lib/protobuf/rpc/connectors/base.rb', line 27

# Read-only reader for the error object assigned by #failure.
#
# @return [Object] the value of attribute error
def error
  @error
end

#failure_cb ⇒ Object

Returns the value of attribute failure_cb.



28
29
30
# File 'lib/protobuf/rpc/connectors/base.rb', line 28

# Reader for the failure callback. When set, #failure invokes it with the
# error object it has just built.
#
# @return [Object] the value of attribute failure_cb
def failure_cb
  @failure_cb
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



27
28
29
# File 'lib/protobuf/rpc/connectors/base.rb', line 27

# Read-only reader for the options assigned in #initialize
# (DEFAULT_OPTIONS merged with the options passed by the caller).
#
# @return [Object] the value of attribute options
def options
  @options
end

#stats ⇒ Object

Returns the value of attribute stats.



28
29
30
# File 'lib/protobuf/rpc/connectors/base.rb', line 28

# Reader for the stat tracker. It is created in #initialize and replaced
# with a fresh instance by #initialize_stats.
#
# @return [Object] the value of attribute stats
def stats
  @stats
end

#success_cb ⇒ Object

Returns the value of attribute success_cb.



28
29
30
# File 'lib/protobuf/rpc/connectors/base.rb', line 28

# Reader for the success callback. When set, #succeed invokes it with the
# response it was given.
#
# @return [Object] the value of attribute success_cb
def success_cb
  @success_cb
end

Instance Method Details

#any_callbacks? ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/protobuf/rpc/connectors/base.rb', line 35

# Reports whether at least one of the complete, failure or success
# callbacks holds a truthy value.
#
# @return [Boolean]
def any_callbacks?
  registered = @complete_cb || @failure_cb || @success_cb
  registered ? true : false
end

#close_connection ⇒ Object



39
40
41
# File 'lib/protobuf/rpc/connectors/base.rb', line 39

# Placeholder that subclasses are expected to override; this base
# implementation always raises.
#
# @raise [RuntimeError] always
def close_connection
  raise ::RuntimeError, 'If you inherit a Connector from Base you must implement close_connection'
end

#complete ⇒ Object



43
44
45
46
47
48
49
50
51
52
# File 'lib/protobuf/rpc/connectors/base.rb', line 43

# Finishes a request: stops the stat tracker, logs the stats, and invokes
# the completion callback (if one is set) with this connector.
#
# Any StandardError raised along the way is logged and then re-raised.
#
# @return [Object, nil] the callback's return value, or nil when no
#   completion callback is set
def complete
  @stats.stop
  logger.info { @stats.to_s }
  logger.debug { sign_message('Response processing complete') }
  @complete_cb.call(self) unless @complete_cb.nil?
rescue => e
  logger.error { sign_message('Complete callback error encountered') }
  log_exception(e)
  # Re-raise the original exception after logging it.
  raise
end

#data_callback(data) ⇒ Object



54
55
56
57
58
# File 'lib/protobuf/rpc/connectors/base.rb', line 54

# Records the given data on the connector and flags that this callback was
# used, so #parse_response can hand the data back to its caller.
#
# @param data [Object] value to remember
# @return [Object] the same data
def data_callback(data)
  logger.debug { sign_message('Using data_callback') }
  @used_data_callback = true
  @data = data
  data
end

#failure(code, message) ⇒ Object

All failures should be routed through this method.

Parameters:

  • code (Symbol)

    The code we're using (see ::Protobuf::Socketrpc::ErrorReason)

  • message (String)

    The error message



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/protobuf/rpc/connectors/base.rb', line 64

# Single entry point for reporting a failed request. Builds the client
# error, records the failure on the stat tracker, and invokes the failure
# callback when one is set. #complete always runs afterwards, even when the
# callback raises.
#
# @param code [Symbol] error code, looked up in ::Protobuf::Socketrpc::ErrorReason
# @param message [String] the error message
# @return [Object, nil] the failure callback's return value, or nil when no
#   failure callback is set
def failure(code, message)
  begin
    @error = ClientError.new
    @error.code = ::Protobuf::Socketrpc::ErrorReason.fetch(code)
    @error.message = message
    logger.debug { sign_message("Server failed request (invoking on_failure): #{@error.inspect}") }

    @stats.failure(code)
    @failure_cb.nil? ? nil : @failure_cb.call(@error)
  rescue => callback_error
    logger.error { sign_message("Failure callback error encountered") }
    log_exception(callback_error)
    # Re-raise the original exception after logging it.
    raise
  ensure
    complete
  end
end

#first_alive_load_balance? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
83
# File 'lib/protobuf/rpc/connectors/base.rb', line 80

# Tells whether first-alive load balancing was requested, either through
# the PB_FIRST_ALIVE_LOAD_BALANCE environment variable (presence alone is
# enough) or through the :first_alive_load_balance option.
#
# @return [Boolean, Object] true when the environment variable is present,
#   otherwise whatever the option holds
def first_alive_load_balance?
  return true if ENV.key?("PB_FIRST_ALIVE_LOAD_BALANCE")

  options[:first_alive_load_balance]
end

#initialize_stats ⇒ Object



85
86
87
88
89
90
91
92
93
# File 'lib/protobuf/rpc/connectors/base.rb', line 85

# Replaces the stat tracker with a fresh client-mode one and fills in the
# server, service and method name from the options. Any StandardError is
# logged and reported through #failure as :RPC_ERROR.
#
# @return [Object] the method name string on success, otherwise whatever
#   #failure returns
def initialize_stats
  begin
    @stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
    # Note the ordering: port first, then host.
    port_and_host = [@options[:port], @options[:host]]
    @stats.server = port_and_host
    @stats.service = @options[:service].name
    @stats.method_name = @options[:method].to_s
  rescue => stats_error
    log_exception(stats_error)
    failure(:RPC_ERROR, "Invalid stats configuration. #{stats_error.message}")
  end
end

#log_signature ⇒ Object



95
96
97
# File 'lib/protobuf/rpc/connectors/base.rb', line 95

# Memoized tag built from the receiver's class, e.g. "[client-SomeClass]".
#
# @return [String]
def log_signature
  return @_log_signature if @_log_signature

  @_log_signature = "[client-#{self.class}]"
end

#parse_response ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/protobuf/rpc/connectors/base.rb', line 99

# Closes the connection, decodes the raw response wrapper, and routes the
# outcome to #failure or #succeed.
#
# @return [Object, nil] the #failure result on an error response or an
#   undecodable payload; on success, the stored data when the data callback
#   was used, otherwise nil
def parse_response
  # The connection is no longer needed from here on.
  close_connection

  logger.debug { sign_message("Parsing response from server (connection closed)") }

  # Record the raw size (when there is data) and decode the wrapper.
  @stats.response_size = @response_data.size unless @response_data.nil?
  wrapper = ::Protobuf::Socketrpc::Response.decode(@response_data)

  if wrapper.field?(:error_reason)
    logger.debug { sign_message("Error response parsed") }

    # The server already reported an error, so skip the payload entirely.
    return failure(wrapper.error_reason, wrapper.error)
  end

  logger.debug { sign_message("Successful response parsed") }

  # Decode the payload with the configured response type.
  payload = @options[:response_type].decode(wrapper.response_proto.to_s)

  if payload.nil? && !wrapper.field?(:error_reason)
    return failure(:BAD_RESPONSE_PROTO, 'Unable to parse response from server')
  end

  verify_callbacks
  succeed(payload)
  @data if @used_data_callback
end

#ping_port ⇒ Object



132
133
134
# File 'lib/protobuf/rpc/connectors/base.rb', line 132

# Memoized value of the PB_RPC_PING_PORT environment variable.
#
# @return [String, nil] the raw environment value, or nil when unset
def ping_port
  return @ping_port if @ping_port

  @ping_port = ENV["PB_RPC_PING_PORT"]
end

#ping_port_enabled? ⇒ Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/protobuf/rpc/connectors/base.rb', line 136

# True when the PB_RPC_PING_PORT environment variable is present,
# whatever its value.
#
# @return [Boolean]
def ping_port_enabled?
  !ENV["PB_RPC_PING_PORT"].nil?
end

#request_bytes ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
# File 'lib/protobuf/rpc/connectors/base.rb', line 140

# Validates the request type, then encodes the request wrapper (service
# name, method name, request and caller) via ::Protobuf::Socketrpc::Request.
# Any StandardError is reported through #failure as :INVALID_REQUEST_PROTO.
#
# @return [Object] the encoded request, or whatever #failure returns
def request_bytes
  validate_request_type!

  request_fields = {}
  request_fields[:service_name] = @options[:service].name
  request_fields[:method_name] = @options[:method].to_s
  request_fields[:request_proto] = @options[:request]
  request_fields[:caller] = request_caller

  ::Protobuf::Socketrpc::Request.encode(request_fields)
rescue => encode_error
  failure(:INVALID_REQUEST_PROTO, "Could not set request proto: #{encode_error.message}")
end

#request_caller ⇒ Object



152
153
154
# File 'lib/protobuf/rpc/connectors/base.rb', line 152

# Caller identity placed in outgoing requests: the :client_host option when
# it is truthy, otherwise ::Protobuf.client_host.
#
# @return [Object]
def request_caller
  explicit_host = @options[:client_host]
  return explicit_host if explicit_host

  ::Protobuf.client_host
end

#send_request ⇒ Object



156
157
158
# File 'lib/protobuf/rpc/connectors/base.rb', line 156

# Placeholder that subclasses are expected to override; this base
# implementation always raises.
#
# @raise [RuntimeError] always
def send_request
  raise ::RuntimeError, 'If you inherit a Connector from Base you must implement send_request'
end

#setup_connection ⇒ Object



160
161
162
163
164
# File 'lib/protobuf/rpc/connectors/base.rb', line 160

# Prepares a request: resets the stats, stores the encoded request bytes,
# and records their size on the stat tracker.
#
# @return [Object] the size reported by the encoded request
def setup_connection
  initialize_stats
  encoded_request = request_bytes
  @request_data = encoded_request
  @stats.request_size = encoded_request.size
end

#succeed(response) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
# File 'lib/protobuf/rpc/connectors/base.rb', line 166

# Marks the request as successful and invokes the success callback (if one
# is set) with the response. If anything in that path raises, the error is
# logged and routed through #failure as :RPC_ERROR.
#
# #complete runs exactly once per call: #failure already runs it in its own
# ensure clause, so it is skipped here once #failure has been invoked.
#
# @param response [Object] the decoded response handed to the callback
# @return [Object, nil] the success callback's return value, nil when no
#   callback is set, or whatever #failure returns on the error path
def succeed(response)
  failure_invoked = false
  logger.debug { sign_message("Server succeeded request (invoking on_success)") }
  @stats.success
  @success_cb.call(response) unless @success_cb.nil?
rescue => e
  logger.error { sign_message("Success callback error encountered") }
  log_exception(e)
  # Set the flag before calling #failure: its ensure clause runs #complete
  # even when #failure itself raises.
  failure_invoked = true
  failure(:RPC_ERROR, "An exception occurred while calling on_success: #{e.message}")
ensure
  complete unless failure_invoked
end

#timeout ⇒ Object



178
179
180
181
182
183
184
# File 'lib/protobuf/rpc/connectors/base.rb', line 178

# Request timeout in seconds: the :timeout option when it is truthy,
# otherwise 300.
#
# @return [Object]
def timeout
  options[:timeout] || 300 # seconds
end

#timeout_wrap(&block) ⇒ Object

Wrap the given block in a timeout of the configured number of seconds.



188
189
190
191
192
# File 'lib/protobuf/rpc/connectors/base.rb', line 188

# Runs the given block under ::Timeout.timeout using #timeout seconds.
# A ::Timeout::Error is turned into an :RPC_FAILED failure.
#
# @yield the work to run under the time limit
# @return [Object] the block's result, or whatever #failure returns when
#   the timeout is hit
def timeout_wrap(&block)
  begin
    ::Timeout.timeout(timeout, &block)
  rescue ::Timeout::Error
    failure(:RPC_FAILED, "The server took longer than #{timeout} seconds to respond")
  end
end

#validate_request_type! ⇒ Object



194
195
196
197
198
199
200
# File 'lib/protobuf/rpc/connectors/base.rb', line 194

# Checks that the request's class is exactly the configured :request_type
# and reports an :INVALID_REQUEST_PROTO failure when it is not.
#
# @return [Object, nil] nil when the types match, otherwise whatever
#   #failure returns
def validate_request_type!
  return if @options[:request].class == @options[:request_type]

  expected_name = @options[:request_type].name
  actual_name = @options[:request].class.name
  failure(:INVALID_REQUEST_PROTO, "Expected request type to be type of #{expected_name}, got #{actual_name} instead")
end

#verify_callbacks ⇒ Object



202
203
204
205
206
207
# File 'lib/protobuf/rpc/connectors/base.rb', line 202

# When no callback is set at all, installs #data_callback as both the
# success and the failure callback.
#
# @return [Method, nil] the installed callback, or nil when callbacks
#   were already present
def verify_callbacks
  return if any_callbacks?

  logger.debug { sign_message("No callbacks set, using data_callback") }
  fallback = method(:data_callback)
  @failure_cb = fallback
  @success_cb = fallback
end

#verify_options! ⇒ Object



209
210
211
212
213
214
# File 'lib/protobuf/rpc/connectors/base.rb', line 209

# Reports an :RPC_ERROR failure for each of :service, :method, :host and
# :port whose option value is nil. Every required option is checked, so
# several failures may be reported in one call.
#
# @return [Array<Symbol>] the list of required option names
def verify_options!
  required_options = [:service, :method, :host, :port]
  required_options.each do |name|
    next unless @options[name].nil?

    failure(:RPC_ERROR, "Invalid client connection configuration. #{name} must be a defined option.")
  end
end