Class: Gearman::Connection
- Inherits:
-
Object
- Object
- Gearman::Connection
- Includes:
- Celluloid::IO
- Defined in:
- lib/gearman/connection.rb
Constant Summary collapse
- IncompleteReadError =
Class.new(Gearman::Error)
- IncompleteWriteError =
Class.new(Gearman::Error)
- NoConnectionError =
Class.new(Gearman::Error)
- UnexpectedPacketError =
Class.new(Gearman::Error)
- ServerError =
Class.new(Gearman::Error)
- NULL_BYTE =
"\0"
- REQ =
[NULL_BYTE, "REQ"].join
- HEADER_FORMAT =
"a4NN"
- HEADER_SIZE =
12
Instance Method Summary collapse
- #disconnect ⇒ Object
-
#initialize(address) ⇒ Connection
constructor
A new instance of Connection.
- #next(*expected_packet_types) ⇒ Object
- #write(packet) ⇒ Object
Constructor Details
#initialize(address) ⇒ Connection
Returns a new instance of Connection.
24 25 26 27 28 |
# File 'lib/gearman/connection.rb', line 24 def initialize(address) @address = address @repository = Packet::Repository.new @socket = nil end |
Instance Method Details
#disconnect ⇒ Object
72 73 74 75 76 77 |
# File 'lib/gearman/connection.rb', line 72 def disconnect if @socket @socket.close unless @socket.closed? @socket = nil end end |
#next(*expected_packet_types) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/gearman/connection.rb', line 50 def next(*expected_packet_types) connect if disconnected? header = read(HEADER_SIZE) magic, type, length = header.unpack(HEADER_FORMAT) body = read(length) arguments = String(body).split(NULL_BYTE) @repository.load(type).new(arguments).tap do |packet| debug "Read #{packet.inspect}" if packet.is_a?(Packet::ERROR) = "server sent error #{packet.error_code}: #{packet.text}" raise ServerError, end verify packet, expected_packet_types end end |
#write(packet) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/gearman/connection.rb', line 30 def write(packet) connect if disconnected? body = packet.arguments.join(NULL_BYTE) header = [REQ, packet.number, body.size].pack(HEADER_FORMAT) serialized_packet = header + body length_written = @socket.write(serialized_packet) debug "Wrote #{packet.inspect}" if length_written != serialized_packet.length lengths = [serialized_packet.length, lengths] = "expected to write %d bytes, but only read %d" % lengths raise IncompleteWriteError, end end |