Class: ASIR::Transport::Beanstalk

Inherits:
TcpSocket
  • Object
show all
Defined in:
lib/asir/transport/beanstalk.rb

Overview

!SLIDE Beanstalk Transport

Constant Summary collapse

RESERVE =
"reserve\r\n".freeze
NOT_FOUND =
"NOT_FOUND\r\n".freeze
LINE_TERMINATOR =
"\r\n".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Beanstalk

Returns a new instance of Beanstalk.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/asir/transport/beanstalk.rb', line 12

# Builds a Beanstalk transport with beanstalkd-oriented defaults.
#
# Every default below is applied with ||=, so a value that is already
# present is left untouched. All remaining setup is delegated to the
# superclass, which receives the original arguments unchanged.
def initialize(*args)
  @one_way = true

  # Connection defaults.
  self.scheme_default ||= 'beanstalk'
  self.host_default   ||= '127.0.0.1'
  self.port_default   ||= 11300
  self.tube_default   ||= 'asir'

  # Fallback "put" fields; _send_message uses them when a message has no
  # :beanstalk_priority / :beanstalk_delay / :beanstalk_ttr of its own.
  @priority ||= 0
  @delay    ||= 0
  @ttr      ||= 600

  super
end

Instance Attribute Details

#delay ⇒ Object

Returns the value of attribute delay.



10
11
12
# File 'lib/asir/transport/beanstalk.rb', line 10

# Reader for @delay: the fallback delay field that _send_message places
# in the beanstalk "put" command when the message carries no
# :beanstalk_delay.
def delay
  @delay
end

#priority ⇒ Object

Returns the value of attribute priority.



10
11
12
# File 'lib/asir/transport/beanstalk.rb', line 10

# Reader for @priority: the fallback priority field that _send_message
# places in the beanstalk "put" command when the message carries no
# :beanstalk_priority.
def priority
  @priority
end

#ttr ⇒ Object

Returns the value of attribute ttr.



10
11
12
# File 'lib/asir/transport/beanstalk.rb', line 10

# Reader for @ttr: the fallback ttr (time-to-run) field that
# _send_message places in the beanstalk "put" command when the message
# carries no :beanstalk_ttr.
def ttr
  @ttr
end

#tube ⇒ Object

Returns the value of attribute tube.



9
10
11
# File 'lib/asir/transport/beanstalk.rb', line 9

# Reader for @tube: the beanstalk tube name that this transport sends in
# its "use" / "watch" commands and embeds in path_default.
def tube
  @tube
end

#tube_default ⇒ Object

Returns the value of attribute tube_default.



9
10
11
# File 'lib/asir/transport/beanstalk.rb', line 9

# Reader for @tube_default; the constructor sets it to 'asir' when no
# value has been assigned yet.
def tube_default
  @tube_default
end

Instance Method Details

#_after_connect!(stream) ⇒ Object



197
198
199
200
201
202
203
# File 'lib/asir/transport/beanstalk.rb', line 197

