Class: Avro::IPC::SocketTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/ipc.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock) ⇒ SocketTransport

Returns a new instance of SocketTransport.



379
380
381
382
# File 'lib/avro/ipc.rb', line 379

# Wraps an already-open socket in a framed-message transport.
#
# @param sock [#read, #write, #close] object used for all I/O; the methods
#   of this class only call read, write and close on it
def initialize(sock)
  @sock = sock
  # No protocol yet, so is_connected? is false until @protocol is set.
  @protocol = nil
  # Give every ivar that has a reader an explicit initial value so that
  # remote_name never reads an unassigned instance variable.
  @remote_name = nil
end

Instance Attribute Details

#protocol ⇒ Object

Returns the value of attribute protocol.



377
378
379
# File 'lib/avro/ipc.rb', line 377

# Reader for the protocol attribute.
#
# @return [Object, nil] the current value of @protocol; initialize sets it
#   to nil, and is_connected? reports true once it holds a truthy value
def protocol
  @protocol
end

#remote_name ⇒ Object (readonly)

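Returns the value of attribute remote_name. (The text originally shown here, "A simple socket-based Transport implementation.", appears to be the class-level description attached to this reader by the documentation generator.)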



376
377
378
# File 'lib/avro/ipc.rb', line 376

# Reader for the remote_name attribute.
#
# @return [Object, nil] the current value of @remote_name; initialize takes
#   no remote name, so this reads nil unless code outside this view assigns
#   @remote_name (NOTE(review): confirm where, if anywhere, it is set)
def remote_name
  @remote_name
end

#sock ⇒ Object (readonly)

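Returns the value of attribute sock. (The text originally shown here, "A simple socket-based Transport implementation.", appears to be the class-level description attached to this reader by the documentation generator.)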



376
377
378
# File 'lib/avro/ipc.rb', line 376

# Reader for the socket this transport performs its I/O on.
#
# @return [Object] the object that was passed to initialize
def sock
  @sock
end

Instance Method Details

#close ⇒ Object



456
457
458
# File 'lib/avro/ipc.rb', line 456

# Closes the underlying socket by calling close on the object returned by
# sock.
#
# @return [Object] whatever sock.close returns
def close
  sock.close
end

#is_connected? ⇒ Boolean

Returns:

  • (Boolean)


384
385
386
# File 'lib/avro/ipc.rb', line 384

# Reports whether a protocol has been set on this transport.
#
# @return [Boolean] true when @protocol is neither nil nor false
def is_connected?
  @protocol ? true : false
end

#read_buffer_length ⇒ Object



448
449
450
451
452
453
454
# File 'lib/avro/ipc.rb', line 448

# Reads the length prefix of the next buffer from the socket.
#
# The prefix is BUFFER_HEADER_LENGTH bytes, decoded as a 32-bit big-endian
# unsigned integer ('N').
#
# @return [Integer] the decoded buffer length
# @raise [ConnectionClosedException] if the socket returns no data, or
#   fewer bytes than a complete length prefix
def read_buffer_length
  header = sock.read(BUFFER_HEADER_LENGTH)
  if header.nil? || header.empty?
    raise ConnectionClosedException, "Socket read 0 bytes."
  end
  # A truncated prefix would make unpack return nil, which would only fail
  # later in the caller with an unrelated error. Report it here instead.
  # (Assumes sock.read behaves like IO#read, i.e. a short result means the
  # stream ended.)
  if header.bytesize < BUFFER_HEADER_LENGTH
    raise ConnectionClosedException,
          "Socket read #{header.bytesize} of #{BUFFER_HEADER_LENGTH} buffer length bytes."
  end
  header.unpack('N')[0]
end

#read_framed_message ⇒ Object



393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
# File 'lib/avro/ipc.rb', line 393

# Reads one complete framed message from the socket.
#
# A message is a sequence of buffers, each preceded by the length returned
# from read_buffer_length. A length of zero ends the message.
#
# @return [String] the contents of all buffers, joined in arrival order
# @raise [ConnectionClosedException] if the socket stops returning data
#   before a buffer has been read completely
def read_framed_message
  message = []
  loop do
    buffer_length = read_buffer_length
    return message.join if buffer_length == 0

    buffer = StringIO.new
    while buffer.tell < buffer_length
      chunk = sock.read(buffer_length - buffer.tell)
      # IO#read with a positive length returns nil, not '', at end of file.
      # Without the nil check, buffer.write(nil) would add zero bytes and
      # this loop would never terminate on a closed connection.
      if chunk.nil? || chunk.empty?
        raise ConnectionClosedException, "Socket read 0 bytes."
      end
      buffer.write(chunk)
    end
    message << buffer.string
  end
end

#transceive(request) ⇒ Object



388
389
390
391
# File 'lib/avro/ipc.rb', line 388

# Performs one request/response exchange over the socket.
#
# Writes +request+ with write_framed_message, then reads the reply with
# read_framed_message.
#
# @param request [String] the message to send (presumably a String; it is
#   passed unchanged to write_framed_message)
# @return [Object] the value returned by read_framed_message
def transceive(request)
  write_framed_message(request)
  read_framed_message
end

#write_buffer(chunk) ⇒ Object



428
429
430
431
432
433
434
435
436
437
438
439
# File 'lib/avro/ipc.rb', line 428

# Writes one buffer: its length prefix followed by its contents.
#
# The prefix and all offsets are measured in bytes, because sock.write
# reports the number of bytes written. Counting characters would produce a
# wrong prefix, and a wrong resume position after a partial write, for
# strings that contain multi-byte characters.
#
# @param chunk [String] the buffer contents
# @return [nil]
# @raise [ConnectionClosedException] if the socket reports writing 0 bytes
def write_buffer(chunk)
  buffer_length = chunk.bytesize
  write_buffer_length(buffer_length)
  total_bytes_sent = 0
  while total_bytes_sent < buffer_length
    remaining = chunk.byteslice(total_bytes_sent, buffer_length - total_bytes_sent)
    bytes_sent = sock.write(remaining)
    if bytes_sent == 0
      raise ConnectionClosedException, "Socket sent 0 bytes."
    end
    total_bytes_sent += bytes_sent
  end
end

#write_buffer_length(n) ⇒ Object



441
442
443
444
445
446
# File 'lib/avro/ipc.rb', line 441

# Sends a buffer length prefix: +n+ packed as a 32-bit big-endian unsigned
# integer ('N').
#
# @param n [Integer] the length to announce
# @return [nil]
# @raise [ConnectionClosedException] if the socket reports writing 0 bytes
def write_buffer_length(n)
  header = [n].pack('N')
  raise ConnectionClosedException, "socket sent 0 bytes" if sock.write(header) == 0
end

#write_framed_message(message) ⇒ Object



412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# File 'lib/avro/ipc.rb', line 412

# Writes +message+ as a framed message: a sequence of buffers of at most
# BUFFER_SIZE bytes each, followed by a zero-length terminator.
#
# The message is split on byte offsets, so each buffer respects the
# BUFFER_SIZE limit even when the string contains multi-byte characters. A
# buffer may therefore end in the middle of a character; the byte stream as
# a whole is unchanged.
#
# @param message [String] the payload to send
# @return [Object] whatever write_buffer_length returns for the terminator
def write_framed_message(message)
  message_length = message.bytesize
  total_bytes_sent = 0
  while total_bytes_sent < message_length
    buffer_length = [message_length - total_bytes_sent, BUFFER_SIZE].min
    write_buffer(message.byteslice(total_bytes_sent, buffer_length))
    total_bytes_sent += buffer_length
  end
  # A message is always terminated by a zero-length buffer.
  write_buffer_length(0)
end