Class: Gearman::Connection

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/gearman/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

included, #logger

Constructor Details

#initialize(hostname, port) ⇒ Connection

Returns a new instance of Connection.



7
8
9
10
11
# File 'lib/gearman/connection.rb', line 7

def initialize(hostname, port)
  @hostname = hostname
  @port = port
  @real_socket = nil
end

Instance Attribute Details

#hostnameObject (readonly)

Returns the value of attribute hostname.



13
14
15
# File 'lib/gearman/connection.rb', line 13

def hostname
  @hostname
end

#portObject (readonly)

Returns the value of attribute port.



13
14
15
# File 'lib/gearman/connection.rb', line 13

def port
  @port
end

#socket(num_retries = 3) ⇒ Object (readonly)

Returns This connection’s Socket.

Parameters:

  • num_retries (defaults to: 3)

    Number of times to retry

Returns:

  • This connection’s Socket



42
43
44
# File 'lib/gearman/connection.rb', line 42

def socket
  @socket
end

#stateObject (readonly)

Returns the value of attribute state.



13
14
15
# File 'lib/gearman/connection.rb', line 13

def state
  @state
end

Instance Method Details

#close_socketObject



60
61
62
63
64
# File 'lib/gearman/connection.rb', line 60

def close_socket
  @real_socket.close if @real_socket
  @real_socket = nil
  true
end

#is_healthy?Boolean

Check server health status by sending an ECHO request Return true / false

Returns:

  • (Boolean)


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/gearman/connection.rb', line 19

def is_healthy?
  if @real_socket == nil
    logger.debug "Performing health check for #{self}"
    begin
      request = Packet.pack_request("echo_req", "ping")
      response = send_request(request, 3)
      logger.debug "Health check response for #{self} is #{response.inspect}"
      raise ProtocolError unless response[0] == :echo_res and response[1] == "ping"
      return true
    rescue NetworkError
      logger.debug "NetworkError -- unhealthy"
      return false
    rescue ProtocolError
      logger.debug "ProtocolError -- unhealthy"
      return false
    end
  end
end

#raise_exception(message) ⇒ Object

Raises:



66
67
68
69
# File 'lib/gearman/connection.rb', line 66

def raise_exception(message)
  close_socket
  raise NetworkError, message
end

#read_response(timeout = nil) ⇒ Object

Read a response packet from a socket.

Parameters:

  • sock

    Socket connected to a job server

  • timeout (defaults to: nil)

    timeout in seconds, nil for no timeout

Returns:

  • array consisting of integer packet type and data

Raises:



111
112
113
114
115
116
117
118
119
120
121
# File 'lib/gearman/connection.rb', line 111

def read_response(timeout=nil)
  end_time = Time.now.to_f + timeout if timeout
  head = timed_recv(socket, 12, timeout)
  magic, type, len = head.unpack('a4NN')
  raise ProtocolError, "Invalid magic '#{magic}'" unless magic == "\0RES"
  buf = len > 0 ?
      timed_recv(socket, len, timeout ? end_time - Time.now.to_f : nil) : ''
  type = Packet::COMMANDS[type]
  raise ProtocolError, "Invalid packet type #{type}" unless type
  [type, buf]
end

#send_request(req, timeout = nil) ⇒ Object

Send a request packet over a socket that needs a response.

Parameters:

  • sock

    Socket connected to a job server

  • req

    request packet to send



129
130
131
132
# File 'lib/gearman/connection.rb', line 129

def send_request(req, timeout = nil)
  send_update(req, timeout)
  return read_response(timeout)
end

#send_update(req, timeout = nil) ⇒ Object



134
135
136
137
138
139
# File 'lib/gearman/connection.rb', line 134

def send_update(req, timeout = nil)
  len = with_safe_socket_op{ socket.write(req) }
  if len != req.size
    raise_exception("Wrote #{len} instead of #{req.size}")
  end
end

#timed_recv(sock, len, timeout = nil) ⇒ Object

Read from a socket, giving up if it doesn’t finish quickly enough. NetworkError is thrown if we don’t read all the bytes in time.

Parameters:

  • sock

    Socket from which we read

  • len

    number of bytes to read

  • timeout (defaults to: nil)

    maximum number of seconds we’ll take; nil for no timeout

Returns:

  • full data that was read



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/gearman/connection.rb', line 79

def timed_recv(sock, len, timeout=nil)
  data = ''
  start_time = Time.now.to_f
  end_time = Time.now.to_f + timeout if timeout
  while data.size < len and (not timeout or Time.now.to_f < end_time) do
    IO::select([sock], nil, nil, timeout ? end_time - Time.now.to_f : nil) \
    or break
    begin
      data += sock.readpartial(len - data.size)
    rescue
      close_socket
      raise NetworkError, "Unable to read data from socket."
    end
  end
  if data.size < len
    now = Time.now.to_f
    if now > end_time
      time_lapse = now - start_time
      raise SocketTimeoutError, "Took too long to read data: #{time_lapse} sec. to read on a  #{timeout} sec. timeout"
    else
      raise_exception("Read #{data.size} byte(s) instead of #{len}")
    end
  end
  data
end

#to_host_portObject



149
150
151
# File 'lib/gearman/connection.rb', line 149

def to_host_port
  "#{hostname}:#{port}"
end

#to_sObject



153
154
155
# File 'lib/gearman/connection.rb', line 153

def to_s
  "#{hostname}:#{port} (connected: #{@real_socket != nil})"
end

#with_safe_socket_opObject



141
142
143
144
145
146
147
# File 'lib/gearman/connection.rb', line 141

def with_safe_socket_op
  begin
    yield
  rescue Exception => ex
    raise_exception(ex.message)
  end
end