Class: Gearman::Connection

Inherits:
Object
  • Object
show all
Includes:
Celluloid::IO
Defined in:
lib/gearman/connection.rb

Constant Summary collapse

IncompleteReadError =
Class.new(Gearman::Error)
IncompleteWriteError =
Class.new(Gearman::Error)
NoConnectionError =
Class.new(Gearman::Error)
UnexpectedPacketError =
Class.new(Gearman::Error)
ServerError =
Class.new(Gearman::Error)
NULL_BYTE =
"\0"
REQ =
[NULL_BYTE, "REQ"].join
HEADER_FORMAT =
"a4NN"
HEADER_SIZE =
12

Instance Method Summary collapse

Constructor Details

#initialize(address) ⇒ Connection

Returns a new instance of Connection.



24
25
26
27
28
# File 'lib/gearman/connection.rb', line 24

# Builds a connection object for the given server address.
#
# Only state is set up here: the address is stored, a fresh packet
# repository is created, and the socket slot starts out empty (nil).
#
# @param address [Object] server address, stored as-is
#   (NOTE(review): expected format is not visible here - confirm with #connect)
def initialize(address)
  @address    = address
  @repository = Packet::Repository.new
  @socket     = nil
end

Instance Method Details

#disconnect ⇒ Object



72
73
74
75
76
77
# File 'lib/gearman/connection.rb', line 72

# Drops the current socket, closing it first if it is still open.
#
# Does nothing when no socket is held. After the call @socket is nil.
#
# @return [nil]
def disconnect
  socket = @socket
  return unless socket

  socket.close unless socket.closed?
  @socket = nil
end

#next(*expected_packet_types) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/gearman/connection.rb', line 50

# Reads one packet from the server and returns it.
#
# Connects first when currently disconnected. A HEADER_SIZE-byte header is
# read and unpacked with HEADER_FORMAT into magic, packet type and body
# length; the body is then read and split on NULL bytes into the packet's
# arguments. The packet class is looked up in the repository by type.
#
# @param expected_packet_types [Array] passed through to #verify together
#   with the packet that was read
# @return [Object] the packet instance built from the received data
# @raise [ServerError] when the received packet is a Packet::ERROR
def next(*expected_packet_types)
  connect if disconnected?

  # The magic field is unpacked but not inspected.
  _magic, type, length = read(HEADER_SIZE).unpack(HEADER_FORMAT)

  # String() turns a nil body into "" so splitting is always safe.
  # NOTE(review): split without a limit drops trailing empty fields -
  # confirm no packet type relies on a trailing empty argument.
  arguments = String(read(length)).split(NULL_BYTE)

  packet = @repository.load(type).new(arguments)
  debug "Read #{packet.inspect}"

  if packet.is_a?(Packet::ERROR)
    raise ServerError, "server sent error #{packet.error_code}: #{packet.text}"
  end

  verify packet, expected_packet_types
  packet
end

#write(packet) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/gearman/connection.rb', line 30

# Serializes a packet and writes it to the server socket.
#
# Connects first when currently disconnected. The packet's arguments are
# joined with NULL bytes to form the body, which is prefixed with a header
# packed via HEADER_FORMAT (REQ magic, packet number, body length).
#
# @param packet [#number, #arguments] the packet to send
# @return [nil]
# @raise [IncompleteWriteError] when the socket reports fewer bytes written
#   than the serialized packet contains
def write(packet)
  connect if disconnected?

  # Treat the body as raw bytes: the header carries a byte count, and a
  # binary body concatenates safely with the binary header from #pack.
  body = packet.arguments.join(NULL_BYTE).force_encoding(Encoding::BINARY)
  header = [REQ, packet.number, body.bytesize].pack(HEADER_FORMAT)
  serialized_packet = header + body

  length_written = @socket.write(serialized_packet)

  debug "Wrote #{packet.inspect}"

  # Compare in bytes, since that is what the socket reports.
  expected_length = serialized_packet.bytesize
  return if length_written == expected_length

  lengths = [expected_length, length_written]
  raise IncompleteWriteError,
        "expected to write %d bytes, but only wrote %d" % lengths
end