Class: Gearman::Client
- Inherits:
-
Object
- Object
- Gearman::Client
- Defined in:
- lib/gearman/client.rb
Overview
Client
Description
A client for communicating with Gearman job servers.
Instance Attribute Summary collapse
-
#bad_servers ⇒ Object
readonly
Returns the value of attribute bad_servers.
-
#job_servers ⇒ Object
Returns the value of attribute job_servers.
-
#task_create_timeout_sec ⇒ Object
Returns the value of attribute task_create_timeout_sec.
Instance Method Summary collapse
- #close_socket(sock) ⇒ Object
-
#do_task(*args) ⇒ Object
Perform a single task.
-
#get_hostport_for_socket(sock) ⇒ Object
Given a socket from Client#get_socket, return its host and port.
-
#get_job_server ⇒ "host:port"
Get connection info about an arbitrary (currently round-robin, but maybe we’ll do something smarter later) job server.
-
#get_socket(hostport, num_retries = 3) ⇒ Object
Get a socket for a job server.
-
#initialize(job_servers = nil) ⇒ Client
constructor
Create a new client.
-
#option_request(opts) ⇒ Object
Set the options.
-
#return_socket(sock) ⇒ Object
Relinquish a socket created by Client#get_socket.
- #signal_bad_server(hostport) ⇒ Object
Constructor Details
#initialize(job_servers = nil) ⇒ Client
Create a new client.
14 15 16 17 18 19 20 21 22 |
# File 'lib/gearman/client.rb', line 14
# Create a new client.
#
# @param job_servers [Object, nil] when truthy, handed to the job_servers=
#   writer (defined elsewhere in the class)
def initialize(job_servers=nil)
  # Known-good servers, as "host:port" strings.
  @job_servers = []
  if job_servers
    self.job_servers = job_servers
  end

  # Idle sockets per server: "host:port" -> [sock1, sock2, ...]
  @sockets = {}
  # Reverse lookup: sock -> "host:port"
  @socket_to_hostport = {}

  @task_create_timeout_sec = 10
  # Starts at -1; get_job_server increments it before each use.
  @server_counter = -1
  @bad_servers = []
end
Instance Attribute Details
#bad_servers ⇒ Object (readonly)
Returns the value of attribute bad_servers.
23 24 25 |
# File 'lib/gearman/client.rb', line 23
# Read-only accessor.
#
# @return [Object] the current value of @bad_servers
def bad_servers
  @bad_servers
end
#job_servers ⇒ Object
Returns the value of attribute job_servers.
23 24 25 |
# File 'lib/gearman/client.rb', line 23
# Accessor for the configured job servers.
#
# @return [Object] the current value of @job_servers
def job_servers
  @job_servers
end
#task_create_timeout_sec ⇒ Object
Returns the value of attribute task_create_timeout_sec.
24 25 26 |
# File 'lib/gearman/client.rb', line 24
# Accessor for the task-creation timeout.
#
# @return [Object] the current value of @task_create_timeout_sec
def task_create_timeout_sec
  @task_create_timeout_sec
end
Instance Method Details
#close_socket(sock) ⇒ Object
129 130 131 132 133 |
# File 'lib/gearman/client.rb', line 129
# Close a socket and drop its sock -> "host:port" bookkeeping entry.
#
# @param sock [#close] the socket to close
# @return [nil]
def close_socket(sock)
  sock.close
  # Forget which server this socket belonged to.
  @socket_to_hostport.delete(sock)

  nil
end
#do_task(*args) ⇒ Object
Perform a single task.
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/gearman/client.rb', line 149
# Perform a single task and wait for it to finish.
#
# @param args arguments forwarded to Util::get_task_from_args to build the task
# @return [Object, nil] the value passed to the task's on_complete callback,
#   or nil if the task's on_fail callback fired
# @raise [JobQueueError] if the task could not be added to the task set
def do_task(*args)
  task = Util::get_task_from_args(*args)

  outcome = nil
  failed = false
  task.on_complete { |value| outcome = value }
  task.on_fail { failed = true }

  taskset = TaskSet.new(self)
  raise JobQueueError, "Unable to enqueue job." unless taskset.add_task(task)
  taskset.wait

  return nil if failed
  outcome
end
#get_hostport_for_socket(sock) ⇒ Object
Given a socket from Client#get_socket, return its host and port.
140 141 142 |
# File 'lib/gearman/client.rb', line 140
# Look up which server a socket was opened against.
#
# @param sock [Object] a socket previously registered in @socket_to_hostport
# @return [String, nil] the recorded "host:port", or nil for an unknown socket
def get_hostport_for_socket(sock)
  @socket_to_hostport[sock]
