Class: Gearman::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/gearman/client.rb

Overview

Client

Description

A client for communicating with Gearman job servers.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_servers = nil) ⇒ Client

Create a new client.

Parameters:

  • job_servers ("host:port")

    “host:port”; either a single server or an array of servers



14
15
16
17
18
19
20
21
22
# File 'lib/gearman/client.rb', line 14

# Build a client with empty connection bookkeeping.
#
# @param job_servers [String, Array<String>, nil] "host:port" of a single
#   server, or an array of them; when given it is handed to the
#   job_servers= writer
def initialize(job_servers=nil)
  # Known servers, each a "host:port" string.
  @job_servers = []
  # Supplied servers go through the writer rather than being stored directly.
  if job_servers
    self.job_servers = job_servers
  end
  # "host:port" -> [sock1, sock2, ...]
  @sockets = {}
  # sock -> "host:port"
  @socket_to_hostport = {}
  @task_create_timeout_sec = 10
  # Starts at -1; get_job_server increments it before picking a server.
  @server_counter = -1
  @bad_servers = []
end

Instance Attribute Details

#bad_servers ⇒ Object (readonly)

Returns the value of attribute bad_servers.



23
24
25
# File 'lib/gearman/client.rb', line 23

# Reader for @bad_servers, the list that signal_bad_server appends to.
#
# @return [Array] the live array (not a copy); starts out empty
def bad_servers
  @bad_servers
end

#job_servers ⇒ Object

Returns the value of attribute job_servers.



23
24
25
# File 'lib/gearman/client.rb', line 23

# Reader for @job_servers, the "host:port" servers get_job_server picks from.
#
# @return [Array] the live array (not a copy)
def job_servers
  @job_servers
end

#task_create_timeout_sec ⇒ Object

Returns the value of attribute task_create_timeout_sec.



24
25
26
# File 'lib/gearman/client.rb', line 24

# Reader for @task_create_timeout_sec (set to 10 by the constructor).
# NOTE(review): presumably a timeout in seconds used when creating tasks --
# the consumer is not visible here; confirm against TaskSet.
def task_create_timeout_sec
  @task_create_timeout_sec
end

Instance Method Details

#close_socket(sock) ⇒ Object



129
130
131
132
133
# File 'lib/gearman/client.rb', line 129

# Close a socket and forget its "host:port" association.
#
# The mapping entry is removed in an ensure clause, so a close that raises
# cannot leave a stale entry behind in @socket_to_hostport. Any exception from
# sock.close still propagates to the caller.
#
# @param sock [#close] socket to shut down
# @return [nil]
def close_socket(sock)
  sock.close
  nil
ensure
  @socket_to_hostport.delete(sock)
end

#do_task(*args) ⇒ Object

Perform a single task.

Parameters:

  • args

    either a Task or arguments for Task.new

Returns:

  • output of the task, or nil on failure



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/gearman/client.rb', line 149

# Run one task to completion and hand back what it produced.
#
# @param args either a Task or the arguments for Task.new (resolved by
#   Util.get_task_from_args)
# @return the value passed to the task's completion callback, or nil if the
#   task's failure callback fired
# @raise [JobQueueError] if the task set refuses the task
def do_task(*args)
  task = Util.get_task_from_args(*args)

  # Capture the outcome through the task's callbacks.
  output = nil
  task_failed = false
  task.on_complete { |value| output = value }
  task.on_fail { task_failed = true }

  task_set = TaskSet.new(self)
  raise JobQueueError, "Unable to enqueue job." unless task_set.add_task(task)
  task_set.wait

  return nil if task_failed
  output
end

#get_hostport_for_socket(sock) ⇒ Object

Given a socket from Client#get_socket, return its host and port.

Parameters:

  • sock

    Socket



140
141
142
# File 'lib/gearman/client.rb', line 140

# Look up the "host:port" recorded for a socket in @socket_to_hostport
# (get_socket records it when it opens a new connection).
#
# @param sock socket to look up
# @return the recorded "host:port", or nil when the socket is not in the map
def get_hostport_for_socket(sock)
  @socket_to_hostport[sock]
end

#get_job_server ⇒ "host:port"

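Get connection info about a job server. Servers are currently picked round-robin (not randomly), but maybe we’ll do something smarter later. If no good servers are left, the servers previously flagged as bad are pinged first and re-admitted when they answer.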

Returns:

  • ("host:port")

    “host:port” of the selected job server

Raises:

  • (NoJobServersError)

    if no job server is available



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/gearman/client.rb', line 53

# Pick the next job server, round-robin over @job_servers.
#
# When there are no good servers left but some were flagged bad, each bad
# server is first probed with an echo request; servers that answer with
# :echo_res are moved back into @job_servers.
#
# @return [String] "host:port" of the selected server
# @raise [NoJobServersError] if @job_servers is still empty after probing
def get_job_server
  if @job_servers.empty? && !@bad_servers.empty?
    Util.logger.debug "GearmanRuby: No more good job servers, trying bad ones: #{@bad_servers.inspect}."
    # Walk a de-duplicated snapshot. Revived servers are deleted from
    # @bad_servers inside the loop, and deleting from an array while iterating
    # it makes `each` skip the following element. `uniq` also keeps a server
    # that was flagged several times from being probed and re-added twice.
    @bad_servers.uniq.each do |bad_server|
      Util.logger.debug "GearmanRuby: Trying server: #{bad_server.inspect}"
      sock = nil
      revived = false
      begin
        request = Util.pack_request("echo_req", "ping")
        sock = get_socket(bad_server)
        Util.send_request(sock, request)
        response = Util.read_response(sock, 20)
        if response[0] == :echo_res
          @job_servers << bad_server
          @bad_servers.delete bad_server
          revived = true
        end
      rescue NetworkError
        Util.logger.debug "GearmanRuby: Error trying server: #{bad_server.inspect}"
      ensure
        # Do not leak the probe connection: keep it for reuse when the server
        # answered, otherwise close it.
        if sock
          revived ? return_socket(sock) : close_socket(sock)
        end
      end
    end
  end

  Util.logger.debug "GearmanRuby: job servers: #{@job_servers.inspect}"
  raise NoJobServersError if @job_servers.empty?
  @server_counter += 1
  @job_servers[@server_counter % @job_servers.size]
end

#get_socket(hostport, num_retries = 3) ⇒ Object

Get a socket for a job server.

Parameters:

  • hostport

    job server “host:port”

Returns:

  • a Socket

Raises:

  • (NetworkError)

    if no connection could be opened after num_retries attempts



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/gearman/client.rb', line 90

# Get a socket for a job server, reusing a pooled one when available.
#
# @param hostport [String] job server "host:port"
# @param num_retries [Integer] how many connection attempts to make
# @return a socket taken from the pool, or a freshly opened TCPSocket
# @raise [NetworkError] if every connection attempt fails
def get_socket(hostport, num_retries=3)
  # If we already have an open socket to this host, return it.
  pooled = @sockets[hostport]
  if pooled
    sock = pooled.shift
    @sockets.delete(hostport) if pooled.empty?
    return sock
  end

  num_retries.times do
    begin
      sock = TCPSocket.new(*hostport.split(':'))
    rescue StandardError
      # Swallow connection errors so we can retry. StandardError (not
      # Exception) is rescued so that Interrupt, SystemExit and similar
      # process-level signals are not eaten by the retry loop.
      next
    end
    # No error: remember which server this socket belongs to and return it.
    @socket_to_hostport[sock] = hostport
    return sock
  end
  raise NetworkError, "Unable to connect to job server #{hostport}"
end

#option_request(opts) ⇒ Object

Set the options

Raises:

  • (ProtocolError)

    if the job server answers with an error



30
31
32
33
34
35
36
37
# File 'lib/gearman/client.rb', line 30

# Send an option request to the next job server.
#
# The socket the request was sent on is handed back to the pool afterwards
# instead of being dropped, so the connection is reused rather than leaked.
# NOTE(review): Gearman options are documented as per-connection, which is
# why the connection is kept -- confirm this matches the intended use.
# If sending or reading fails, the socket is closed and the error re-raised.
#
# @param opts option payload passed to Util.pack_request
# @return [nil]
# @raise [ProtocolError] if the server replies with :error
def option_request(opts)
  Util.logger.debug "GearmanRuby: Send options request with #{opts}"
  request = Util.pack_request("option_req", opts)
  sock = get_socket(get_job_server)
  begin
    Util.send_request(sock, request)
    response = Util.read_response(sock, 20)
  rescue StandardError
    # The connection is in an unknown state; do not pool it.
    close_socket(sock)
    raise
  end
  return_socket(sock)
  raise ProtocolError, response[1] if response[0] == :error
end

#return_socket(sock) ⇒ Object

Relinquish a socket created by Client#get_socket.

If we don’t know about the socket, we just close it.

Parameters:

  • sock

    Socket



118
119
120
121
122
123
124
125
126
127
# File 'lib/gearman/client.rb', line 118

# Give a socket obtained from get_socket back to the per-server pool.
#
# Sockets with no recorded "host:port" are logged and closed instead.
#
# @param sock socket to relinquish
# @return [Array, nil] the pool the socket was appended to, or nil when the
#   socket was unknown and has been closed
def return_socket(sock)
  hostport = get_hostport_for_socket(sock)
  if hostport
    pool = (@sockets[hostport] ||= [])
    return pool << sock
  end

  # Only the port and IP fields of the address are needed for the log line.
  _family, port, _hostname, ip = sock.addr
  Util.logger.error "GearmanRuby: Got socket for #{ip}:#{port}, which we don't know about -- closing"
  sock.close
  nil
end

#signal_bad_server(hostport) ⇒ Object



80
81
82
83
# File 'lib/gearman/client.rb', line 80

# Flag a job server as bad: drop it from @job_servers and record it in
# @bad_servers.
#
# A server already recorded as bad is not appended a second time, so repeated
# failure reports for the same server do not pile up duplicate entries.
#
# @param hostport [String] "host:port" of the failing server
# @return [Array] @bad_servers
def signal_bad_server(hostport)
  # reject builds a new array rather than mutating the existing one in place.
  @job_servers = @job_servers.reject { |server| server == hostport }
  @bad_servers << hostport unless @bad_servers.include?(hostport)
  @bad_servers
end