Class: ASIR::Transport::Beanstalk

Inherits:
TcpSocket show all
Defined in:
lib/asir/transport/beanstalk.rb

Overview

!SLIDE Beanstalk Transport

Constant Summary collapse

LINE_TERMINATOR =
"\r\n".freeze
RESERVE =
"reserve\r\n".freeze

Constants included from PayloadIO

PayloadIO::FOOTER, PayloadIO::HEADER

Constants included from UriConfig

UriConfig::S_LOCALHOST, UriConfig::S_TCP

Constants included from ASIR::ThreadVariable

ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER

Instance Attribute Summary collapse

Attributes included from UriConfig

#host, #host_default, #path, #path_default, #port, #port_default, #scheme, #scheme_default, #uri

Attributes inherited from ASIR::Transport

#after_receive_message, #before_send_message, #decoder, #encoder, #invoker, #message_count, #needs_message_identifier, #needs_message_timestamp, #on_exception, #on_result_exception, #one_way, #running, #verbose

Attributes included from Log

#_logger

Instance Method Summary collapse

Methods inherited from TcpSocket

#_client_connect!

Methods inherited from ConnectionOriented

#_before_close!, #_connect!, #_server_close!, #connect!, #prepare_server!, #run_server!, #serve_connection!, #server_on_start!, #server_on_stop!, #stream

Methods included from PayloadIO

#_read, #_read_line_and_expect!, #_write, #close

Methods included from UriConfig

#_uri, #address, #protocol

Methods inherited from Stream

#_serve_stream!, #serve_stream!, #serve_stream_message!

Methods inherited from ASIR::Transport

#_subclass_responsibility, #invoke_message!, #needs_message_identifier?, #needs_message_timestamp?, #receive_message, #receive_result, #send_message, #send_result, #serve_message!, #stop!, #with_server_signals!

Methods included from Log

#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included

Methods included from AdditionalData

#[], #[]=, #additional_data, #additional_data!, #additional_data=, included

Methods included from Message::Delay

#wait_for_delay!

Methods included from ASIR::ThreadVariable

included, setter

Constructor Details

#initialize(*args) ⇒ Beanstalk

Returns a new instance of Beanstalk.



12
13
14
15
16
17
18
19
# File 'lib/asir/transport/beanstalk.rb', line 12

# Builds a Beanstalk transport.
#
# Beanstalk-specific defaults are seeded *before* the superclass
# constructor runs. A default is applied only to an instance variable
# that is still unset (nil or false), so a value assigned earlier wins:
#
#   @port     => 11300
#   @tube     => 'asir'
#   @priority => 0
#   @delay    => 0
#   @ttr      => 600
#
# @param args [Array] forwarded unchanged to the superclass #initialize.
def initialize(*args)
  defaults = [
    [ :@port,     11300  ],
    [ :@tube,     'asir' ],
    [ :@priority, 0      ],
    [ :@delay,    0      ],
    [ :@ttr,      600    ],
  ]
  defaults.each do |ivar, value|
    instance_variable_set(ivar, value) unless instance_variable_get(ivar)
  end
  # Bare super: passes *args through exactly as received.
  super
end

Instance Attribute Details

#delay ⇒ Object

Returns the value of attribute delay.



10
11
12
# File 'lib/asir/transport/beanstalk.rb', line 10

# Default delay placed in the beanstalkd `put` command by #_send_message
# when the message carries no :beanstalk_delay of its own.
# #initialize defaults it to 0.
# NOTE(review): presumably expressed in seconds, as in the beanstalkd
# protocol -- confirm.
def delay
  @delay
end

#priority ⇒ Object

Returns the value of attribute priority.



10
11
12
# File 'lib/asir/transport/beanstalk.rb', line 10

# Default job priority placed in the beanstalkd `put` command by
# #_send_message when the message carries no :beanstalk_priority of its own.
# #initialize defaults it to 0.
def priority
  @priority
end

#ttr ⇒ Object

Returns the value of attribute ttr.



10
11
12
# File 'lib/asir/transport/beanstalk.rb', line 10

# Default TTR value placed in the beanstalkd `put` command by
# #_send_message when the message carries no :beanstalk_ttr of its own.
# #initialize defaults it to 600.
# NOTE(review): presumably beanstalkd's "time to run" in seconds -- confirm.
def ttr
  @ttr
