Class: ASIR::Transport::Beanstalk
- Inherits:
-
TcpSocket
- Object
- ASIR::Transport
- Stream
- ConnectionOriented
- TcpSocket
- ASIR::Transport::Beanstalk
- Defined in:
- lib/asir/transport/beanstalk.rb
Overview
!SLIDE Beanstalk Transport
Constant Summary collapse
- LINE_TERMINATOR =
"\r\n".freeze
- RESERVE =
"reserve\r\n".freeze
Constants included from PayloadIO
PayloadIO::FOOTER, PayloadIO::HEADER
Constants included from UriConfig
UriConfig::S_LOCALHOST, UriConfig::S_TCP
Constants included from ASIR::ThreadVariable
ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER
Instance Attribute Summary collapse
-
#delay ⇒ Object
Returns the value of attribute delay.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#ttr ⇒ Object
Returns the value of attribute ttr.
-
#tube ⇒ Object
Returns the value of attribute tube.
Attributes included from UriConfig
#host, #host_default, #path, #path_default, #port, #port_default, #scheme, #scheme_default, #uri
Attributes inherited from ASIR::Transport
#after_receive_message, #before_send_message, #decoder, #encoder, #invoker, #message_count, #needs_message_identifier, #needs_message_timestamp, #on_exception, #on_result_exception, #one_way, #running, #verbose
Attributes included from Log
Instance Method Summary collapse
- #_after_connect!(stream) ⇒ Object
-
#_beanstalk(stream, message, expect, payload = nil) ⇒ Object
Send "something ...\r\n".
-
#_receive_message(channel, additional_data) ⇒ Object
!SLIDE Receives the encoded Message payload String.
-
#_receive_result(message, opaque_result) ⇒ Object
!SLIDE Receives the encoded Result payload String.
-
#_send_message(message, message_payload) ⇒ Object
!SLIDE Sends the encoded Message payload String.
-
#_send_result(message, result, result_payload, channel, stream) ⇒ Object
!SLIDE Sends the encoded Result payload String.
-
#_server! ⇒ Object
!SLIDE Beanstalk Server.
- #_server_accept_connection!(server) ⇒ Object
- #_server_close_connection!(in_stream, out_stream) ⇒ Object
- #_start_conduit! ⇒ Object
-
#initialize(*args) ⇒ Beanstalk
constructor
A new instance of Beanstalk.
-
#relative_message_delay!(message, now = nil) ⇒ Object
!SLIDE Sets beanstalk_delay if message.delay was specified.
- #stream_eof?(stream) ⇒ Boolean
Methods inherited from TcpSocket
Methods inherited from ConnectionOriented
#_before_close!, #_connect!, #_server_close!, #connect!, #prepare_server!, #run_server!, #serve_connection!, #server_on_start!, #server_on_stop!, #stream
Methods included from PayloadIO
#_read, #_read_line_and_expect!, #_write, #close
Methods included from UriConfig
Methods inherited from Stream
#_serve_stream!, #serve_stream!, #serve_stream_message!
Methods inherited from ASIR::Transport
#_subclass_responsibility, #invoke_message!, #needs_message_identifier?, #needs_message_timestamp?, #receive_message, #receive_result, #send_message, #send_result, #serve_message!, #stop!, #with_server_signals!
Methods included from Log
#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included
Methods included from AdditionalData
#[], #[]=, #additional_data, #additional_data!, #additional_data=, included
Methods included from Message::Delay
Methods included from ASIR::ThreadVariable
Constructor Details
#initialize(*args) ⇒ Beanstalk
Returns a new instance of Beanstalk.
12 13 14 15 16 17 18 19 |
# File 'lib/asir/transport/beanstalk.rb', line 12
# Creates a Beanstalk transport.
#
# Each of @port, @tube, @priority, @delay and @ttr that is still unset gets
# its default value here; the original arguments are then passed unchanged to
# the superclass initializer.
def initialize(*args)
  @port     ||= 11300
  @tube     ||= 'asir'
  @priority ||= 0
  @delay    ||= 0
  @ttr      ||= 600
  super
end
Instance Attribute Details
#delay ⇒ Object
Returns the value of attribute delay.
10 11 12 |
# File 'lib/asir/transport/beanstalk.rb', line 10
# Attribute reader: returns the current value of @delay.
def delay
  @delay
end
#priority ⇒ Object
Returns the value of attribute priority.
10 11 12 |
# File 'lib/asir/transport/beanstalk.rb', line 10
# Attribute reader: returns the current value of @priority.
def priority
  @priority
end
#ttr ⇒ Object
Returns the value of attribute ttr.
10 11 12 |
# File 'lib/asir/transport/beanstalk.rb', line 10
# Attribute reader: returns the current value of @ttr.
def ttr
  @ttr
end
#tube ⇒ Object
Returns the value of attribute tube.
10 11 12 |
# File 'lib/asir/transport/beanstalk.rb', line 10
# Attribute reader: returns the current value of @tube.
def tube
  @tube
