Class: Gearman::Util
- Inherits:
- Object
- Defined in:
- lib/gearman/util.rb
Overview
Util
Description
Static helper methods and data used by other classes.
Constant Summary
- COMMANDS =
Map from Integer representations of commands used in the network protocol to more-convenient symbols.
{
  1  => :can_do,               # W->J: FUNC
  2  => :cant_do,              # W->J: FUNC
  3  => :reset_abilities,      # W->J: --
  4  => :pre_sleep,            # W->J: --
  #5 => (unused),              # - -
  6  => :noop,                 # J->W: --
  7  => :submit_job,           # C->J: FUNC[0]UNIQ[0]ARGS
  8  => :job_created,          # J->C: HANDLE
  9  => :grab_job,             # W->J: --
  10 => :no_job,               # J->W: --
  11 => :job_assign,           # J->W: HANDLE[0]FUNC[0]ARG
  12 => :work_status,          # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
  13 => :work_complete,        # W->J/C: HANDLE[0]RES
  14 => :work_fail,            # W->J/C: HANDLE
  15 => :get_status,           # C->J: HANDLE
  16 => :echo_req,             # ?->J: TEXT
  17 => :echo_res,             # J->?: TEXT
  18 => :submit_job_bg,        # C->J: FUNC[0]UNIQ[0]ARGS
  19 => :error,                # J->?: ERRCODE[0]ERR_TEXT
  20 => :status_res,           # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
  21 => :submit_job_high,      # C->J: FUNC[0]UNIQ[0]ARGS
  22 => :set_client_id,        # W->J: [RANDOM_STRING_NO_WHITESPACE]
  23 => :can_do_timeout,       # W->J: FUNC[0]TIMEOUT
  24 => :all_yours,            # REQ Worker
  25 => :work_exception,       # W->J: HANDLE[0]ARG
  26 => :option_req,           # C->J: TEXT
  27 => :option_res,           # J->C: TEXT
  28 => :work_data,            # REQ Worker
  29 => :work_warning,         # W->J/C: HANDLE[0]MSG
  30 => :grab_job_uniq,        # REQ Worker
  31 => :job_assign_uniq,      # RES Worker
  32 => :submit_job_high_bg,   # C->J: FUNC[0]UNIQ[0]ARGS
  33 => :submit_job_low,       # C->J: FUNC[0]UNIQ[0]ARGS
  34 => :submit_job_low_bg,    # C->J: FUNC[0]UNIQ[0]ARGS
  35 => :submit_job_sched,     # REQ Client
  36 => :submit_job_epoch      # C->J: FUNC[0]UNIQ[0]EPOCH[0]ARGS
}
- NUMS =
Inverse of COMMANDS: map from command symbols to their Integer codes, e.g. :can_do => 1.
COMMANDS.invert
- DEFAULT_PORT =
Default job server port.
4730
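The relationship between COMMANDS and NUMS can be sketched with a three-entry excerpt of the table. The constant names COMMANDS_EXCERPT and NUMS_EXCERPT are illustrative only and are not part of the library.

```ruby
# Excerpt of the command table; both constant names are hypothetical.
COMMANDS_EXCERPT = {
  1  => :can_do,
  7  => :submit_job,
  13 => :work_complete
}.freeze

# Hash#invert swaps keys and values, giving a symbol => Integer lookup.
NUMS_EXCERPT = COMMANDS_EXCERPT.invert

p NUMS_EXCERPT[:submit_job]   # numeric code for a command symbol
p COMMANDS_EXCERPT[13]        # command symbol for a numeric code
```

Keeping one hash as the source of truth and deriving the other with invert means the two lookups cannot drift apart.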
Class Method Summary
- .ability_name_with_prefix(prefix, name) ⇒ Object (also: ability_name_for_perl)
- .get_task_from_args(*args) ⇒ Object
  Return a Task based on the passed-in arguments.
- .handle_to_str(hostport, handle) ⇒ "host:port//handle"
  Convert job server info and a handle into a string.
- .logger ⇒ Object
- .logger=(logger) ⇒ Object
- .normalize_job_servers(servers) ⇒ Object
  Add default ports to a job server or list of servers.
- .pack_request(type_name, arg = '') ⇒ Object
  Construct a request packet.
- .read_response(sock, timeout = nil) ⇒ Object
  Read a response packet from a socket.
- .send_request(sock, req) ⇒ Object
  Send a request packet over a socket.
- .str_to_handle(str) ⇒ hostport, handle
  Reverse Util.handle_to_str.
- .timed_recv(sock, len, timeout = nil) ⇒ Object
  Read from a socket, giving up if it doesn’t finish quickly enough.
- .with_safe_socket_op ⇒ Object
Class Method Details
.ability_name_with_prefix(prefix, name) ⇒ Object Also known as: ability_name_for_perl
# File 'lib/gearman/util.rb', line 201
def Util.ability_name_with_prefix(prefix,name)
  "#{prefix}\t#{name}"
end
.get_task_from_args(*args) ⇒ Object
Return a Task based on the passed-in arguments.
# File 'lib/gearman/util.rb', line 95
def Util.get_task_from_args(*args)
  if (args[0].class == Task || args[0].class.superclass == Task)
    return args[0]
  elsif args.size <= 3
    return Task.new(*args)
  else
    raise InvalidArgsError, 'Incorrect number of args to get_task_from_args'
  end
end
.handle_to_str(hostport, handle) ⇒ "host:port//handle"
Convert job server info and a handle into a string.
# File 'lib/gearman/util.rb', line 179
def Util.handle_to_str(hostport, handle)
  "#{hostport}//#{handle}"
end
.logger ⇒ Object
# File 'lib/gearman/util.rb', line 67
def Util.logger
  @logger ||= begin
    l = Logger.new($stdout)
    l.level = Logger::FATAL
    l
  end
end
.logger=(logger) ⇒ Object
# File 'lib/gearman/util.rb', line 63
def Util.logger=(logger)
  @logger = logger
end
.normalize_job_servers(servers) ⇒ Object
Add default ports to a job server or list of servers.
# File 'lib/gearman/util.rb', line 166
def Util.normalize_job_servers(servers)
  if servers.class == String or servers.class == Symbol
    servers = [ servers.to_s ]
  end
  servers.map {|s| s =~ /:/ ? s : "#{s}:#{DEFAULT_PORT}" }
end
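A minimal standalone sketch of the normalization behaviour: the method body from the listing is copied into a hypothetical module named NormalizeSketch so that it runs without the gem installed. The host names are made up for illustration.

```ruby
# Hypothetical stand-in module so the example runs without the gearman gem.
module NormalizeSketch
  DEFAULT_PORT = 4730

  # Same logic as the listing: wrap a lone String or Symbol in an Array,
  # then append the default port to every entry that contains no colon.
  def self.normalize_job_servers(servers)
    if servers.class == String or servers.class == Symbol
      servers = [servers.to_s]
    end
    servers.map { |s| s =~ /:/ ? s : "#{s}:#{DEFAULT_PORT}" }
  end
end

p NormalizeSketch.normalize_job_servers('localhost')
p NormalizeSketch.normalize_job_servers(['jobs1.example:7003', 'jobs2.example'])
```

Entries that already carry a port pass through unchanged, so the result is always an Array of "host:port" strings.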
.pack_request(type_name, arg = '') ⇒ Object
Construct a request packet.
# File 'lib/gearman/util.rb', line 82
def Util.pack_request(type_name, arg='')
  type_num = NUMS[type_name.to_sym]
  raise InvalidArgsError, "Invalid type name '#{type_name}'" unless type_num
  arg = '' if not arg
  "\0REQ" + [type_num, arg.size].pack('NN') + arg
end
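The byte layout that pack_request produces can be inspected by building a packet by hand and unpacking it again. This is only a sketch: the payload is arbitrary, and type 16 is taken from the COMMANDS table, where it maps to :echo_req.

```ruby
# Layout of a request: 4-byte magic, 4-byte type, 4-byte length, payload.
payload = 'ping'
packet  = "\0REQ" + [16, payload.size].pack('NN') + payload

# 'a4' reads the 4 magic bytes, each 'N' reads a 32-bit big-endian
# unsigned integer, and 'a*' reads whatever bytes remain.
magic, type, len, body = packet.unpack('a4NNa*')

p magic, type, len, body
p packet.bytesize   # 12 header bytes plus the payload
```

Note that String#size counts characters, not bytes, so for a payload containing multi-byte characters it differs from String#bytesize.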
.read_response(sock, timeout = nil) ⇒ Object
Read a response packet from a socket.
# File 'lib/gearman/util.rb', line 137
def Util.read_response(sock, timeout=nil)
  end_time = Time.now.to_f + timeout if timeout
  head = timed_recv(sock, 12, timeout)
  magic, type, len = head.unpack('a4NN')
  raise ProtocolError, "Invalid magic '#{magic}'" unless magic == "\0RES"
  buf = len > 0 ?
    timed_recv(sock, len, timeout ? end_time - Time.now.to_f : nil) : ''
  type = COMMANDS[type]
  raise ProtocolError, "Invalid packet type #{type}" unless type
  [type, buf]
end
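The decoding steps in read_response can be tried without a job server by feeding a hand-built packet through a StringIO. In this sketch, parse_response and COMMAND_NAMES are hypothetical names, ArgumentError stands in for the library's ProtocolError, and plain blocking reads replace timed_recv, so no timeout handling is shown.

```ruby
require 'stringio'

# Hypothetical three-entry excerpt of the COMMANDS table.
COMMAND_NAMES = { 8 => :job_created, 13 => :work_complete, 17 => :echo_res }.freeze

# Hypothetical helper: decode one response packet from an IO-like object.
def parse_response(io)
  head = io.read(12)
  raise ArgumentError, 'short header' unless head && head.bytesize == 12
  magic, type, len = head.unpack('a4NN')
  raise ArgumentError, "Invalid magic '#{magic}'" unless magic == "\0RES"
  body = len > 0 ? io.read(len) : ''
  # Keep the numeric type in its own variable so the error can report it.
  name = COMMAND_NAMES[type]
  raise ArgumentError, "Invalid packet type #{type}" unless name
  [name, body]
end

handle = 'H:example:1'   # made-up job handle
raw = "\0RES" + [8, handle.bytesize].pack('NN') + handle
p parse_response(StringIO.new(raw))
```

The header is always 12 bytes, so the payload length is known before any of the body is read.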
.send_request(sock, req) ⇒ Object
Send a request packet over a socket.
# File 'lib/gearman/util.rb', line 154
def Util.send_request(sock, req)
  len = with_safe_socket_op{ sock.write(req) }
  if len != req.size
    raise NetworkError, "Wrote #{len} instead of #{req.size}"
  end
end
.str_to_handle(str) ⇒ hostport, handle
Reverse Util.handle_to_str.
# File 'lib/gearman/util.rb', line 188
def Util.str_to_handle(str)
  str =~ %r{^([^:]+:\d+)//(.+)}
  return [$1, $2]
end
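A round trip through handle_to_str and str_to_handle can be sketched as follows. HandleSketch is a hypothetical stand-in module that mirrors the two listings so the example runs without the gem, and the handle value is made up.

```ruby
# Hypothetical stand-in module mirroring the two listings.
module HandleSketch
  def self.handle_to_str(hostport, handle)
    "#{hostport}//#{handle}"
  end

  def self.str_to_handle(str)
    # Capture "host:port" before the double slash and the handle after it.
    str =~ %r{^([^:]+:\d+)//(.+)}
    [$1, $2]
  end
end

s = HandleSketch.handle_to_str('localhost:4730', 'H:example:42')
p s
p HandleSketch.str_to_handle(s)

# A string without a "host:port" prefix does not match, so both
# capture groups come back as nil.
p HandleSketch.str_to_handle('no-port//H:example:42')
```

Because a failed match yields two nils and not an exception, callers may want to check the result before using it.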
.timed_recv(sock, len, timeout = nil) ⇒ Object
Read from a socket, giving up if it doesn’t finish quickly enough. A NetworkError is raised if not all of the requested bytes are read in time.
# File 'lib/gearman/util.rb', line 113
def Util.timed_recv(sock, len, timeout=nil)
  data = ''
  end_time = Time.now.to_f + timeout if timeout
  while data.size < len and (not timeout or Time.now.to_f < end_time) do
    IO::select([sock], nil, nil, timeout ? end_time - Time.now.to_f : nil) \
      or break
    begin
      data += sock.readpartial(len - data.size)
    rescue
      raise NetworkError, "Unable to read data from socket."
    end
  end
  if data.size < len
    raise NetworkError, "Read #{data.size} byte(s) instead of #{len}"
  end
  data
end
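The deadline-based read loop can be exercised with IO.pipe instead of a network socket. This is a sketch under stated assumptions: read_exactly and SketchNetworkError are hypothetical names standing in for timed_recv and NetworkError, and unlike the listing it does not rescue errors raised by readpartial.

```ruby
# Hypothetical error class standing in for Gearman's NetworkError.
class SketchNetworkError < StandardError; end

# Keep calling IO.select with the time remaining until len bytes have
# arrived or the deadline has passed.
def read_exactly(io, len, timeout = nil)
  data = ''.b
  deadline = Time.now.to_f + timeout if timeout
  while data.bytesize < len
    remaining = deadline ? deadline - Time.now.to_f : nil
    break if remaining && remaining <= 0
    # IO.select returns nil when the timeout expires with nothing readable.
    IO.select([io], nil, nil, remaining) or break
    data << io.readpartial(len - data.bytesize)
  end
  if data.bytesize < len
    raise SketchNetworkError, "Read #{data.bytesize} byte(s) instead of #{len}"
  end
  data
end

reader, writer = IO.pipe
writer.write('abc')
p read_exactly(reader, 3, 0.5)   # all 3 bytes are already available

writer.write('de')               # only 2 of the next 5 bytes ever arrive
begin
  read_exactly(reader, 5, 0.2)
rescue SketchNetworkError => e
  puts e.message
end
```

Recomputing the remaining time on every pass keeps the total wait bounded by the original timeout, even when the data arrives in several small chunks.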
.with_safe_socket_op ⇒ Object
# File 'lib/gearman/util.rb', line 193
def self.with_safe_socket_op
  begin
    yield
  rescue Exception => ex
    raise ServerDownException.new(ex.)
  end
end