end

#tube ⇒ Object

Returns the value of attribute tube.



10
11
12
# File 'lib/asir/transport/beanstalk.rb', line 10

# Name of the beanstalkd tube this transport works with.
# #_after_connect! issues `use <tube>` and #_server! issues `watch <tube>`
# with this value; both skip the command when it is unset.
# #initialize defaults it to 'asir'.
def tube
  @tube
end

Instance Method Details

#_after_connect!(stream) ⇒ Object



134
135
136
137
138
139
140
# File 'lib/asir/transport/beanstalk.rb', line 134

# Selects the configured tube on a freshly connected stream.
#
# When @tube is set, sends `use <tube>` through #_beanstalk and expects the
# `USING <tube>` acknowledgement. Nothing is sent when @tube is unset.
#
# The tube name is escaped before it is embedded in the expected-reply
# pattern. Beanstalkd tube names may legally contain regexp metacharacters
# (`+`, `.`, `$`, `(`, `)`); interpolated raw, a name such as "jobs(v2)+x"
# would produce a pattern that cannot match its own acknowledgement.
#
# @param stream the connected stream, passed through to #_beanstalk.
# @return the result of #_beanstalk, or nil when no tube is configured.
def _after_connect!(stream)
  return unless @tube
  _beanstalk(stream,
             "use #{@tube}\r\n",
             /\AUSING #{Regexp.escape(@tube.to_s)}\r\n\Z/)
end

#_beanstalk(stream, message, expect, payload = nil) ⇒ Object

Send "something ...\r\n". Expect /\ASOMETHING (\d+)...\r\n/.



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/asir/transport/beanstalk.rb', line 120

# Performs one beanstalkd command/response exchange on +stream+.
#
# Writes +message+ (the command line, already terminated by the caller),
# then, if +payload+ is given, the payload followed by LINE_TERMINATOR.
# After flushing, one reply line is read and checked against +expect+
# via #_read_line_and_expect!.
#
# The command and the matched reply are logged when @verbose is 3 or more.
#
# @param stream   object responding to #write and #flush.
# @param message  [String] command line to send.
# @param expect   [Regexp] pattern handed to #_read_line_and_expect!.
# @param payload  [String, nil] optional body sent after the command line.
# @return whatever #_read_line_and_expect! returned (callers index it
#   like a MatchData).
def _beanstalk(stream, message, expect, payload = nil)
  _log { [ :_beanstalk, :message, message ] } if @verbose >= 3

  stream.write(message)
  [ payload, LINE_TERMINATOR ].each { |chunk| stream.write(chunk) } if payload
  stream.flush

  reply = _read_line_and_expect!(stream, expect)
  _log { [ :_beanstalk, :result, reply[0] ] } if reply && @verbose >= 3
  reply
end

#_receive_message(channel, additional_data) ⇒ Object

!SLIDE Receives the encoded Message payload String.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/asir/transport/beanstalk.rb', line 45

# Reserves the next job from beanstalkd and returns its encoded payload.
#
# Sends RESERVE through #_beanstalk, parses the job id and body size from
# the "RESERVED <id> <bytes>" reply, reads exactly that many bytes from the
# stream and then consumes the trailing CRLF.
#
# Side effects on +additional_data+:
#   :beanstalk_job_id       - Integer job id from the reply
#   :beanstalk_message_size - Integer body size from the reply
#   :beanstalk_error        - the exception, when anything fails
#
# On any exception the failure is logged, the channel is closed and the
# same exception is re-raised.
#
# @param channel          object responding to #with_stream! and #close.
# @param additional_data  [Hash] receives the metadata listed above.
# @return [Array] the payload and the stream it was read from; the stream
#   is handed back so that #_send_result can reply on the same connection.
def _receive_message(channel, additional_data)
  channel.with_stream! do |stream|
    begin
      reply = _beanstalk(stream, RESERVE, /\ARESERVED (\d+) (\d+)\r\n\Z/)
      additional_data[:beanstalk_job_id] = reply[1].to_i
      byte_count = reply[2].to_i
      additional_data[:beanstalk_message_size] = byte_count
      body = stream.read(byte_count)
      # The job body is followed by a bare CRLF that must be consumed.
      _read_line_and_expect!(stream, /\A\r\n\Z/)
      [ body, stream ]
    rescue ::Exception => error
      _log { [ :_receive_message, :exception, error ] }
      additional_data[:beanstalk_error] = error
      channel.close
      raise error
    end
  end
end

#_receive_result(message, opaque_result) ⇒ Object

!SLIDE Receives the encoded Result payload String.



102
103
104
# File 'lib/asir/transport/beanstalk.rb', line 102

# Receives the encoded Result payload String.
#
# This transport never produces a result payload here: both arguments are
# ignored and nil is always returned.
#
# @param message        ignored.
# @param opaque_result  ignored.
# @return [nil]
def _receive_result(message, opaque_result)
  nil
end

#_send_message(message, message_payload) ⇒ Object

!SLIDE Sends the encoded Message payload String.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/asir/transport/beanstalk.rb', line 23

# Sends the encoded Message payload String as a beanstalkd job.
#
# Issues "put <priority> <delay> <ttr> <bytes>" followed by the payload.
# Priority, delay and ttr come from the message's :beanstalk_priority,
# :beanstalk_delay and :beanstalk_ttr entries when present, otherwise from
# this transport's @priority, @delay and @ttr.
#
# <bytes> is the payload's *byte* length (String#bytesize), which is what
# the beanstalkd protocol requires. String#size counts characters, so a
# payload containing multibyte characters would be under-reported and the
# server would reject the job.
#
# On success the job id from the "INSERTED <id>" reply is stored in
# message[:beanstalk_job_id]. On any exception the error is stored in
# message[:beanstalk_error], the transport is closed and the same
# exception is re-raised.
#
# @param message          message object supporting #[] and #[]=.
# @param message_payload  [String] encoded message body.
# @return the value of the #with_stream! block (the result of the final
#   conditional log call).
def _send_message(message, message_payload)
  stream.with_stream! do |s|
    begin
      job_priority = message[:beanstalk_priority] || @priority
      job_delay    = message[:beanstalk_delay]    || @delay
      job_ttr      = message[:beanstalk_ttr]      || @ttr
      match =
        _beanstalk(s,
                   "put #{job_priority} #{job_delay} #{job_ttr} #{message_payload.bytesize}\r\n",
                   /\AINSERTED (\d+)\r\n\Z/,
                   message_payload)
      job_id = message[:beanstalk_job_id] = match[1].to_i
      _log { "beanstalk_job_id = #{job_id.inspect}" } if @verbose >= 2
    rescue ::Exception => exc
      # Deliberately broad: whatever interrupted the exchange, the connection
      # may be mid-command, so it is closed before the error propagates.
      message[:beanstalk_error] = exc
      close
      raise exc
    end
  end
end

#_send_result(message, result, result_payload, channel, stream) ⇒ Object

!SLIDE Sends the encoded Result payload String.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/asir/transport/beanstalk.rb', line 70

# Sends the encoded Result payload String.
#
# For this transport, "sending the result" means acknowledging the job:
# a "delete <job_id>" command is issued and "DELETED" is expected back.
#
# Why +stream+ is passed in explicitly -- the following can happen:
#
#   _receive_message
#     channel == #<Channel:1>
#     channel.stream == #<TCPSocket:1234>
#   end
#   ...
#   AN ERROR OCCURS:
#      channel.stream.close
#      channel.stream = nil
#   ...
#   _send_result
#     channel == #<Channel:1>
#     channel.stream == #<TCPSocket:5678> # NEW CONNECTION
#     stream.write "delete #{job_id}"
#   ...
#
# Therefore #_receive_message hands us the stream the job was reserved on.
# If the channel's current stream is a different one, "stream lost" is
# logged; the delete is still issued on the original +stream+.
#
# @param message         message object; must carry :beanstalk_job_id.
# @param result          unused here.
# @param result_payload  unused here.
# @param channel         object responding to #with_stream!.
# @param stream          the stream the job was originally reserved on.
# @return the result of #_beanstalk for the delete command.
# @raise [RuntimeError] "no beanstalk_job_id" when the message has no job id.
def _send_result(message, result, result_payload, channel, stream)
  channel.with_stream! do |current_stream|
    _log [ :_send_result, "stream lost" ] if current_stream != stream

    job_id = message[:beanstalk_job_id]
    raise "no beanstalk_job_id" unless job_id

    _beanstalk(stream, "delete #{job_id}\r\n", /\ADELETED\r\n\Z/)
  end
end

#_server! ⇒ Object

!SLIDE Beanstalk Server



144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/asir/transport/beanstalk.rb', line 144

# Beanstalk Server: connects to beanstalkd and starts watching the tube.
#
# Calls #connect! with retry options (:try_max => nil, sleeping from 1 with
# increments of 0.1 up to a maximum of 10) and stores its result in @server.
# The block given to #connect! issues "watch <tube>" on the stream it is
# handed, unless @tube is unset.
# NOTE(review): :try_max => nil presumably means "retry without limit" --
# confirm against #connect!.
#
# The server URI is logged when @verbose is 1 or more.
#
# @return [self]
def _server!
  _log { "_server! #{uri}" } if @verbose >= 1

  @server = connect!(:try_max => nil,
                     :try_sleep => 1,
                     :try_sleep_increment => 0.1,
                     :try_sleep_max => 10) do |connection|
    _beanstalk(connection, "watch #{@tube}\r\n", /\AWATCHING (\d+)\r\n\Z/) if @tube
  end

  self
end

#_server_accept_connection!(server) ⇒ Object



159
160
161
162
# File 'lib/asir/transport/beanstalk.rb', line 159

# "Accepts" a connection by handing out the existing beanstalkd connection.
#
# Calls #prepare_server! first if @server is not set yet, then returns
# @server twice -- once as the input stream and once as the output stream.
#
# @param server  ignored; @server is used instead.
# @return [Array] two-element Array, both elements being @server.
def _server_accept_connection!(server)
  prepare_server! if !@server
  Array.new(2, @server)
end

#_server_close_connection!(in_stream, out_stream) ⇒ Object



164
165
166
# File 'lib/asir/transport/beanstalk.rb', line 164

# Intentionally a no-op; always returns nil.
#
# #_server_accept_connection! hands out the single @server connection for
# both streams, so presumably it must stay open between messages rather
# than being closed here.
#
# @param in_stream   ignored.
# @param out_stream  ignored.
# @return [nil]
def _server_close_connection!(in_stream, out_stream)
  # Nothing to do.
end

#_start_conduit! ⇒ Object



175
176
177
178
179
# File 'lib/asir/transport/beanstalk.rb', line 175

# Starts the beanstalkd daemon for this transport via Kernel#exec, which
# replaces the current process with the command.
#
# The command line is "beanstalkd [-l <address> ]-p <port>"; the -l option
# is included only when #address returns a value.
def _start_conduit!
  parts = [ "beanstalkd" ]
  parts << "-l #{address}" if address
  parts << "-p #{port}"
  exec(parts.join(" "))
end

#relative_message_delay!(message, now = nil) ⇒ Object

!SLIDE Sets beanstalk_delay if message.delay was specified.



108
109
110
111
112
113
# File 'lib/asir/transport/beanstalk.rb', line 108

# Sets beanstalk_delay if message.delay was specified.
#
# Delegates to the superclass implementation to obtain the delay. When that
# yields a value, its integer form (#to_i) is stored in
# message[:beanstalk_delay], from where #_send_message picks it up for the
# "put" command.
#
# @param message  message object supporting #[]=.
# @param now      passed through to the superclass implementation.
# @return the delay exactly as returned by the superclass (not truncated),
#   or nil/false when there is none.
def relative_message_delay!(message, now = nil)
  # Bare super: forwards message and now unchanged.
  seconds = super
  message[:beanstalk_delay] = seconds.to_i if seconds
  seconds
end

#stream_eof?(stream) ⇒ Boolean

Returns:

  • (Boolean)


168
169
170
171
172
173
# File 'lib/asir/transport/beanstalk.rb', line 168

# Reports whether +stream+ has reached end-of-file; always false here.
#
# The stream is never actually probed: beanstalkd connections are long
# lived, so calling stream.eof? on one would block in a read *forever*.
#
# @param stream  ignored.
# @return [Boolean] always false.
def stream_eof?(stream)
  false
end