end
Instance Method Details
#_after_connect!(stream) ⇒ Object
134 135 136 137 138 139 140 |
# File 'lib/asir/transport/beanstalk.rb', line 134
# Hook invoked with a freshly connected stream.
#
# When a tube is configured (@tube), sends "use <tube>\r\n" through
# #_beanstalk and expects the reply "USING <tube>\r\n".
#
# stream - the connected stream, passed through to #_beanstalk.
#
# Returns whatever #_beanstalk returns, or nil when @tube is not set.
def _after_connect!(stream)
  return unless @tube
  # The tube name is matched literally: characters such as "+", "(", ")" or
  # "$" are regexp metacharacters and must be escaped, otherwise the server's
  # reply for such a tube would never match.
  expected_reply = /\AUSING #{Regexp.escape(@tube.to_s)}\r\n\Z/
  _beanstalk(stream, "use #{@tube}\r\n", expected_reply)
end
#_beanstalk(stream, message, expect, payload = nil) ⇒ Object
Send "something ...\r\n". Expect /\ASOMETHING (\d+)...\r\n/.
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/asir/transport/beanstalk.rb', line 120
# Send "something ...\r\n".
# Expect /\ASOMETHING (\d+)...\r\n/.
#
# stream  - stream to write the command to and read the reply from.
# message - the command line to send; callers include the trailing "\r\n".
# expect  - Regexp handed to #_read_line_and_expect! for the reply line.
# payload - optional body; when given it is written after the command line,
#           followed by LINE_TERMINATOR.
#
# Returns whatever #_read_line_and_expect! returns (used by callers as match
# data).
def _beanstalk(stream, message, expect, payload = nil)
  _log { [ :_beanstalk, :message, message ] } if @verbose >= 3
  stream.write message
  if payload
    stream.write payload
    stream.write LINE_TERMINATOR
  end
  # Push the whole command out before waiting for the reply line.
  stream.flush
  if match = _read_line_and_expect!(stream, expect)
    _log { [ :_beanstalk, :result, match[0] ] } if @verbose >= 3
  end
  match
end
#_receive_message(channel, additional_data) ⇒ Object
!SLIDE Receives the encoded Message payload String.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/asir/transport/beanstalk.rb', line 45
# !SLIDE
# Receives the encoded Message payload String.
#
# Sends RESERVE, parses the "RESERVED <id> <bytes>" reply, reads that many
# bytes from the stream and then consumes the terminating "\r\n" line.
#
# channel         - object responding to #with_stream! and #close.
# additional_data - Hash-like object; receives :beanstalk_job_id and
#                   :beanstalk_message_size, or :beanstalk_error on failure.
#
# Returns [ message_payload, stream ] from inside the with_stream! block.
# On any exception the error is logged and recorded, the channel is closed
# and the exception is re-raised.
def _receive_message(channel, additional_data)
  channel.with_stream! do | stream |
    begin
      match = _beanstalk(stream, RESERVE, /\ARESERVED (\d+) (\d+)\r\n\Z/)
      additional_data[:beanstalk_job_id] = match[1].to_i
      additional_data[:beanstalk_message_size] = size = match[2].to_i
      message_payload = stream.read(size)
      _read_line_and_expect! stream, /\A\r\n\Z/
      # Pass the original stream used to #_send_result below.
      [ message_payload, stream ]
    rescue ::Exception => exc
      _log { [ :_receive_message, :exception, exc ] }
      additional_data[:beanstalk_error] = exc
      channel.close
      raise exc
    end
  end
end
#_receive_result(message, opaque_result) ⇒ Object
!SLIDE Receives the encoded Result payload String.
102 103 104 |
# File 'lib/asir/transport/beanstalk.rb', line 102
# !SLIDE
# Receives the encoded Result payload String.
#
# This transport never reads a result payload here: both arguments are
# ignored and nil is always returned.
def _receive_result(message, opaque_result)
  nil
end
#_send_message(message, message_payload) ⇒ Object
!SLIDE Sends the encoded Message payload String.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/asir/transport/beanstalk.rb', line 23
# !SLIDE
# Sends the encoded Message payload String.
#
# Issues "put <priority> <delay> <ttr> <bytes>\r\n" followed by the payload.
# The per-message values message[:beanstalk_priority], [:beanstalk_delay] and
# [:beanstalk_ttr] take precedence over @priority, @delay and @ttr.
#
# message         - Hash-like message; receives :beanstalk_job_id on success
#                   or :beanstalk_error on failure.
# message_payload - the encoded payload String.
#
# On any exception the error is recorded on the message, #close is called and
# the exception is re-raised.
def _send_message(message, message_payload)
  stream.with_stream! do | s |
    begin
      priority = message[:beanstalk_priority] || @priority
      delay    = message[:beanstalk_delay]    || @delay
      ttr      = message[:beanstalk_ttr]      || @ttr
      # The put command takes the body length in bytes, which differs from
      # the character count for multibyte payloads.
      command = "put #{priority} #{delay} #{ttr} #{message_payload.bytesize}\r\n"
      match = _beanstalk(s, command, /\AINSERTED (\d+)\r\n\Z/, message_payload)
      job_id = message[:beanstalk_job_id] = match[1].to_i
      _log { "beanstalk_job_id = #{job_id.inspect}" } if @verbose >= 2
    rescue ::Exception => exc
      message[:beanstalk_error] = exc
      close
      raise exc
    end
  end
