Class: Avro::IPC::Responder

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/ipc.rb

Overview

Base class for the server side of a protocol interaction.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(local_protocol) ⇒ Responder

Returns a new instance of Responder.



236
237
238
239
240
241
# File 'lib/avro/ipc.rb', line 236

# Builds a responder for +local_protocol+.
#
# Stores the protocol, records its +md5+ as the local hash, and seeds the
# protocol cache with a single entry mapping that hash to the protocol.
#
# @param local_protocol [#md5] the protocol this responder serves
def initialize(local_protocol)
  @local_protocol = local_protocol
  @local_hash = local_protocol.md5
  @protocol_cache = { @local_hash => local_protocol }
end

Instance Attribute Details

#local_hash ⇒ Object (readonly)

Returns the value of attribute local_hash.



235
236
237
# File 'lib/avro/ipc.rb', line 235

# Returns the value stored in @local_hash, which #initialize sets to
# local_protocol.md5. #process_handshake compares it with the client's
# 'serverHash' and sends it back as 'serverHash' when they do not both match.
def local_hash
  @local_hash
end

#local_protocol ⇒ Object (readonly)

Returns the value of attribute local_protocol.



235
236
237
# File 'lib/avro/ipc.rb', line 235

# Returns the protocol object that was passed to #initialize. #respond looks
# up local message definitions through its +messages+ collection.
def local_protocol
  @local_protocol
end

#protocol_cache ⇒ Object (readonly)

Returns the value of attribute protocol_cache.



235
236
237
# File 'lib/avro/ipc.rb', line 235

# Returns the Hash of known protocols keyed by protocol hash. #initialize
# seeds it with local_hash => local_protocol, and #process_handshake adds
# each parsed client protocol under the 'clientHash' it arrived with.
def protocol_cache
  @protocol_cache
end

Instance Method Details

#call(local_message, request) ⇒ Object

Raises:

  • (NotImplementedError)


353
354
355
356
# File 'lib/avro/ipc.rb', line 353

# Performs the server-side work for one request; subclasses must override.
#
# #respond invokes this with the local message definition and the decoded
# request datum, and serialises whatever is returned as the response. Any
# exception raised here is converted by #respond into a remote error whose
# text is the exception's +to_s+, so the message below is what a client sees
# when a subclass forgets to implement this method.
#
# @param local_message [Object] the local protocol's definition of the message
# @param request [Object] the decoded request datum
# @return [Object] the response datum (in subclasses)
# @raise [NotImplementedError] always, in this base class
def call(local_message, request)
  # Actual work done by server: cf. handler in thrift.
  raise NotImplementedError, "#{self.class} must implement #call(local_message, request)"
end

#process_handshake(decoder, encoder, connection = nil) ⇒ Object



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/avro/ipc.rb', line 306

# Reads the client's handshake request from +decoder+, writes the handshake
# response to +encoder+, and returns the client's protocol.
#
# When +connection+ is given and already reports +is_connected?+, nothing is
# read or written and the connection's stored protocol is returned directly.
#
# The response 'match' field is:
# * 'NONE'   - the client's protocol is neither cached nor supplied
# * 'BOTH'   - the client's protocol is known and its 'serverHash' equals
#              local_hash
# * 'CLIENT' - the client's protocol is known but its 'serverHash' differs
# Unless the match is 'BOTH', the response also carries 'serverProtocol' and
# 'serverHash' so the client can learn the local protocol.
#
# @param decoder [Object] decoder positioned at the handshake request
# @param encoder [Object] encoder that receives the handshake response
# @param connection [Object, nil] optional connection; its +protocol+ is
#   set when the match is not 'NONE'
# @return [Object, nil] the remote protocol, or nil when it is unknown
def process_handshake(decoder, encoder, connection=nil)
  return connection.protocol if connection && connection.is_connected?

  request = HANDSHAKE_RESPONDER_READER.read(decoder)

  # determine the remote protocol: prefer the cache, otherwise parse the
  # protocol text the client sent (if any) and remember it
  client_hash = request['clientHash']
  client_protocol = request['clientProtocol']
  remote_protocol = protocol_cache[client_hash]
  if client_protocol && !remote_protocol
    remote_protocol = protocol_cache[client_hash] = Avro::Protocol.parse(client_protocol)
  end

  # evaluate remote's guess of the local protocol
  match =
    if !remote_protocol
      'NONE'
    elsif local_hash == request['serverHash']
      'BOTH'
    else
      'CLIENT'
    end

  response = { 'match' => match }
  if match != 'BOTH'
    response['serverProtocol'] = local_protocol.to_s
    response['serverHash'] = local_hash
  end
  HANDSHAKE_RESPONDER_WRITER.write(response, encoder)

  connection.protocol = remote_protocol if connection && match != 'NONE'

  remote_protocol
end

