Class: Gearman::Connection

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/gearman/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

included, #logger

Constructor Details

#initialize(hostname, port) ⇒ Connection



7
8
9
10
11
# File 'lib/gearman/connection.rb', line 7

def initialize(hostname, port)
  @hostname = hostname
  @port = port
  @real_socket = nil
end

Instance Attribute Details

#hostnameObject (readonly)

Returns the value of attribute hostname.



13
14
15
# File 'lib/gearman/connection.rb', line 13

def hostname
  @hostname
end

#portObject (readonly)

Returns the value of attribute port.



13
14
15
# File 'lib/gearman/connection.rb', line 13

def port
  @port
end

#socket(num_retries = 3) ⇒ Object (readonly)



42
43
44
# File 'lib/gearman/connection.rb', line 42

def socket
  @socket
end

#stateObject (readonly)

Returns the value of attribute state.



13
14
15
# File 'lib/gearman/connection.rb', line 13

def state
  @state
end

Instance Method Details

#close_socketObject



60
61
62
63
64
# File 'lib/gearman/connection.rb', line 60

def close_socket
  @real_socket.close if @real_socket
  @real_socket = nil
  true
end

#is_healthy?Boolean

Check server health status by sending an ECHO request Return true / false



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/gearman/connection.rb', line 19

def is_healthy?
  if @real_socket == nil
    logger.debug "Performing health check for #{self}"
    begin
      request = Packet.pack_request("echo_req", "ping")
      response = send_request(request, 3)
      logger.debug "Health check response for #{self} is #{response.inspect}"
      raise ProtocolError unless response[0] == :echo_res and response[1] == "ping"
      return true
    rescue NetworkError
      logger.debug "NetworkError -- unhealthy"
      return false
    rescue ProtocolError
      logger.debug "ProtocolError -- unhealthy"
      return false
    end
  end
end

#raise_exception(message) ⇒ Object

Raises:



66
67
68
69
# File 'lib/gearman/connection.rb', line 66

def raise_exception(message)
  close_socket
  raise NetworkError, message
end

#read_response(timeout = nil) ⇒ Object

Read a response packet from a socket.

Raises:



111
112
113
114
115
116
117
118
119
120
121
# File 'lib/gearman/connection.rb', line 111

def read_response(timeout=nil)
  end_time = Time.now.to_f + timeout if timeout
  head = timed_recv(socket, 12, timeout)
  magic, type, len = head.unpack('a4NN')
  raise ProtocolError, "Invalid magic '#{magic}'" unless magic == "\0RES"
  buf = len > 0 ?
      timed_recv(socket, len, timeout ? end_time - Time.now.to_f : nil) : ''
  type = Packet::COMMANDS[type]
  raise ProtocolError, "Invalid packet type #{type}" unless type
  [type, buf]
end

#send_request(req, timeout = nil) ⇒ Object

Send a request packet over a socket that needs a response.



129
130
131
132
# File 'lib/gearman/connection.rb', line 129

def send_request(req, timeout = nil)
  send_update(req, timeout)
  return read_response(timeout)
end

#send_update(req, timeout = nil) ⇒ Object



134
135
136
137
138
139
# File 'lib/gearman/connection.rb', line 134

def send_update(req, timeout = nil)
  len = with_safe_socket_op{ socket.write(req) }
  if len != req.size
    raise_exception("Wrote #{len} instead of #{req.size}")
  end
end

#timed_recv(sock, len, timeout = nil) ⇒ Object

Read from a socket, giving up if it doesn’t finish quickly enough. NetworkError is thrown if we don’t read all the bytes in time.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/gearman/connection.rb', line 79

def timed_recv(sock, len, timeout=nil)
  data = ''
  start_time = Time.now.to_f
  end_time = Time.now.to_f + timeout if timeout
  while data.size < len and (not timeout or Time.now.to_f < end_time) do
    IO::select([sock], nil, nil, timeout ? end_time - Time.now.to_f : nil) \
    or break
    begin
      data += sock.readpartial(len - data.size)
    rescue
      close_socket
      raise NetworkError, "Unable to read data from socket."
    end
  end
  if data.size < len
    now = Time.now.to_f
    if now > end_time
      time_lapse = now - start_time
      raise SocketTimeoutError, "Took too long to read data: #{time_lapse} sec. to read on a  #{timeout} sec. timeout"
    else
      raise_exception("Read #{data.size} byte(s) instead of #{len}")
    end
  end
  data
end

#to_host_portObject



149
150
151
# File 'lib/gearman/connection.rb', line 149

def to_host_port
  "#{hostname}:#{port}"
end

#to_sObject



153
154
155
# File 'lib/gearman/connection.rb', line 153

def to_s
  "#{hostname}:#{port} (connected: #{@real_socket != nil})"
end

#with_safe_socket_opObject



141
142
143
144
145
146
147
# File 'lib/gearman/connection.rb', line 141

def with_safe_socket_op
  begin
    yield
  rescue Exception => ex
    raise_exception(ex.message)
  end
end