Class: ASIR::Transport::Beanstalk
- Inherits:
-
TcpSocket
- Object
- TcpSocket
- ASIR::Transport::Beanstalk
- Defined in:
- lib/asir/transport/beanstalk.rb
Overview
!SLIDE Beanstalk Transport
Constant Summary collapse
- RESERVE =
"reserve\r\n".freeze
- NOT_FOUND =
"NOT_FOUND\r\n".freeze
- LINE_TERMINATOR =
"\r\n".freeze
Instance Attribute Summary collapse
-
#delay ⇒ Object
Returns the value of attribute delay.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#ttr ⇒ Object
Returns the value of attribute ttr.
-
#tube ⇒ Object
Returns the value of attribute tube.
-
#tube_default ⇒ Object
Returns the value of attribute tube_default.
Instance Method Summary collapse
- #_after_connect!(stream) ⇒ Object
-
#_after_invoke_message(state) ⇒ Object
!SLIDE Sends the encoded Result payload String.
-
#_beanstalk(stream, message, expect, payload = nil) ⇒ Object
Send "something ...\r\n".
- #_beanstalk_return_data(message, expect, data_size_field = nil, payload = nil) ⇒ Object
- #_beanstalk_stats_yaml!(message, expect = nil) ⇒ Object
-
#_receive_message(state) ⇒ Object
!SLIDE Receives the encoded Message payload String.
-
#_receive_result(state) ⇒ Object
!SLIDE Receives the encoded Result payload String.
-
#_send_message(state) ⇒ Object
!SLIDE Sends the encoded Message payload String.
-
#_server! ⇒ Object
!SLIDE Beanstalk Server.
- #_server_accept_connection!(server) ⇒ Object
- #_server_close_connection!(in_stream, out_stream) ⇒ Object
- #_start_conduit! ⇒ Object
-
#conduit_status ⇒ Object
!SLIDE Beanstalk protocol support.
-
#initialize(*args) ⇒ Beanstalk
constructor
A new instance of Beanstalk.
- #path_default ⇒ Object
-
#relative_message_delay!(message, now = nil) ⇒ Object
!SLIDE Sets beanstalk_delay if message.delay was specified.
- #stats ⇒ Object
- #stats_job(job_id) ⇒ Object
- #stats_tube(tube = nil) ⇒ Object
- #stream_eof?(stream) ⇒ Boolean
Constructor Details
#initialize(*args) ⇒ Beanstalk
Returns a new instance of Beanstalk.
12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/asir/transport/beanstalk.rb', line 12 def initialize *args @one_way = true self.scheme_default ||= 'beanstalk' self.host_default ||= '127.0.0.1' self.port_default ||= 11300 self.tube_default ||= 'asir' @priority ||= 0 @delay ||= 0 @ttr ||= 600 super end |
Instance Attribute Details
#delay ⇒ Object
Returns the value of attribute delay.
10 11 12 |
# File 'lib/asir/transport/beanstalk.rb', line 10 def delay @delay end |
#priority ⇒ Object
Returns the value of attribute priority.
10 11 12 |
# File 'lib/asir/transport/beanstalk.rb', line 10 def priority @priority end |
#ttr ⇒ Object
Returns the value of attribute ttr.
10 11 12 |
# File 'lib/asir/transport/beanstalk.rb', line 10 def ttr @ttr end |
#tube ⇒ Object
Returns the value of attribute tube.
9 10 11 |
# File 'lib/asir/transport/beanstalk.rb', line 9 def tube @tube end |
#tube_default ⇒ Object
Returns the value of attribute tube_default.
9 10 11 |
# File 'lib/asir/transport/beanstalk.rb', line 9 def tube_default @tube_default end |
Instance Method Details
#_after_connect!(stream) ⇒ Object
197 198 199 200 201 202 203 |
# File 'lib/asir/transport/beanstalk.rb', line 197 def _after_connect! stream if t = tube _beanstalk(stream, "use #{t}\r\n", /\AUSING #{t}\r\n\Z/) end end |
#_after_invoke_message(state) ⇒ Object
!SLIDE Sends the encoded Result payload String.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/asir/transport/beanstalk.rb', line 88 def _after_invoke_message state # # There is a possibility here the following could happen: # # _receive_message # channel == #<Channel:1> # channel.stream == #<TCPSocket:1234> # end # ... # ERROR OCCURS: # channel.stream.close # channel.stream = nil # ... # _send_result # channel == #<Channel:1> # channel.stream == #<TCPSocket:5678> # NEW CONNECTION # stream.write "delete #{job_id}" # ... # # Therefore: _receive_message passes the original message stream to us. stream = state.result_opaque job_id = state.message[:beanstalk_job_id] or raise "no beanstalk_job_id" _beanstalk(stream, "delete #{job_id}\r\n", /\ADELETED\r\n\Z/) # state.in_stream.close # Force close. end |
#_beanstalk(stream, message, expect, payload = nil) ⇒ Object
Send "something ...\r\n". Expect /\ASOMETHING (\d+)...\r\n/.
182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/asir/transport/beanstalk.rb', line 182 def _beanstalk stream, message, expect, payload = nil _log { [ :_beanstalk, :message, message ] } if @verbose >= 3 stream.write message if payload stream.write payload stream.write LINE_TERMINATOR end if match = _read_line_and_expect!(stream, expect) # , /\A(BAD_FORMAT|UNKNOWN_COMMAND)\r\n\Z/) _log { [ :_beanstalk, :result, match[0] ] } if @verbose >= 3 end match end |
#_beanstalk_return_data(message, expect, data_size_field = nil, payload = nil) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/asir/transport/beanstalk.rb', line 167 def _beanstalk_return_data message, expect, data_size_field = nil, payload = nil (@server || stream).with_stream! do | stream | if match = _beanstalk(stream, message, expect, payload) size = match[data_size_field ||= 1].to_i data = stream.read(size) _read_line_and_expect! stream, /\A\r\n\Z/ data else raise "unexpected result" end end end |
#_beanstalk_stats_yaml!(message, expect = nil) ⇒ Object
157 158 159 160 161 162 163 164 |
# File 'lib/asir/transport/beanstalk.rb', line 157 def _beanstalk_stats_yaml! message, expect = nil expect ||= /\AOK (\d+)\r\n\Z/ x = _beanstalk_return_data message, expect x && ::YAML.load(x) rescue ASIR::Transport::PayloadIO::UnexpectedResponse => exc return :NOT_FOUND if exc.received == NOT_FOUND raise exc end |
#_receive_message(state) ⇒ Object
!SLIDE Receives the encoded Message payload String.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/asir/transport/beanstalk.rb', line 62 def _receive_message state additional_data = state.additional_data ||= { } state.in_stream.with_stream! do | stream | begin match = with_force_stop! do _beanstalk(stream, RESERVE, /\ARESERVED (\d+) (\d+)\r\n\Z/) end additional_data[:beanstalk_job_id] = match[1].to_i additional_data[:beanstalk_message_size] = size = match[2].to_i state.message_payload = stream.read(size) _read_line_and_expect! stream, /\A\r\n\Z/ state.result_opaque = stream rescue ::Exception => exc _log { [ :_receive_message, :exception, exc ] } additional_data[:beanstalk_error] = exc state.in_stream.close raise exc end end end |
#_receive_result(state) ⇒ Object
!SLIDE Receives the encoded Result payload String.
118 119 120 |
# File 'lib/asir/transport/beanstalk.rb', line 118 def _receive_result state nil end |
#_send_message(state) ⇒ Object
!SLIDE Sends the encoded Message payload String.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/asir/transport/beanstalk.rb', line 39 def _send_message state stream.with_stream! do | s | message = state.message begin match = _beanstalk(s, "put #{message[:beanstalk_priority] || @priority} #{message[:beanstalk_delay] || @delay} #{message[:beanstalk_ttr] || @ttr} #{state.message_payload.size}\r\n", /\AINSERTED (\d+)\r\n\Z/, state.message_payload) job_id = message[:beanstalk_job_id] = match[1].to_i _log { "beanstalk_job_id = #{job_id.inspect}" } if @verbose >= 2 rescue ::Exception => exc message[:beanstalk_error] = exc close raise exc end end end |
#_server! ⇒ Object
!SLIDE Beanstalk Server
207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/asir/transport/beanstalk.rb', line 207 def _server! _log { "_server! #{uri}" } if @verbose >= 1 @server = connect!(:try_max => nil, :try_sleep => 1, :try_sleep_increment => 0.1, :try_sleep_max => 10) do | stream | if t = tube _beanstalk(stream, "watch #{t}\r\n", /\AWATCHING (\d+)\r\n\Z/) end end self end |
#_server_accept_connection!(server) ⇒ Object
222 223 224 225 |
# File 'lib/asir/transport/beanstalk.rb', line 222 def _server_accept_connection! server prepare_server! unless @server [ @server, @server ] end |
#_server_close_connection!(in_stream, out_stream) ⇒ Object
227 228 229 |
# File 'lib/asir/transport/beanstalk.rb', line 227 def _server_close_connection! in_stream, out_stream # NOTHING end |
#_start_conduit! ⇒ Object
238 239 240 241 242 243 244 245 246 |
# File 'lib/asir/transport/beanstalk.rb', line 238 def _start_conduit! host = conduit_host opt = host ? "-l #{host} " : "" cmd = "beanstalkd #{opt}-p #{port} -z #{1 * 1024 * 1024} #{@conduit_options[:beanstalkd_options]}" if @conduit_options[:verbose] $stderr.puts " #{cmd}" rescue nil end exec(cmd) end |
#conduit_status ⇒ Object
!SLIDE Beanstalk protocol support
134 135 136 137 138 139 140 141 142 |
# File 'lib/asir/transport/beanstalk.rb', line 134 def conduit_status t0 = Time.now x = { :stats => stats, :stats_tube => tube && stats_tube, } x[:response_time] = Time.now - t0 { :beanstalkd => x } end |
#path_default ⇒ Object
33 34 35 |
# File 'lib/asir/transport/beanstalk.rb', line 33 def path_default "/#{tube}" end |
#relative_message_delay!(message, now = nil) ⇒ Object
!SLIDE Sets beanstalk_delay if message.delay was specified.
124 125 126 127 128 129 |
# File 'lib/asir/transport/beanstalk.rb', line 124 def relative_message_delay! message, now = nil if delay = super message[:beanstalk_delay] = delay.to_i end delay end |
#stats ⇒ Object
144 145 146 |
# File 'lib/asir/transport/beanstalk.rb', line 144 def stats _beanstalk_stats_yaml! "stats\r\n" end |
#stats_job(job_id) ⇒ Object
148 149 150 |
# File 'lib/asir/transport/beanstalk.rb', line 148 def stats_job job_id _beanstalk_stats_yaml! "stats-job #{job_id}\r\n" end |
#stats_tube(tube = nil) ⇒ Object
152 153 154 155 |
# File 'lib/asir/transport/beanstalk.rb', line 152 def stats_tube tube = nil tube ||= self.tube _beanstalk_stats_yaml! "stats-tube #{tube}\r\n" end |
#stream_eof?(stream) ⇒ Boolean
231 232 233 234 235 236 |
# File 'lib/asir/transport/beanstalk.rb', line 231 def stream_eof? stream # Note: stream.eof? on a beanstalkd connection, # will cause blocking read *forever* because # beanstalk connections are long lived. false end |