# Selects the configured tube on a freshly connected stream.
#
# Sends "use <tube>\r\n" and expects the server to answer
# "USING <tube>\r\n". Does nothing when no tube is configured.
#
# stream - the connected stream, handed straight to _beanstalk.
#
# Returns whatever _beanstalk returns, or nil when there is no tube.
def _after_connect!(stream)
  t = tube
  return unless t

  # Tube names may legally contain regexp metacharacters such as
  # "+", ".", "$", "(" and ")". The name is therefore escaped before it
  # is interpolated into the expected-response pattern; otherwise a
  # perfectly valid "USING ..." reply would be rejected.
  _beanstalk(stream,
             "use #{t}\r\n",
             /\AUSING #{Regexp.escape(t.to_s)}\r\n\Z/)
end

#_after_invoke_message(state) ⇒ Object

!SLIDE Sends the encoded Result payload String.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/asir/transport/beanstalk.rb', line 88

# Deletes the reserved job once its message has been invoked.
#
# state - responds to #result_opaque (the stream stored by
#         _receive_message) and #message (indexable by
#         :beanstalk_job_id).
#
# Raises RuntimeError ("no beanstalk_job_id") when the message has no
# job id.
def _after_invoke_message(state)
  #
  # The following sequence is possible:
  #
  #   _receive_message
  #     channel == #<Channel:1>
  #     channel.stream == #<TCPSocket:1234>
  #   end
  #   ...
  #   ERROR OCCURS:
  #      channel.stream.close
  #      channel.stream = nil
  #   ...
  #   _send_result
  #     channel == #<Channel:1>
  #     channel.stream == #<TCPSocket:5678> # NEW CONNECTION
  #     stream.write "delete #{job_id}"
  #   ...
  #
  # Therefore _receive_message hands us the original message stream via
  # state.result_opaque, and the delete is issued on that same stream.
  reserved_on = state.result_opaque
  job_id = state.message[:beanstalk_job_id]
  raise "no beanstalk_job_id" unless job_id

  _beanstalk(reserved_on, "delete #{job_id}\r\n", /\ADELETED\r\n\Z/)
  # state.in_stream.close # Force close.
end

#_beanstalk(stream, message, expect, payload = nil) ⇒ Object

Send "something …\r\n". Expect /\ASOMETHING (\d+)…\r\n/.



182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/asir/transport/beanstalk.rb', line 182

# Writes one beanstalk command and reads back its response line.
#
# stream  - object the command is written to (responds to #write).
# message - full command line, including its trailing "\r\n".
# expect  - pattern passed to _read_line_and_expect! for the response.
# payload - optional body; when given it is written after the command,
#           followed by LINE_TERMINATOR.
#
# Returns whatever _read_line_and_expect! returns for the response.
# Both the command and a truthy result are logged when @verbose >= 3.
def _beanstalk(stream, message, expect, payload = nil)
  _log { [ :_beanstalk, :message, message ] } if @verbose >= 3

  stream.write message
  [ payload, LINE_TERMINATOR ].each { | chunk | stream.write chunk } if payload

  response = _read_line_and_expect!(stream, expect) # , /\A(BAD_FORMAT|UNKNOWN_COMMAND)\r\n\Z/)
  _log { [ :_beanstalk, :result, response[0] ] } if response && @verbose >= 3
  response
end

#_beanstalk_return_data(message, expect, data_size_field = nil, payload = nil) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/asir/transport/beanstalk.rb', line 167

# Issues a command whose response line announces a data body, then
# reads and returns that body.
#
# message         - full command line, including its trailing "\r\n".
# expect          - pattern for the response line; one capture group
#                   holds the body size.
# data_size_field - index of that capture group (defaults to 1).
# payload         - optional command body, forwarded to _beanstalk.
#
# The command is sent on @server when it is set, otherwise on #stream.
#
# Returns the body read from the stream.
# Raises RuntimeError ("unexpected result") when _beanstalk yields no
# match.
def _beanstalk_return_data(message, expect, data_size_field = nil, payload = nil)
  (@server || stream).with_stream! do | io |
    match = _beanstalk(io, message, expect, payload)
    raise "unexpected result" unless match

    size_field = data_size_field || 1
    body = io.read(match[size_field].to_i)
    # The body is followed by a bare "\r\n" that must be consumed too.
    _read_line_and_expect! io, /\A\r\n\Z/
    body
  end
end

#_beanstalk_stats_yaml!(message, expect = nil) ⇒ Object



157
158
159
160
161
162
163
164
# File 'lib/asir/transport/beanstalk.rb', line 157

# Runs a stats-style command and parses its YAML body.
#
# message - full command line, including its trailing "\r\n".
# expect  - response-line pattern; defaults to /\AOK (\d+)\r\n\Z/.
#
# Returns the parsed YAML, the falsy value itself when no body came
# back, or :NOT_FOUND when the server answered NOT_FOUND.
# Any other UnexpectedResponse is re-raised.
def _beanstalk_stats_yaml!(message, expect = nil)
  pattern = expect || /\AOK (\d+)\r\n\Z/
  yaml = _beanstalk_return_data(message, pattern)
  # NOTE(review): YAML.load is applied to data received from the server.
  # On Ruby versions where YAML.load is not restricted to basic types,
  # this trusts the server completely; consider YAML.safe_load if the
  # supported Ruby versions allow it.
  yaml ? ::YAML.load(yaml) : yaml
rescue ASIR::Transport::PayloadIO::UnexpectedResponse => exc
  raise exc unless exc.received == NOT_FOUND
  :NOT_FOUND
end

#_receive_message(state) ⇒ Object

!SLIDE Receives the encoded Message payload String.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/asir/transport/beanstalk.rb', line 62

# !SLIDE
# Reserves the next job and reads its body as the encoded Message
# payload String.
#
# On success, state.additional_data receives :beanstalk_job_id and
# :beanstalk_message_size, state.message_payload receives the body, and
# state.result_opaque receives the stream the job was reserved on.
#
# On any exception the error is logged and stored under
# :beanstalk_error, state.in_stream is closed, and the exception is
# re-raised.
def _receive_message(state)
  extra = state.additional_data ||= { }
  state.in_stream.with_stream! do | io |
    begin
      reserved = with_force_stop! do
        _beanstalk(io, RESERVE, /\ARESERVED (\d+) (\d+)\r\n\Z/)
      end
      extra[:beanstalk_job_id] = reserved[1].to_i
      byte_count = reserved[2].to_i
      extra[:beanstalk_message_size] = byte_count
      state.message_payload = io.read(byte_count)
      # The body is followed by a bare "\r\n".
      _read_line_and_expect! io, /\A\r\n\Z/
      state.result_opaque = io
    rescue ::Exception => error
      _log { [ :_receive_message, :exception, error ] }
      extra[:beanstalk_error] = error
      state.in_stream.close
      raise error
    end
  end
end

#_receive_result(state) ⇒ Object

!SLIDE Receives the encoded Result payload String.



118
119
120
# File 'lib/asir/transport/beanstalk.rb', line 118

# Result-receiving hook. This transport reads nothing here: state is
# ignored and nil is always returned.
def _receive_result(state)
  nil
end

#_send_message(state) ⇒ Object

!SLIDE Sends the encoded Message payload String.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/asir/transport/beanstalk.rb', line 39

# !SLIDE
# Sends the encoded Message payload String as a beanstalk job.
#
# Issues "put <priority> <delay> <ttr> <bytes>\r\n" followed by the
# payload. Priority, delay and ttr come from the message's
# :beanstalk_priority / :beanstalk_delay / :beanstalk_ttr entries,
# falling back to @priority / @delay / @ttr.
#
# state - responds to #message (a Hash-like object) and
#         #message_payload (the encoded String).
#
# On success the job id is stored in message[:beanstalk_job_id].
# On any exception the error is stored in message[:beanstalk_error],
# the connection is closed, and the exception is re-raised.
def _send_message(state)
  stream.with_stream! do | s |
    message = state.message
    begin
      payload = state.message_payload
      # The <bytes> field is a byte count. String#size counts characters
      # on Ruby 1.9+, which under-reports multibyte payloads, so bytesize
      # is used instead.
      command = "put #{message[:beanstalk_priority] || @priority} #{message[:beanstalk_delay] || @delay} #{message[:beanstalk_ttr] || @ttr} #{payload.bytesize}\r\n"
      match = _beanstalk(s, command, /\AINSERTED (\d+)\r\n\Z/, payload)
      job_id = message[:beanstalk_job_id] = match[1].to_i
      _log { "beanstalk_job_id = #{job_id.inspect}" } if @verbose >= 2
    rescue ::Exception => exc
      # Deliberately broad: whatever went wrong, record it, drop the
      # connection, then let the original exception propagate.
      message[:beanstalk_error] = exc
      close
      raise exc
    end
  end
end

#_server! ⇒ Object

!SLIDE Beanstalk Server



207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/asir/transport/beanstalk.rb', line 207

# !SLIDE
# Beanstalk Server
#
# Connects via connect! and stores the result in @server. Inside the
# connect! block, "watch <tube>" is sent when a tube is configured.
# The URI is logged first when @verbose >= 1.
#
# Returns self.
def _server!
  _log { "_server! #{uri}" } if @verbose >= 1

  @server = connect!(:try_max => nil,
                     :try_sleep => 1,
                     :try_sleep_increment => 0.1,
                     :try_sleep_max => 10) do | io |
    watched = tube
    _beanstalk(io, "watch #{watched}\r\n", /\AWATCHING (\d+)\r\n\Z/) if watched
  end

  self
end

#_server_accept_connection!(server) ⇒ Object



222
223
224
225
# File 'lib/asir/transport/beanstalk.rb', line 222

# Returns the [ input, output ] pair for a server connection; both
# entries are @server. prepare_server! is called first when @server is
# not set. The server argument is not used.
def _server_accept_connection!(server)
  prepare_server! unless @server
  connection = @server
  [ connection, connection ]
end

#_server_close_connection!(in_stream, out_stream) ⇒ Object



227
228
229
# File 'lib/asir/transport/beanstalk.rb', line 227

# Connection-close hook. Intentionally does nothing: neither stream is
# touched, and nil is returned.
def _server_close_connection!(in_stream, out_stream)
  # NOTHING
end

#_start_conduit! ⇒ Object



238
239
240
241
242
243
244
245
246
# File 'lib/asir/transport/beanstalk.rb', line 238

# Builds the beanstalkd command line and hands it to exec.
#
# The command carries "-l <conduit_host>" when a conduit host is set,
# "-p <port>", "-z 1048576", and whatever
# @conduit_options[:beanstalkd_options] holds. With
# @conduit_options[:verbose] set, the command is echoed to $stderr
# first; a failure while echoing is ignored.
def _start_conduit!
  listen_on = conduit_host
  listen_flag = listen_on ? "-l #{listen_on} " : ""
  z_bytes = 1 * 1024 * 1024
  cmd = "beanstalkd #{listen_flag}-p #{port} -z #{z_bytes} #{@conduit_options[:beanstalkd_options]}"

  if @conduit_options[:verbose]
    begin
      $stderr.puts "  #{cmd}"
    rescue StandardError
      nil # Best effort only.
    end
  end

  exec(cmd)
end

#conduit_status ⇒ Object

!SLIDE Beanstalk protocol support



134
135
136
137
138
139
140
141
142
# File 'lib/asir/transport/beanstalk.rb', line 134

# !SLIDE
# Beanstalk protocol support
#
# Collects #stats and, when a tube is configured, #stats_tube, together
# with the wall-clock time spent gathering them.
#
# Returns { :beanstalkd => { :stats, :stats_tube, :response_time } }.
def conduit_status
  started_at = Time.now

  report = { :stats => stats }
  report[:stats_tube] = tube && stats_tube
  report[:response_time] = Time.now - started_at

  { :beanstalkd => report }
end

#path_default ⇒ Object



33
34
35
# File 'lib/asir/transport/beanstalk.rb', line 33

# Default path: the tube name prefixed with "/".
def path_default
  "/" + tube.to_s
end

#relative_message_delay!(message, now = nil) ⇒ Object

!SLIDE Sets beanstalk_delay if message.delay was specified.



124
125
126
127
128
129
# File 'lib/asir/transport/beanstalk.rb', line 124

# !SLIDE
# Sets message[:beanstalk_delay] when the superclass reports a delay.
#
# The delay itself is computed by super, which receives message and now
# unchanged. When it is truthy, its integer form (#to_i) is stored in
# message[:beanstalk_delay].
#
# Returns the value produced by super.
def relative_message_delay!(message, now = nil)
  seconds = super
  message[:beanstalk_delay] = seconds.to_i if seconds
  seconds
end

#stats ⇒ Object



144
145
146
# File 'lib/asir/transport/beanstalk.rb', line 144

# Sends the "stats" command and returns what _beanstalk_stats_yaml!
# yields for it.
def stats
  _beanstalk_stats_yaml!("stats\r\n")
end

#stats_job(job_id) ⇒ Object



148
149
150
# File 'lib/asir/transport/beanstalk.rb', line 148

# Sends "stats-job <job_id>" and returns what _beanstalk_stats_yaml!
# yields for it.
def stats_job(job_id)
  command = "stats-job #{job_id}\r\n"
  _beanstalk_stats_yaml!(command)
end

#stats_tube(tube = nil) ⇒ Object



152
153
154
155
# File 'lib/asir/transport/beanstalk.rb', line 152

# Sends "stats-tube <tube>" and returns what _beanstalk_stats_yaml!
# yields for it.
#
# tube - tube name; falls back to this transport's own #tube when nil.
def stats_tube(tube = nil)
  name = tube || self.tube
  _beanstalk_stats_yaml!("stats-tube #{name}\r\n")
end

#stream_eof?(stream) ⇒ Boolean

Returns:

  • (Boolean)


231
232
233
234
235
236
# File 'lib/asir/transport/beanstalk.rb', line 231

# Always reports "not at EOF"; the stream is never queried.
#
# Calling stream.eof? on a beanstalkd connection would block in a read
# *forever*, because beanstalk connections are long lived.
def stream_eof?(stream)
  false
end