Class: Avro::IPC::SocketTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/ipc.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock) ⇒ SocketTransport

Returns a new instance of SocketTransport.



380
381
382
383
# File 'lib/avro/ipc.rb', line 380

# Stores +sock+ as the underlying socket and starts with no protocol set.
#
# @param sock [Object] the socket-like object to wrap; the other methods of
#   this class call +read+, +write+ and +close+ on it
def initialize(sock)
  @sock = sock
  @protocol = nil # is_connected? returns false while this is nil
end

Instance Attribute Details

#protocol ⇒ Object

Returns the value of attribute protocol.



378
379
380
# File 'lib/avro/ipc.rb', line 378

# Reader for the @protocol instance variable, which initialize sets to nil.
#
# @return [Object, nil] the current value of @protocol
def protocol
  @protocol
end

#remote_name ⇒ Object (readonly)

Returns the value of attribute remote_name. (Class description: a simple socket-based Transport implementation.)



377
378
379
# File 'lib/avro/ipc.rb', line 377

# Reader for the @remote_name instance variable.
#
# NOTE(review): none of the methods visible here assign @remote_name, so this
# presumably returns nil unless it is set elsewhere — confirm.
#
# @return [Object, nil] the current value of @remote_name
def remote_name
  @remote_name
end

#sock ⇒ Object (readonly)

Returns the value of attribute sock. (Class description: a simple socket-based Transport implementation.)



377
378
379
# File 'lib/avro/ipc.rb', line 377

# Reader for the @sock instance variable, i.e. the object passed to initialize.
#
# @return [Object] the underlying socket-like object
def sock
  @sock
end

Instance Method Details

#close ⇒ Object



457
458
459
# File 'lib/avro/ipc.rb', line 457

# Closes the underlying socket by calling +close+ on it.
#
# Only the socket is touched: @protocol is left as it was, so the result of
# is_connected? is not changed by this call.
#
# @return [Object] whatever the socket's +close+ returns
def close
  sock.close
end

#is_connected? ⇒ Boolean

Returns:

  • (Boolean)


385
386
387
# File 'lib/avro/ipc.rb', line 385

# Reports whether a protocol is currently set on this transport.
#
# @return [Boolean] true when @protocol is neither nil nor false,
#   false otherwise
def is_connected?
  @protocol ? true : false
end

#read_buffer_length ⇒ Object



449
450
451
452
453
454
455
# File 'lib/avro/ipc.rb', line 449

# Reads the length prefix of the next buffer from the socket.
#
# The prefix is BUFFER_HEADER_LENGTH bytes, decoded as a 32-bit unsigned
# big-endian integer (pack directive 'N').
#
# @return [Integer] the length, in bytes, of the buffer that follows
# @raise [ConnectionClosedException] if the socket yields no data, or fewer
#   than BUFFER_HEADER_LENGTH bytes, before reaching end of stream
def read_buffer_length
  header = sock.read(BUFFER_HEADER_LENGTH)
  if header.nil? || header.empty?
    raise ConnectionClosedException.new("Socket read 0 bytes.")
  end
  # A short read means the stream ended in the middle of the header. Decoding
  # it would yield nil, which callers would then try to use as a length, so
  # report it as a closed connection instead.
  if header.bytesize < BUFFER_HEADER_LENGTH
    raise ConnectionClosedException.new(
      "Socket read #{header.bytesize} of #{BUFFER_HEADER_LENGTH} header bytes."
    )
  end
  header.unpack('N')[0]
end

#read_framed_message ⇒ Object



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
# File 'lib/avro/ipc.rb', line 394

