Class: Gearman::Connection
- Inherits:
-
Object
- Object
- Gearman::Connection
- Includes:
- Logging
- Defined in:
- lib/gearman/connection.rb
Instance Attribute Summary collapse
-
#hostname ⇒ Object
readonly
Returns the value of attribute hostname.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#socket(num_retries = 3) ⇒ Object
readonly
This connection’s Socket.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Instance Method Summary collapse
- #close_socket ⇒ Object
-
#initialize(hostname, port) ⇒ Connection
constructor
A new instance of Connection.
-
#is_healthy? ⇒ Boolean
Check server health status by sending an ECHO request. Returns true / false.
- #raise_exception(message) ⇒ Object
-
#read_response(timeout = nil) ⇒ Object
Read a response packet from a socket.
-
#send_request(req, timeout = nil) ⇒ Object
Send a request packet over a socket that needs a response.
- #send_update(req, timeout = nil) ⇒ Object
-
#timed_recv(sock, len, timeout = nil) ⇒ Object
Read from a socket, giving up if it doesn’t finish quickly enough.
- #to_host_port ⇒ Object
- #to_s ⇒ Object
- #with_safe_socket_op ⇒ Object
Methods included from Logging
Constructor Details
#initialize(hostname, port) ⇒ Connection
Returns a new instance of Connection.
7 8 9 10 11 |
# File 'lib/gearman/connection.rb', line 7
#
# Records the server address for this connection. No socket is opened here;
# @real_socket starts out as nil.
#
# @param hostname [Object] server host (presumably a String -- confirm with callers)
# @param port [Object] server port (presumably an Integer -- confirm with callers)
def initialize(hostname, port)
  @hostname = hostname
  @port = port
  @real_socket = nil
end
Instance Attribute Details
#hostname ⇒ Object (readonly)
Returns the value of attribute hostname.
13 14 15 |
# File 'lib/gearman/connection.rb', line 13
#
# Reader for the hostname stored by the constructor.
#
# @return [Object] the value held in @hostname
def hostname
  @hostname
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
13 14 15 |
# File 'lib/gearman/connection.rb', line 13
#
# Reader for the port stored by the constructor.
#
# @return [Object] the value held in @port
def port
  @port
end
#socket(num_retries = 3) ⇒ Object (readonly)
Returns this connection’s Socket.
42 43 44 |
# File 'lib/gearman/connection.rb', line 42
#
# Returns whatever is held in @socket.
#
# NOTE(review): the heading for this attribute advertises
# `socket(num_retries = 3)`, but this listing takes no arguments and reads
# @socket, which none of the listings on this page assign (they use
# @real_socket). This is probably the generated reader stub rather than the
# real implementation -- confirm against lib/gearman/connection.rb.
#
# @return [Object, nil] the value held in @socket
def socket
  @socket
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
13 14 15 |
# File 'lib/gearman/connection.rb', line 13
#
# Reader for @state.
# NOTE(review): none of the listings on this page assign @state, so this
# returns nil unless code elsewhere sets it -- confirm.
#
# @return [Object, nil] the value held in @state
def state
  @state
end
Instance Method Details
#close_socket ⇒ Object
60 61 62 63 64 |
# File 'lib/gearman/connection.rb', line 60
#
# Closes the held socket, if any, and forgets it. Calling this when no socket
# is held is a no-op.
#
# @return [true] always
def close_socket
  @real_socket.close if @real_socket
  @real_socket = nil
  true
end
#is_healthy? ⇒ Boolean
Check server health status by sending an ECHO request. Returns true / false.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/gearman/connection.rb', line 19
#
# Checks server health by sending an "echo_req" packet carrying "ping" and
# expecting an :echo_res reply with the same payload (3 second timeout).
#
# The check only runs while no socket is held (@real_socket is nil). When a
# socket is already held the method skips the check and returns nil, which is
# falsy.
# NOTE(review): confirm whether callers rely on that nil result.
#
# @return [Boolean, nil] true on a valid echo, false on NetworkError or
#   ProtocolError, nil when the check is skipped
def is_healthy?
  return unless @real_socket.nil?

  logger.debug "Performing health check for #{self}"
  begin
    request = Packet.pack_request("echo_req", "ping")
    response = send_request(request, 3)
    logger.debug "Health check response for #{self} is #{response.inspect}"
    raise ProtocolError unless response[0] == :echo_res && response[1] == "ping"
    true
  rescue NetworkError
    logger.debug "NetworkError -- unhealthy"
    false
  rescue ProtocolError
    logger.debug "ProtocolError -- unhealthy"
    false
  end
end
#raise_exception(message) ⇒ Object
66 67 68 69 |
# File 'lib/gearman/connection.rb', line 66
#
# Closes the socket and then raises NetworkError with the given message, so a
# failed connection is never left open.
#
# @param message [String] text for the raised NetworkError
# @raise [NetworkError] always
def raise_exception(message)
  close_socket
  raise NetworkError, message
