Class: Protobuf::Rpc::Connectors::Base
- Inherits:
-
Object
- Object
- Protobuf::Rpc::Connectors::Base
show all
- Includes:
- Logging
- Defined in:
- lib/protobuf/rpc/connectors/base.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logging
initialize_logger, #log_exception, #logger, #sign_message
Constructor Details
#initialize(options) ⇒ Base
Returns a new instance of Base.
30
31
32
33
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 30
def initialize(options)
@options = DEFAULT_OPTIONS.merge(options)
@stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
end
|
Instance Attribute Details
#complete_cb ⇒ Object
Returns the value of attribute complete_cb.
28
29
30
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 28
def complete_cb
@complete_cb
end
|
#error ⇒ Object
Returns the value of attribute error.
27
28
29
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 27
def error
@error
end
|
#failure_cb ⇒ Object
Returns the value of attribute failure_cb.
28
29
30
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 28
def failure_cb
@failure_cb
end
|
#options ⇒ Object
Returns the value of attribute options.
27
28
29
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 27
def options
@options
end
|
#stats ⇒ Object
Returns the value of attribute stats.
28
29
30
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 28
def stats
@stats
end
|
#success_cb ⇒ Object
Returns the value of attribute success_cb.
28
29
30
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 28
def success_cb
@success_cb
end
|
Instance Method Details
#any_callbacks? ⇒ Boolean
35
36
37
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 35
def any_callbacks?
[@complete_cb, @failure_cb, @success_cb].any?
end
|
#close_connection ⇒ Object
39
40
41
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 39
def close_connection
fail 'If you inherit a Connector from Base you must implement close_connection'
end
|
#complete ⇒ Object
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 43
def complete
@stats.stop
logger.info { @stats.to_s }
logger.debug { sign_message('Response proceessing complete') }
@complete_cb.call(self) unless @complete_cb.nil?
rescue => e
logger.error { sign_message('Complete callback error encountered') }
log_exception(e)
raise
end
|
#data_callback(data) ⇒ Object
54
55
56
57
58
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 54
def data_callback(data)
logger.debug { sign_message('Using data_callback') }
@used_data_callback = true
@data = data
end
|
#failure(code, message) ⇒ Object
All failures should be routed through this method.
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 64
def failure(code, message)
@error = ClientError.new
@error.code = ::Protobuf::Socketrpc::ErrorReason.fetch(code)
@error.message = message
logger.debug { sign_message("Server failed request (invoking on_failure): #{@error.inspect}") }
@stats.failure(code)
@failure_cb.call(@error) unless @failure_cb.nil?
rescue => e
logger.error { sign_message("Failure callback error encountered") }
log_exception(e)
raise
ensure
complete
end
|
#first_alive_load_balance? ⇒ Boolean
80
81
82
83
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 80
def first_alive_load_balance?
ENV.key?("PB_FIRST_ALIVE_LOAD_BALANCE") ||
options[:first_alive_load_balance]
end
|
#initialize_stats ⇒ Object
85
86
87
88
89
90
91
92
93
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 85
def initialize_stats
@stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
@stats.server = [@options[:port], @options[:host]]
@stats.service = @options[:service].name
@stats.method_name = @options[:method].to_s
rescue => ex
log_exception(ex)
failure(:RPC_ERROR, "Invalid stats configuration. #{ex.message}")
end
|
#log_signature ⇒ Object
95
96
97
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 95
def log_signature
@_log_signature ||= "[client-#{self.class}]"
end
|
#parse_response ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 99
def parse_response
close_connection
logger.debug { sign_message("Parsing response from server (connection closed)") }
@stats.response_size = @response_data.size unless @response_data.nil?
response_wrapper = ::Protobuf::Socketrpc::Response.decode(@response_data)
if response_wrapper.field?(:error_reason)
logger.debug { sign_message("Error response parsed") }
failure(response_wrapper.error_reason, response_wrapper.error)
else
logger.debug { sign_message("Successful response parsed") }
parsed = @options[:response_type].decode(response_wrapper.response_proto.to_s)
if parsed.nil? && !response_wrapper.field?(:error_reason)
failure(:BAD_RESPONSE_PROTO, 'Unable to parse response from server')
else
verify_callbacks
succeed(parsed)
return @data if @used_data_callback
end
end
end
|
#ping_port ⇒ Object
132
133
134
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 132
def ping_port
@ping_port ||= ENV["PB_RPC_PING_PORT"]
end
|
#ping_port_enabled? ⇒ Boolean
136
137
138
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 136
def ping_port_enabled?
ENV.key?("PB_RPC_PING_PORT")
end
|
#request_bytes ⇒ Object
140
141
142
143
144
145
146
147
148
149
150
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 140
def request_bytes
validate_request_type!
fields = { :service_name => @options[:service].name,
:method_name => @options[:method].to_s,
:request_proto => @options[:request],
:caller => request_caller }
return ::Protobuf::Socketrpc::Request.encode(fields)
rescue => e
failure(:INVALID_REQUEST_PROTO, "Could not set request proto: #{e.message}")
end
|
#request_caller ⇒ Object
152
153
154
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 152
def request_caller
@options[:client_host] || ::Protobuf.client_host
end
|
#send_request ⇒ Object
156
157
158
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 156
def send_request
fail 'If you inherit a Connector from Base you must implement send_request'
end
|
#setup_connection ⇒ Object
160
161
162
163
164
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 160
def setup_connection
initialize_stats
@request_data = request_bytes
@stats.request_size = @request_data.size
end
|
#succeed(response) ⇒ Object
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 166
def succeed(response)
logger.debug { sign_message("Server succeeded request (invoking on_success)") }
@stats.success
@success_cb.call(response) unless @success_cb.nil?
rescue => e
logger.error { sign_message("Success callback error encountered") }
log_exception(e)
failure(:RPC_ERROR, "An exception occurred while calling on_success: #{e.message}")
ensure
complete
end
|
#timeout ⇒ Object
178
179
180
181
182
183
184
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 178
def timeout
if options[:timeout]
options[:timeout]
else
300 end
end
|
#timeout_wrap(&block) ⇒ Object
Wrap the given block in a timeout of the configured number of seconds.
188
189
190
191
192
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 188
def timeout_wrap(&block)
::Timeout.timeout(timeout, &block)
rescue ::Timeout::Error
failure(:RPC_FAILED, "The server took longer than #{timeout} seconds to respond")
end
|
#validate_request_type! ⇒ Object
194
195
196
197
198
199
200
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 194
def validate_request_type!
unless @options[:request].class == @options[:request_type]
expected = @options[:request_type].name
actual = @options[:request].class.name
failure(:INVALID_REQUEST_PROTO, "Expected request type to be type of #{expected}, got #{actual} instead")
end
end
|
#verify_callbacks ⇒ Object
202
203
204
205
206
207
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 202
def verify_callbacks
unless any_callbacks?
logger.debug { sign_message("No callbacks set, using data_callback") }
@success_cb = @failure_cb = method(:data_callback)
end
end
|
#verify_options! ⇒ Object
209
210
211
212
213
214
|
# File 'lib/protobuf/rpc/connectors/base.rb', line 209
def verify_options!
[:service, :method, :host, :port].each do |opt|
failure(:RPC_ERROR, "Invalid client connection configuration. #{opt} must be a defined option.") if @options[opt].nil?
end
end
|