end
#get_job_server ⇒ "host:port"
Get connection info about an arbitrary (currently round-robin, but maybe we’ll do something smarter later) job server.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/gearman/client.rb', line 53
# Pick the job server to use next.
#
# Servers are handed out round-robin over @job_servers. When no good server
# is left but some have been marked bad, every bad server is first probed
# with an "echo_req"; those that reply with :echo_res are moved back into
# @job_servers.
#
# @return [String] "host:port" of the selected job server
# @raise [NoJobServersError] if no usable job server remains
def get_job_server
  if @job_servers.empty? && !@bad_servers.empty?
    Util.logger.debug "GearmanRuby: No more good job servers, trying bad ones: #{@bad_servers.inspect}."

    # Probe a de-duplicated snapshot. Recovered servers are removed from
    # @bad_servers inside the loop, and deleting from an array while iterating
    # it with #each skips the element that follows every deletion.
    @bad_servers.uniq.each do |bad_server|
      Util.logger.debug "GearmanRuby: Trying server: #{bad_server.inspect}"
      sock = nil
      begin
        request = Util.pack_request("echo_req", "ping")
        sock = get_socket(bad_server)
        Util.send_request(sock, request)
        response = Util.read_response(sock, 20)
        if response[0] == :echo_res
          @job_servers << bad_server
          @bad_servers.delete bad_server
          # Healthy connection: hand it back to the pool so it gets reused.
          return_socket(sock)
          sock = nil
        end
      rescue NetworkError
        Util.logger.debug "GearmanRuby: Error trying server: #{bad_server.inspect}"
      ensure
        # Probe failed or got an unexpected reply: release the socket
        # instead of leaking it.
        close_socket(sock) if sock
      end
    end
  end

  Util.logger.debug "GearmanRuby: job servers: #{@job_servers.inspect}"
  raise NoJobServersError if @job_servers.empty?

  # Round-robin: advance the counter and wrap around the list of good servers.
  @server_counter += 1
  @job_servers[@server_counter % @job_servers.size]
end
#get_socket(hostport, num_retries = 3) ⇒ Object
Get a socket for a job server.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/gearman/client.rb', line 90
# Get a socket for a job server, reusing an idle pooled one when available.
#
# @param hostport [String] job server address in "host:port" form
# @param num_retries [Integer] how many connection attempts to make
# @return [Object] a pooled socket, or a freshly opened TCPSocket
# @raise [NetworkError] if every connection attempt fails
def get_socket(hostport, num_retries=3)
  # If we already have an open socket to this host, return it.
  pool = @sockets[hostport]
  if pool
    sock = pool.shift
    @sockets.delete(hostport) if pool.empty?
    # An empty pool entry yields nil; fall through and connect instead.
    return sock if sock
  end

  num_retries.times do
    begin
      sock = TCPSocket.new(*hostport.split(':'))
    rescue StandardError
      # Swallow connection errors so we can retry. Deliberately NOT
      # `rescue Exception`: Interrupt, SystemExit and friends must propagate.
      next
    end
    # No error, stash socket mapping and return it.
    @socket_to_hostport[sock] = hostport
    return sock
  end

  raise NetworkError, "Unable to connect to job server #{hostport}"
end
#option_request(opts) ⇒ Object
Set the options.
30 31 32 33 34 35 36 37 |
# File 'lib/gearman/client.rb', line 30
# Send an "option_req" packet carrying +opts+ to a job server and check the reply.
#
# @param opts [Object] payload passed to Util.pack_request for the option request
# @return [nil]
# @raise [ProtocolError] if the server answers with an :error response
def option_request(opts)
  Util.logger.debug "GearmanRuby: Send options request with #{opts}"
  packet = Util.pack_request("option_req", opts)

  hostport = get_job_server
  connection = get_socket(hostport)
  # NOTE(review): the socket is neither returned to the pool nor closed after
  # this exchange -- confirm whether that is intentional.
  Util.send_request(connection, packet)

  reply = Util.read_response(connection, 20)
  raise ProtocolError, reply[1] if reply[0] == :error
end
#return_socket(sock) ⇒ Object
Relinquish a socket created by Client#get_socket.
If we don’t know about the socket, we just close it.
118 119 120 121 122 123 124 125 126 127 |
# File 'lib/gearman/client.rb', line 118
# Relinquish a socket created by Client#get_socket.
#
# A socket we have a "host:port" record for is parked in the idle pool for
# that server; a socket we don't know about is logged and closed.
#
# @param sock [Object] the socket being handed back
# @return [Array, nil] the idle pool the socket was added to, or nil if the
#   socket was unknown and has been closed
def return_socket(sock)
  hostport = get_hostport_for_socket(sock)

  if hostport
    (@sockets[hostport] ||= []) << sock
  else
    # Only the port and IP fields of sock.addr are needed for the log line.
    _family, port, _host, ip = sock.addr
    Util.logger.error "GearmanRuby: Got socket for #{ip}:#{port}, which we don't know about -- closing"
    sock.close
    nil
  end
end
#signal_bad_server(hostport) ⇒ Object
80 81 82 83 |
# File 'lib/gearman/client.rb', line 80
# Mark a job server as bad: remove it from the good list and remember it in
# @bad_servers.
#
# Signalling the same server more than once records it only once, so the bad
# list never holds duplicates.
#
# @param hostport [String] "host:port" of the failing server
# @return [Array] the updated @bad_servers list
def signal_bad_server(hostport)
  # Build a new array rather than mutating the existing one in place.
  @job_servers = @job_servers.reject { |s| s == hostport }
  @bad_servers << hostport unless @bad_servers.include?(hostport)
  @bad_servers
end