end
#read_response(timeout = nil) ⇒ Object
Read a response packet from a socket.
111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/gearman/connection.rb', line 111
#
# Reads one response packet from the socket.
#
# The 12-byte header is unpacked as a 4-byte magic string followed by two
# 32-bit big-endian integers (packet type number and payload length). The
# payload read is given whatever is left of the timeout after the header read.
#
# @param timeout [Numeric, nil] overall budget in seconds, or nil for none
# @return [Array] two elements: the Packet::COMMANDS entry for the type
#   number, and the payload string ('' when the length is zero)
# @raise [ProtocolError] if the magic is not "\0RES" or the type number has
#   no entry in Packet::COMMANDS
def read_response(timeout = nil)
  end_time = Time.now.to_f + timeout if timeout
  head = timed_recv(socket, 12, timeout)
  magic, type_num, len = head.unpack('a4NN')
  raise ProtocolError, "Invalid magic '#{magic}'" unless magic == "\0RES"

  buf =
    if len > 0
      timed_recv(socket, len, timeout ? end_time - Time.now.to_f : nil)
    else
      ''
    end

  # Keep the numeric code in its own variable so the error message can report
  # it; the lookup result is nil for unknown codes.
  type = Packet::COMMANDS[type_num]
  raise ProtocolError, "Invalid packet type #{type_num}" unless type
  [type, buf]
end
#send_request(req, timeout = nil) ⇒ Object
Send a request packet over a socket that needs a response.
129 130 131 132 |
# File 'lib/gearman/connection.rb', line 129
#
# Sends a request packet and then reads the response to it.
#
# @param req [String] packed request passed to send_update
# @param timeout [Numeric, nil] passed to both send_update and read_response
# @return [Object] whatever read_response returns
def send_request(req, timeout = nil)
  send_update(req, timeout)
  read_response(timeout)
end
#send_update(req, timeout = nil) ⇒ Object
134 135 136 137 138 139 |
# File 'lib/gearman/connection.rb', line 134
#
# Writes a packet to the socket without waiting for a response.
#
# @param req [String] packed request to write
# @param timeout [Numeric, nil] accepted for signature parity with
#   send_request; not used by this method
# @return [nil] when the whole packet was written
# @raise [NetworkError] via raise_exception if the write fails or is short
def send_update(req, timeout = nil)
  len = with_safe_socket_op { socket.write(req) }
  # write reports bytes, so compare against the byte length rather than the
  # character count (they differ for multi-byte encoded strings).
  expected = req.bytesize
  raise_exception("Wrote #{len} instead of #{expected}") if len != expected
end
#timed_recv(sock, len, timeout = nil) ⇒ Object
Read from a socket, giving up if it doesn’t finish quickly enough. SocketTimeoutError is raised if the deadline passes before all the bytes are read; NetworkError is raised if the read itself fails or ends short.
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/gearman/connection.rb', line 79
#
# Reads exactly +len+ bytes from +sock+, optionally bounded by a deadline.
#
# @param sock [IO] object accepted by IO.select that responds to readpartial
# @param len [Integer] number of bytes to read
# @param timeout [Numeric, nil] overall budget in seconds; nil means no deadline
# @return [String] the +len+ bytes read
# @raise [SocketTimeoutError] if the deadline passes before +len+ bytes arrive
# @raise [NetworkError] via raise_exception (which closes the socket first) if
#   a read fails or the loop ends short for any other reason
def timed_recv(sock, len, timeout = nil)
  data = ''
  start_time = Time.now.to_f
  end_time = start_time + timeout if timeout

  while data.size < len
    remaining = nil
    if timeout
      # Compute the remaining budget once so the deadline check and the
      # select timeout agree; a negative value must never reach IO.select.
      remaining = end_time - Time.now.to_f
      break unless remaining > 0
    end
    break unless IO.select([sock], nil, nil, remaining)

    begin
      data += sock.readpartial(len - data.size)
    rescue StandardError
      raise_exception("Unable to read data from socket.")
    end
  end

  if data.size < len
    now = Time.now.to_f
    # end_time only exists when a timeout was given; guard so a short read
    # without a timeout cannot fail on a nil comparison.
    if timeout && now > end_time
      time_lapse = now - start_time
      raise SocketTimeoutError, "Took too long to read data: #{time_lapse} sec. to read on a #{timeout} sec. timeout"
    end
    raise_exception("Read #{data.size} byte(s) instead of #{len}")
  end
  data
end
#to_host_port ⇒ Object
149 150 151 |
# File 'lib/gearman/connection.rb', line 149
#
# Formats the server address as "hostname:port".
#
# @return [String]
def to_host_port
  "#{hostname}:#{port}"
end
#to_s ⇒ Object
153 154 155 |
# File 'lib/gearman/connection.rb', line 153
#
# Describes the connection as "hostname:port (connected: true|false)", where
# the flag reports whether a socket is currently held in @real_socket.
#
# @return [String]
def to_s
  "#{hostname}:#{port} (connected: #{!@real_socket.nil?})"
end
#with_safe_socket_op ⇒ Object
141 142 143 144 145 146 147 |
# File 'lib/gearman/connection.rb', line 141
#
# Runs the given block and converts any StandardError it raises into a
# NetworkError (via raise_exception, which also closes the socket) carrying
# the original error's message.
#
# Only StandardError is converted; Interrupt, SystemExit and other
# non-StandardError exceptions propagate unchanged so the process can still
# be stopped.
#
# @yield the socket operation to guard
# @return [Object] the block's result
# @raise [NetworkError] if the block raises a StandardError
def with_safe_socket_op
  yield
rescue StandardError => ex
  raise_exception(ex.message)
end