Class: Beaneater::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/beaneater/connection.rb

Overview

Represents a connection to a beanstalkd instance.

Constant Summary collapse

MAX_RETRIES =

Default number of retries to send a command to a connection

3
DEFAULT_RETRY_INTERVAL =

Default retry interval

1
DEFAULT_PORT =

Default port value for beanstalk connection

11300

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(address) ⇒ Connection

Initializes new connection.

Examples:

Beaneater::Connection.new('127.0.0.1')
Beaneater::Connection.new('127.0.0.1:11300')

ENV['BEANSTALKD_URL'] = '127.0.0.1:11300'
@b = Beaneater.new
@b.connection.host # => '127.0.0.1'
@b.connection.port # => '11300'

Parameters:

  • address (String)

    beanstalkd instance address.



51
52
53
54
55
56
57
58
59
60
# File 'lib/beaneater/connection.rb', line 51

def initialize(address)
  @address = address || _host_from_env || Beaneater.configuration.beanstalkd_url
  @mutex = Mutex.new
  @tube_used = 'default'
  @tubes_watched = ['default']

  establish_connection
rescue
  _raise_not_connected!
end

Instance Attribute Details

#addressString

Returns Beanstalkd server address

Examples:

@conn.address # => "localhost:11300"

Returns:

  • (String)

    returns Beanstalkd server address



28
29
30
# File 'lib/beaneater/connection.rb', line 28

def address
  @address
end

#connectionObject

Returns the value of attribute connection.



28
# File 'lib/beaneater/connection.rb', line 28

attr_reader :address, :host, :port, :connection

#hostString

Returns Beanstalkd server host

Examples:

@conn.host # => "localhost"

Returns:

  • (String)

    returns Beanstalkd server host



28
# File 'lib/beaneater/connection.rb', line 28

attr_reader :address, :host, :port, :connection

#portInteger

Returns Beanstalkd server port

Examples:

@conn.port # => "11300"

Returns:

  • (Integer)

    returns Beanstalkd server port



28
# File 'lib/beaneater/connection.rb', line 28

attr_reader :address, :host, :port, :connection

#tube_usedObject



34
# File 'lib/beaneater/connection.rb', line 34

attr_accessor :tubes_watched, :tube_used

#tubes_watchedObject



34
35
36
# File 'lib/beaneater/connection.rb', line 34

def tubes_watched
  @tubes_watched
end

Instance Method Details

#add_to_watched(tube_name) ⇒ Object



107
108
109
110
# File 'lib/beaneater/connection.rb', line 107

def add_to_watched(tube_name)
  @tubes_watched << tube_name
  @tubes_watched.uniq
end

#closeObject

Close connection with beanstalkd server.

Examples:

@conn.close


90
91
92
93
94
95
# File 'lib/beaneater/connection.rb', line 90

def close
  if @connection
    @connection.close
    @connection = nil
  end
end

#configBeaneater::Configuration (protected)

Returns configuration options for beaneater

Returns:



172
173
174
# File 'lib/beaneater/connection.rb', line 172

def config
  Beaneater.configuration
end

#establish_connectionNet::TCPSocket (protected)

Establish a connection based on beanstalk address.

Examples:

establish_connection('localhost:3005')

Returns:

  • (Net::TCPSocket)

    connection for specified address.

Raises:



125
126
127
128
129
130
131
# File 'lib/beaneater/connection.rb', line 125

def establish_connection
  @address = address.first if address.is_a?(Array)
  match = address.split(':')
  @host, @port = match[0], Integer(match[1] || DEFAULT_PORT)

  @connection = TCPSocket.new @host, @port
end

#parse_response(cmd, res) ⇒ Array<Hash{String => String, Number}> (protected)

Parses the response and returns the useful beanstalk response. Will read the body if one is indicated by the status.

Examples:

parse_response("delete 56", "DELETED 56\nFOO")
 # => { :body => "FOO", :status => "DELETED", :id => 56 }

Parameters:

  • cmd (String)

    Beanstalk command transmitted

  • res (String)

    Telnet command response

Returns:

  • (Array<Hash{String => String, Number}>)

    Beanstalk response with status, id, body

Raises:



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/beaneater/connection.rb', line 144

def parse_response(cmd, res)
  status = res.chomp
  body_values = status.split(/\s/)
  status = body_values[0]
  raise UnexpectedResponse.from_status(status, cmd) if UnexpectedResponse::ERROR_STATES.include?(status)
  body = nil
  if ['OK','FOUND', 'RESERVED'].include?(status)
    bytes_size = body_values[-1].to_i
    raw_body = connection.read(bytes_size)
    body = if status == 'OK'
      psych_v4_valid_body = raw_body.gsub(/^(.*?): (.*)$/) { "#{$1}: #{$2.gsub(/[\:\-\~]/, '_')}" }
      YAML.load(psych_v4_valid_body)
    else
      config.job_parser.call(raw_body)
    end
    crlf = connection.read(2) # \r\n
    raise ExpectedCrlfError.new('EXPECTED_CRLF', cmd) if crlf != "\r\n"
  end
  id = body_values[1]
  response = { :status => status }
  response[:id] = id if id
  response[:body] = body if body
  response
end

#remove_from_watched(tube_name) ⇒ Object



112
113
114
# File 'lib/beaneater/connection.rb', line 112

def remove_from_watched(tube_name)
  @tubes_watched.delete(tube_name)
end

#to_sObject Also known as: inspect

Returns string representation of job.

Examples:

@conn.inspect


102
103
104
# File 'lib/beaneater/connection.rb', line 102

def to_s
  "#<Beaneater::Connection host=#{host.inspect} port=#{port.inspect}>"
end

#transmit(command, **options) ⇒ Array<Hash{String => String, Number}>

Send commands to beanstalkd server via connection.

Examples:

@conn = Beaneater::Connection.new
@conn.transmit('bury 123')
@conn.transmit('stats')

Parameters:

  • ] (Hash{String => String, Number})

    options Retained for compatibility

  • command (String)

    Beanstalkd command

Returns:

  • (Array<Hash{String => String, Number}>)

    Beanstalkd command response



72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/beaneater/connection.rb', line 72

def transmit(command, **options)
  _with_retry(**options.slice(:retry_interval, :init)) do
    @mutex.synchronize do
      _raise_not_connected! unless connection

      command = command.force_encoding('ASCII-8BIT') if command.respond_to?(:force_encoding)
      connection.write(command.to_s + "\r\n")
      res = connection.readline
      parse_response(command, res)
    end
  end
end