Class: Beaneater::Connection
- Inherits:
-
Object
- Object
- Beaneater::Connection
- Defined in:
- lib/beaneater/connection.rb
Overview
Represents a connection to a beanstalkd instance.
Constant Summary collapse
- MAX_RETRIES =
Default number of retries to send a command to a connection
3
- DEFAULT_RETRY_INTERVAL =
Default retry interval
1
- DEFAULT_PORT =
Default port value for beanstalk connection
11300
Instance Attribute Summary collapse
-
#address ⇒ String
Returns Beanstalkd server address.
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#host ⇒ String
Returns Beanstalkd server host.
-
#port ⇒ Integer
Returns Beanstalkd server port.
- #tube_used ⇒ Object
- #tubes_watched ⇒ Object
Instance Method Summary collapse
- #add_to_watched(tube_name) ⇒ Object
-
#close ⇒ Object
Close connection with beanstalkd server.
-
#config ⇒ Beaneater::Configuration
protected
Returns configuration options for beaneater.
-
#establish_connection ⇒ Net::TCPSocket
protected
Establish a connection based on beanstalk address.
-
#initialize(address) ⇒ Connection
constructor
Initializes new connection.
-
#parse_response(cmd, res) ⇒ Array<Hash{String => String, Number}>
protected
Parses the response and returns the useful beanstalk response.
- #remove_from_watched(tube_name) ⇒ Object
-
#to_s ⇒ Object
(also: #inspect)
Returns string representation of the connection.
-
#transmit(command, **options) ⇒ Array<Hash{String => String, Number}>
Send commands to beanstalkd server via connection.
Constructor Details
#initialize(address) ⇒ Connection
Initializes new connection.
51 52 53 54 55 56 57 58 59 60 |
# File 'lib/beaneater/connection.rb', line 51
# Initializes a new connection and connects to the server right away.
#
# The address is the first non-nil value among: the +address+ argument,
# the result of _host_from_env, and Beaneater.configuration.beanstalkd_url.
# The used tube starts as 'default' and the watched list as ['default'].
#
# Any StandardError raised during setup (the bare +rescue+ below) is
# handed over to _raise_not_connected!.
#
# @param address [String, Array, nil] beanstalkd address; an Array is
#   reduced to its first element by establish_connection
def initialize(address)
  @mutex = Mutex.new
  @tube_used = 'default'
  @tubes_watched = ['default']
  @address = address || _host_from_env || Beaneater.configuration.beanstalkd_url
  establish_connection
rescue
  _raise_not_connected!
end
Instance Attribute Details
#address ⇒ String
Returns Beanstalkd server address
28 29 30 |
# Listing: reader method that returns the value stored in @address.
# File 'lib/beaneater/connection.rb', line 28 def address @address end |
#connection ⇒ Object
Returns the value of attribute connection.
28 |
# Listing: one attr_reader declaration provides the read-only accessors
# for address, host, port and connection.
# File 'lib/beaneater/connection.rb', line 28 attr_reader :address, :host, :port, :connection |
#host ⇒ String
Returns Beanstalkd server host
28 |
# Listing: one attr_reader declaration provides the read-only accessors
# for address, host, port and connection.
# File 'lib/beaneater/connection.rb', line 28 attr_reader :address, :host, :port, :connection |
#port ⇒ Integer
Returns Beanstalkd server port
28 |
# Listing: one attr_reader declaration provides the read-only accessors
# for address, host, port and connection.
# File 'lib/beaneater/connection.rb', line 28 attr_reader :address, :host, :port, :connection |
#tube_used ⇒ Object
34 |
# Listing: attr_accessor declares both a reader and a writer for
# tubes_watched and tube_used.
# File 'lib/beaneater/connection.rb', line 34 attr_accessor :tubes_watched, :tube_used |
#tubes_watched ⇒ Object
34 35 36 |
# Listing: reader method that returns the value stored in @tubes_watched.
# File 'lib/beaneater/connection.rb', line 34 def tubes_watched @tubes_watched end |
Instance Method Details
#add_to_watched(tube_name) ⇒ Object
107 108 109 110 |
# File 'lib/beaneater/connection.rb', line 107
# Records a tube name in the list of watched tubes.
#
# The stored list (@tubes_watched) is de-duplicated in place, so adding the
# same name repeatedly no longer leaves repeated entries behind. The earlier
# version called the non-mutating Array#uniq, which only de-duplicated the
# return value and let @tubes_watched grow with duplicates.
#
# @param tube_name [Object] tube name to record (the initial entry is the
#   String 'default')
# @return [Array] a new array holding the watched tube names without
#   duplicates; mutating it does not affect the stored list
def add_to_watched(tube_name)
  @tubes_watched << tube_name
  # uniq! returns nil when nothing was removed, so it is not used as the result.
  @tubes_watched.uniq!
  @tubes_watched.dup
end
#close ⇒ Object
Close connection with beanstalkd server.
90 91 92 93 94 95 |
# File 'lib/beaneater/connection.rb', line 90
# Closes the connection to the beanstalkd server, if one is held.
#
# Does nothing when @connection is not set. Otherwise calls +close+ on it
# and clears @connection.
#
# @return [nil]
def close
  return unless @connection

  @connection.close
  @connection = nil
end
#config ⇒ Beaneater::Configuration (protected)
Returns configuration options for beaneater
172 173 174 |
# File 'lib/beaneater/connection.rb', line 172
# Looks up the configuration object exposed by the Beaneater module.
#
# @return [Beaneater::Configuration] whatever Beaneater.configuration returns
def config
  Beaneater.configuration
end
#establish_connection ⇒ Net::TCPSocket (protected)
Establish a connection based on beanstalk address.
125 126 127 128 129 130 131 |
# File 'lib/beaneater/connection.rb', line 125
# Opens the TCP connection described by the stored address.
#
# When the address is an Array, only its first element is kept. The address
# is then split on ':'; the first piece becomes the host and the second the
# port, falling back to DEFAULT_PORT when no port piece is present.
#
# @return [TCPSocket] the newly opened socket, also stored in @connection
# @raise [ArgumentError] from Integer() when the port piece is not numeric
def establish_connection
  @address = address.first if address.is_a?(Array)
  host_part, port_part = address.split(':')
  # Convert the port before touching @host/@port so that a bad port leaves
  # both untouched.
  port_number = Integer(port_part || DEFAULT_PORT)
  @host = host_part
  @port = port_number
  @connection = TCPSocket.new(@host, @port)
end
#parse_response(cmd, res) ⇒ Array<Hash{String => String, Number}> (protected)
Parses the response and returns the useful beanstalk response. Will read the body if one is indicated by the status.
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/beaneater/connection.rb', line 144
# Turns a beanstalkd status line into a response hash, reading the body from
# the connection when the status announces one.
#
# The status line is split on whitespace: the first field is the status, the
# second (if any) is reported as :id, and for OK / FOUND / RESERVED the last
# field is taken as the body length in bytes.
#
# @param cmd [String] the command that produced this response (used when
#   building errors)
# @param res [String] the raw status line read from the connection
# @return [Hash] always contains :status; :id and :body are added only when
#   they are truthy
# @raise [UnexpectedResponse] when the status is one of
#   UnexpectedResponse::ERROR_STATES
# @raise [ExpectedCrlfError] when the body is not followed by "\r\n"
def parse_response(cmd, res)
  fields = res.chomp.split(/\s/)
  status = fields[0]
  raise UnexpectedResponse.from_status(status, cmd) if UnexpectedResponse::ERROR_STATES.include?(status)

  payload = nil
  if ['OK', 'FOUND', 'RESERVED'].include?(status)
    raw_payload = connection.read(fields[-1].to_i)
    payload =
      if status == 'OK'
        # Replaces ':', '-' and '~' inside each "key: value" value with '_'
        # before YAML parsing (presumably so the values stay loadable under
        # Psych 4 — confirm).
        sanitized = raw_payload.gsub(/^(.*?): (.*)$/) { "#{$1}: #{$2.gsub(/[\:\-\~]/, '_')}" }
        YAML.load(sanitized)
      else
        config.job_parser.call(raw_payload)
      end
    # The body must be terminated by CRLF; it is consumed only after parsing.
    terminator = connection.read(2)
    raise ExpectedCrlfError.new('EXPECTED_CRLF', cmd) if terminator != "\r\n"
  end

  result = { :status => status }
  job_id = fields[1]
  result[:id] = job_id if job_id
  result[:body] = payload if payload
  result
end
#remove_from_watched(tube_name) ⇒ Object
112 113 114 |
# File 'lib/beaneater/connection.rb', line 112
# Drops a tube name from the list of watched tubes.
#
# Delegates to @tubes_watched.delete, which for an Array removes every entry
# equal to +tube_name+.
#
# @param tube_name [Object] tube name to remove
# @return [Object, nil] the result of the delete call: the removed name, or
#   nil when it was not in the list
def remove_from_watched(tube_name)
  @tubes_watched.delete(tube_name)
end
#to_s ⇒ Object Also known as: inspect
Returns string representation of the connection.
102 103 104 |
# File 'lib/beaneater/connection.rb', line 102
# Builds a human-readable description of this connection from the inspected
# host and port values.
#
# @return [String] text of the form
#   #<Beaneater::Connection host=... port=...>
def to_s
  host_repr = host.inspect
  port_repr = port.inspect
  "#<Beaneater::Connection host=#{host_repr} port=#{port_repr}>"
end
#transmit(command, **options) ⇒ Array<Hash{String => String, Number}>
Send commands to beanstalkd server via connection.
72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/beaneater/connection.rb', line 72
# Sends a command to the beanstalkd server and returns the parsed response.
#
# The whole exchange runs inside _with_retry and under @mutex, so one
# write/read pair completes before another caller sharing this object starts.
#
# The listing previously showed an unnamed `**` parameter followed by
# `**.slice(...)`, which does not parse; the keyword-rest parameter is named
# +options+ here, matching the documented signature.
#
# @param command [String] beanstalkd command, without the trailing "\r\n";
#   when it responds to force_encoding it is re-tagged in place as ASCII-8BIT
# @param options [Hash] keyword options; only :retry_interval and :init are
#   forwarded to _with_retry, any other key is ignored
# @return [Hash] whatever parse_response returns for the status line read
#   back from the server
def transmit(command, **options)
  _with_retry(**options.slice(:retry_interval, :init)) do
    @mutex.synchronize do
      _raise_not_connected! unless connection

      command = command.force_encoding('ASCII-8BIT') if command.respond_to?(:force_encoding)
      connection.write(command.to_s + "\r\n")
      res = connection.readline
      parse_response(command, res)
    end
  end
end