end
#_send_result(message, result, result_payload, channel, stream) ⇒ Object
!SLIDE Sends the encoded Result payload String.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/asir/transport/beanstalk.rb', line 70
# !SLIDE
# Sends the encoded Result payload String.
#
# No result payload is written by this method; it sends
# "delete <job_id>\r\n" for the job id stored in message[:beanstalk_job_id]
# and expects "DELETED\r\n".
#
# message        - Hash-like message carrying :beanstalk_job_id.
# result         - not used by this method.
# result_payload - not used by this method.
# channel        - object responding to #with_stream!.
# stream         - the stream the message was originally received on.
#
# Raises RuntimeError "no beanstalk_job_id" when the message has no job id.
def _send_result(message, result, result_payload, channel, stream)
  #
  # There is a possibility here the following could happen:
  #
  #   _receive_message
  #     channel == #<Channel:1>
  #     channel.stream == #<TCPSocket:1234>
  #   end
  #   ...
  #   ERROR OCCURS:
  #     channel.stream.close
  #     channel.stream = nil
  #   ...
  #   _send_result
  #     channel == #<Channel:1>
  #     channel.stream == #<TCPSocket:5678> # NEW CONNECTION
  #     stream.write "delete #{job_id}"
  #   ...
  #
  # Therefore: _receive_message passes the original message stream to us.
  # We log when the channel's active stream is no longer that one, and the
  # delete is still issued on the original stream.
  channel.with_stream! do | maybe_other_stream |
    _log [ :_send_result, "stream lost" ] if maybe_other_stream != stream
    job_id = message[:beanstalk_job_id] or raise "no beanstalk_job_id"
    _beanstalk(stream, "delete #{job_id}\r\n", /\ADELETED\r\n\Z/)
  end
end
#_server! ⇒ Object
!SLIDE Beanstalk Server
144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/asir/transport/beanstalk.rb', line 144
# !SLIDE
# Beanstalk Server
#
# Connects via #connect! with the retry options shown below and stores the
# result in @server.  Inside the connect! block, when a tube is configured,
# "watch <tube>" is sent and a "WATCHING <n>" reply is expected.
#
# Returns self.
def _server!
  _log { "_server! #{uri}" } if @verbose >= 1
  @server = connect!(
    :try_max             => nil,
    :try_sleep           => 1,
    :try_sleep_increment => 0.1,
    :try_sleep_max       => 10
  ) do | stream |
    _beanstalk(stream, "watch #{@tube}\r\n", /\AWATCHING (\d+)\r\n\Z/) if @tube
  end
  self
end
#_server_accept_connection!(server) ⇒ Object
159 160 161 162 |
# File 'lib/asir/transport/beanstalk.rb', line 159
# Returns the [ in_stream, out_stream ] pair for serving; both entries are
# @server.  #prepare_server! is called first when @server is not yet set.
# The server argument itself is not used.
def _server_accept_connection!(server)
  prepare_server! unless @server
  connection = @server
  [ connection, connection ]
end
#_server_close_connection!(in_stream, out_stream) ⇒ Object
164 165 166 |
# File 'lib/asir/transport/beanstalk.rb', line 164
# Deliberately a no-op: neither stream is touched, and nil is returned.
def _server_close_connection!(in_stream, out_stream)
  # NOTHING
end
#_start_conduit! ⇒ Object
175 176 177 178 179 |
# File 'lib/asir/transport/beanstalk.rb', line 175
# Replaces the current process (Kernel#exec) with a beanstalkd command line
# of the form "beanstalkd [-l <address> ]-p <port>".  The "-l" option is
# only included when #address is truthy.
def _start_conduit!
  listen_option = address ? "-l #{address} " : ""
  exec("beanstalkd #{listen_option}-p #{port}")
end
#relative_message_delay!(message, now = nil) ⇒ Object
!SLIDE Sets beanstalk_delay if message.delay was specified.
108 109 110 111 112 113 |
# File 'lib/asir/transport/beanstalk.rb', line 108
# !SLIDE
# Sets beanstalk_delay if message.delay was specified.
#
# Delegates to the inherited implementation with the same arguments; when it
# returns a truthy delay, the value is stored (converted with #to_i) in
# message[:beanstalk_delay].
#
# message - Hash-like message to annotate.
# now     - passed through to the inherited implementation.
#
# Returns the delay produced by the inherited implementation (unconverted).
def relative_message_delay!(message, now = nil)
  if delay = super
    message[:beanstalk_delay] = delay.to_i
  end
  delay
end
#stream_eof?(stream) ⇒ Boolean
168 169 170 171 172 173 |
# File 'lib/asir/transport/beanstalk.rb', line 168
# Always reports that the stream is not at EOF; the stream argument is not
# inspected.
def stream_eof?(stream)
  # Note: stream.eof? on a beanstalkd connection,
  # will cause blocking read *forever* because
  # beanstalk connections are long lived.
  false
end