Class: Protobuf::Rpc::Connectors::Base
- Inherits:
-
Object
- Object
- Protobuf::Rpc::Connectors::Base
show all
- Includes:
- Logging
- Defined in:
- lib/protobuf/rpc/connectors/base.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logging
initialize_logger, #log_exception, #logger, #sign_message
Constructor Details
#initialize(options) ⇒ Base
Returns a new instance of Base.
30
31
32
33
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 30
def initialize(options)
@options = DEFAULT_OPTIONS.merge(options)
@stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
end
|
Instance Attribute Details
#complete_cb ⇒ Object
Returns the value of attribute complete_cb.
28
29
30
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 28
def complete_cb
@complete_cb
end
|
#error ⇒ Object
Returns the value of attribute error.
27
28
29
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 27
def error
@error
end
|
#failure_cb ⇒ Object
Returns the value of attribute failure_cb.
28
29
30
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 28
def failure_cb
@failure_cb
end
|
#options ⇒ Object
Returns the value of attribute options.
27
28
29
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 27
def options
@options
end
|
#stats ⇒ Object
Returns the value of attribute stats.
28
29
30
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 28
def stats
@stats
end
|
#success_cb ⇒ Object
Returns the value of attribute success_cb.
28
29
30
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 28
def success_cb
@success_cb
end
|
Instance Method Details
#any_callbacks? ⇒ Boolean
35
36
37
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 35
def any_callbacks?
[@complete_cb, @failure_cb, @success_cb].any?
end
|
#close_connection ⇒ Object
39
40
41
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 39
def close_connection
fail 'If you inherit a Connector from Base you must implement close_connection'
end
|
#complete ⇒ Object
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 43
def complete
@stats.stop
logger.info { @stats.to_s }
logger.debug { sign_message('Response proceessing complete') }
@complete_cb.call(self) unless @complete_cb.nil?
rescue => e
logger.error { sign_message('Complete callback error encountered') }
log_exception(e)
raise
end
|
#data_callback(data) ⇒ Object
54
55
56
57
58
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 54
def data_callback(data)
logger.debug { sign_message('Using data_callback') }
@used_data_callback = true
@data = data
end
|
#failure(code, message) ⇒ Object
All failures should be routed through this method.
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 64
def failure(code, message)
@error = ClientError.new
@stats.status = @error.code = ::Protobuf::Socketrpc::ErrorReason.fetch(code)
@error.message = message
logger.debug { sign_message("Server failed request (invoking on_failure): #{@error.inspect}") }
@failure_cb.call(@error) unless @failure_cb.nil?
rescue => e
logger.error { sign_message("Failure callback error encountered") }
log_exception(e)
raise
ensure
complete
end
|
#first_alive_load_balance? ⇒ Boolean
80
81
82
83
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 80
def first_alive_load_balance?
ENV.key?("PB_FIRST_ALIVE_LOAD_BALANCE") ||
options[:first_alive_load_balance]
end
|
#initialize_stats ⇒ Object
85
86
87
88
89
90
91
92
93
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 85
def initialize_stats
@stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
@stats.server = [@options[:port], @options[:host]]
@stats.service = @options[:service].name
@stats.method_name = @options[:method].to_s
rescue => ex
log_exception(ex)
failure(:RPC_ERROR, "Invalid stats configuration. #{ex.message}")
end
|
#log_signature ⇒ Object
95
96
97
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 95
def log_signature
@_log_signature ||= "[client-#{self.class}]"
end
|
#parse_response ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 99
def parse_response
close_connection
logger.debug { sign_message("Parsing response from server (connection closed)") }
@stats.response_size = @response_data.size unless @response_data.nil?
response_wrapper = ::Protobuf::Socketrpc::Response.decode(@response_data)
@stats.server = response_wrapper.server if response_wrapper.field?(:server)
if response_wrapper.field?(:error_reason)
logger.debug { sign_message("Error response parsed") }
failure(response_wrapper.error_reason, response_wrapper.error)
else
logger.debug { sign_message("Successful response parsed") }
parsed = @options[:response_type].decode(response_wrapper.response_proto.to_s)
if parsed.nil? && !response_wrapper.field?(:error_reason)
failure(:BAD_RESPONSE_PROTO, 'Unable to parse response from server')
else
verify_callbacks
succeed(parsed)
return @data if @used_data_callback
end
end
end
|
#ping_port ⇒ Object
133
134
135
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 133
def ping_port
@ping_port ||= ENV["PB_RPC_PING_PORT"]
end
|
#ping_port_enabled? ⇒ Boolean
137
138
139
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 137
def ping_port_enabled?
ENV.key?("PB_RPC_PING_PORT")
end
|
#request_bytes ⇒ Object
141
142
143
144
145
146
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 141
def request_bytes
validate_request_type!
return ::Protobuf::Socketrpc::Request.encode(request_fields)
rescue => e
failure(:INVALID_REQUEST_PROTO, "Could not set request proto: #{e.message}")
end
|
#request_caller ⇒ Object
148
149
150
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 148
def request_caller
@options[:client_host] || ::Protobuf.client_host
end
|
#request_fields ⇒ Object
152
153
154
155
156
157
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 152
def request_fields
{ :service_name => @options[:service].name,
:method_name => @options[:method].to_s,
:request_proto => @options[:request],
:caller => request_caller }
end
|
#send_request ⇒ Object
159
160
161
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 159
def send_request
fail 'If you inherit a Connector from Base you must implement send_request'
end
|
#setup_connection ⇒ Object
163
164
165
166
167
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 163
def setup_connection
initialize_stats
@request_data = request_bytes
@stats.request_size = @request_data.size
end
|
#succeed(response) ⇒ Object
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 169
def succeed(response)
logger.debug { sign_message("Server succeeded request (invoking on_success)") }
@success_cb.call(response) unless @success_cb.nil?
rescue => e
logger.error { sign_message("Success callback error encountered") }
log_exception(e)
failure(:RPC_ERROR, "An exception occurred while calling on_success: #{e.message}")
ensure
complete
end
|
#timeout ⇒ Object
180
181
182
183
184
185
186
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 180
def timeout
if options[:timeout]
options[:timeout]
else
300 end
end
|
#timeout_wrap(&block) ⇒ Object
Wrap the given block in a timeout of the configured number of seconds.
190
191
192
193
194
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 190
def timeout_wrap(&block)
::Timeout.timeout(timeout, &block)
rescue ::Timeout::Error
failure(:RPC_FAILED, "The server took longer than #{timeout} seconds to respond")
end
|
#validate_request_type! ⇒ Object
196
197
198
199
200
201
202
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 196
def validate_request_type!
unless @options[:request].class == @options[:request_type]
expected = @options[:request_type].name
actual = @options[:request].class.name
failure(:INVALID_REQUEST_PROTO, "Expected request type to be type of #{expected}, got #{actual} instead")
end
end
|
#verify_callbacks ⇒ Object
204
205
206
207
208
209
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 204
def verify_callbacks
unless any_callbacks?
logger.debug { sign_message("No callbacks set, using data_callback") }
@success_cb = @failure_cb = method(:data_callback)
end
end
|
#verify_options! ⇒ Object
211
212
213
214
215
216
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 211
def verify_options!
[:service, :method, :host, :port].each do |opt|
failure(:RPC_ERROR, "Invalid client connection configuration. #{opt} must be a defined option.") if @options[opt].nil?
end
end
|