#read_request(writers_schema, readers_schema, decoder) ⇒ Object



358
359
360
361
# File 'lib/avro/ipc.rb', line 358

# Decodes one request datum from +decoder+.
#
# @param writers_schema [Object] schema the client used to write the request
# @param readers_schema [Object] schema the server expects to read
# @param decoder [Object] decoder positioned at the request datum
# @return [Object] whatever the DatumReader's +read+ returns
def read_request(writers_schema, readers_schema, decoder)
  Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end

#respond(call_request, transport = nil) ⇒ Object

Called by a server to deserialize a request, compute and serialize a response or error. Compare to ‘handle()’ in Thrift.



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/avro/ipc.rb', line 245

# Called by a server to deserialize a request, compute and serialize a
# response or error. Compare to 'handle()' in Thrift.
#
# Steps: run the handshake, read the request metadata and message name,
# look the message up in both the remote and local protocols, decode the
# request, invoke #call, then write response metadata, the error flag and
# either the response or the error.
#
# If an Avro::AvroError escapes any of those steps, whatever was written
# after the handshake is discarded and replaced with a system error
# (metadata, error flag +true+, error text under SYSTEM_ERROR_SCHEMA).
#
# @param call_request [String] the serialized call bytes
# @param transport [Object, nil] optional connection handed to
#   #process_handshake
# @return [String] the serialized reply; only the handshake response when
#   the handshake yields no remote protocol
def respond(call_request, transport=nil)
  buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
  # String#b yields a fresh, unfrozen binary string even when string
  # literals are frozen.
  buffer_writer = StringIO.new(''.b)
  buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
  error = nil
  response_metadata = {}
  # Offset in buffer_writer where the handshake response ends; used to drop
  # a half-written payload if a system error occurs. Assumes the encoder
  # writes straight through to buffer_writer, as the early return below
  # already does.
  handshake_end = 0

  begin
    remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport)
    # handshake failure
    return buffer_writer.string unless remote_protocol

    handshake_end = buffer_writer.pos

    # read request using remote protocol; the metadata itself is not used
    # here but must be consumed to advance the decoder to the message name
    META_READER.read(buffer_decoder)
    remote_message_name = buffer_decoder.read_string

    # get remote and local request schemas so we can do
    # schema resolution (one fine day)
    remote_message = remote_protocol.messages[remote_message_name]
    unless remote_message
      raise Avro::AvroError, "Unknown remote message: #{remote_message_name}"
    end
    local_message = local_protocol.messages[remote_message_name]
    unless local_message
      raise Avro::AvroError, "Unknown local message: #{remote_message_name}"
    end
    writers_schema = remote_message.request
    readers_schema = local_message.request
    request = read_request(writers_schema, readers_schema, buffer_decoder)

    # perform server logic
    begin
      response = call(local_message, request)
    rescue AvroRemoteError => e
      error = e
    rescue Exception => e
      # NOTE(review): deliberately broad. The base #call raises
      # NotImplementedError, which is not a StandardError, so narrowing
      # this would stop that case from being reported to the client.
      error = AvroRemoteError.new(e.to_s)
    end

    # write response using local protocol
    META_WRITER.write(response_metadata, buffer_encoder)
    buffer_encoder.write_boolean(!!error)
    if error.nil?
      writers_schema = local_message.response
      write_response(writers_schema, response, buffer_encoder)
    else
      writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
      write_error(writers_schema, error, buffer_encoder)
    end
  rescue Avro::AvroError => e
    error = AvroRemoteError.new(e.to_s)
    # Keep the handshake bytes, drop any partially written payload, and
    # write the system error into the buffer that is actually returned.
    buffer_writer.truncate(handshake_end)
    buffer_writer.seek(handshake_end)
    META_WRITER.write(response_metadata, buffer_encoder)
    buffer_encoder.write_boolean(true)
    write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
  end
  buffer_writer.string
end

#write_error(writers_schema, error_exception, encoder) ⇒ Object



368
369
370
371
# File 'lib/avro/ipc.rb', line 368

# Serializes an error to +encoder+ as its string form.
#
# @param writers_schema [Object] schema used to write the error
# @param error_exception [#to_s] the error; only its +to_s+ is written
# @param encoder [Object] destination encoder
# @return [Object] whatever the DatumWriter's +write+ returns
def write_error(writers_schema, error_exception, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(error_exception.to_s, encoder)
end

#write_response(writers_schema, response_datum, encoder) ⇒ Object



363
364
365
366
# File 'lib/avro/ipc.rb', line 363

# Serializes a response datum to +encoder+.
#
# @param writers_schema [Object] schema used to write the response
# @param response_datum [Object] the value returned by #call
# @param encoder [Object] destination encoder
# @return [Object] whatever the DatumWriter's +write+ returns
def write_response(writers_schema, response_datum, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(response_datum, encoder)
end