# Reads one framed message from the socket.
#
# A message arrives as a series of buffers, each preceded by its length (see
# read_buffer_length), and ends with a buffer of length zero. Each buffer is
# read in full, looping over short reads, and the payloads are concatenated.
#
# @return [String] the payloads of all buffers joined together; an empty
#   string when the first buffer length is already zero
# @raise [ConnectionClosedException] if the socket reaches end of stream
#   before a buffer has been read in full
def read_framed_message
  message = []
  loop do
    buffer_length = read_buffer_length
    return message.join if buffer_length == 0

    buffer = StringIO.new(''.force_encoding('BINARY'))
    while buffer.tell < buffer_length
      chunk = sock.read(buffer_length - buffer.tell)
      # IO#read with a positive length returns nil (not '') at end of stream.
      # Writing nil adds zero bytes, so without the nil check this loop would
      # never terminate on a connection closed mid-buffer.
      if chunk.nil? || chunk.empty?
        raise ConnectionClosedException.new("Socket read 0 bytes.")
      end
      buffer.write(chunk)
    end
    message << buffer.string
  end
end

#transceive(request) ⇒ Object



389
390
391
392
# File 'lib/avro/ipc.rb', line 389

# Sends +request+ as a framed message and then reads the framed reply.
#
# @param request [String] the payload to send; write_framed_message takes its
#   bytesize and slices it into buffers
# @return [String] the reply payload returned by read_framed_message
def transceive(request)
  write_framed_message(request)
  read_framed_message
end

#write_buffer(chunk) ⇒ Object



429
430
431
432
433
434
435
436
437
438
439
440
# File 'lib/avro/ipc.rb', line 429

# Writes one length-prefixed buffer to the socket.
#
# The byte length of +chunk+ is sent first via write_buffer_length, then the
# chunk itself, retrying with the unsent remainder whenever the socket
# accepts only part of the data.
#
# @param chunk [String] the payload of this buffer
# @return [nil]
# @raise [ConnectionClosedException] if the socket reports that it wrote
#   zero bytes
def write_buffer(chunk)
  buffer_length = chunk.bytesize
  write_buffer_length(buffer_length)
  total_bytes_sent = 0
  while total_bytes_sent < buffer_length
    # total_bytes_sent counts bytes, so the remainder has to be taken with
    # byteslice; String#[] indexes characters and would resume at the wrong
    # place (or return nil) for strings containing multibyte characters.
    bytes_sent = sock.write(chunk.byteslice(total_bytes_sent..-1))
    if bytes_sent == 0
      raise ConnectionClosedException.new("Socket sent 0 bytes.")
    end
    total_bytes_sent += bytes_sent
  end
end

#write_buffer_length(n) ⇒ Object



442
443
444
445
446
447
# File 'lib/avro/ipc.rb', line 442

# Sends +n+ to the socket as a buffer length prefix, packed as a 32-bit
# unsigned big-endian integer (pack directive 'N').
#
# @param n [Integer] the buffer length to announce
# @return [nil]
# @raise [ConnectionClosedException] if the socket reports that it wrote
#   zero bytes
def write_buffer_length(n)
  header = [n].pack('N')
  return unless sock.write(header) == 0

  raise ConnectionClosedException, "socket sent 0 bytes"
end

#write_framed_message(message) ⇒ Object



413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# File 'lib/avro/ipc.rb', line 413

# Writes +message+ to the socket as a framed message.
#
# The message is cut into buffers of at most BUFFER_SIZE bytes, each sent
# through write_buffer, followed by a zero-length buffer marking the end.
#
# @param message [String] the payload to send
# @return [Object] whatever the final write_buffer_length(0) call returns
def write_framed_message(message)
  message_length = message.bytesize
  total_bytes_sent = 0
  while total_bytes_sent < message_length
    buffer_length = [message_length - total_bytes_sent, BUFFER_SIZE].min
    # Offsets and lengths here are byte counts, so slice by bytes. String#[]
    # slices by characters, which for multibyte strings produces oversized
    # buffers and eventually a nil slice.
    write_buffer(message.byteslice(total_bytes_sent, buffer_length))
    total_bytes_sent += buffer_length
  end
  # A message is always terminated by a zero-length buffer.
  write_buffer_length(0)
end