Class: RServiceBus2::MonitorBeanstalk
Defined in: lib/rservicebus2/monitor/beanstalk.rb
Overview
Monitors a Beanstalk queue for jobs.
Instance Attribute Summary
Attributes inherited from Monitor
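Instance Method Summary
- #connect(uri) ⇒ Object
  Connects to the Beanstalk server named in the URI and watches the queue given by the URI path.
- #deduce_timeout(uri) ⇒ Object
  Returns the reserve timeout taken from the URI query string, or 5 when none is given.
- #look ⇒ Object
  Reserves the next job, passes its body on via #send, then deletes the job.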
Methods inherited from Monitor
#_connect, #finished, #initialize, #reconnect, #send
Constructor Details
This class inherits a constructor from RServiceBus2::Monitor
Instance Method Details
#connect(uri) ⇒ Object
Connects to the Beanstalk server at uri.host and watches the queue named by the URI path. If the connection fails, the error is printed; when Beanstalk is not reachable, the process aborts.

# File 'lib/rservicebus2/monitor/beanstalk.rb', line 16

def connect(uri)
  @uri = uri
  @timeout = deduce_timeout(uri)

  queue_name = uri.path.sub('/', '')

  # The port is not read from the URI here, so it always resolves to 11_300.
  port ||= 11_300
  connection_string = "#{uri.host}:#{port}"

  @beanstalk = Beanstalk::Pool.new([connection_string])
  @beanstalk.watch(queue_name)
  @message_uri = "beanstalk://#{uri.host}:#{port}/#{queue_name}"
rescue StandardError => e
  puts "Error connecting to Beanstalk, Host string, #{connection_string}"
  if e.message == 'Beanstalk::NotConnected'
    puts "***Most likely, beanstalk is not running. Start beanstalk, and try running this again.\n" \
         "***If you still get this error, check beanstalk is running at, #{connection_string}"
    abort
  end
  puts e.message
  puts e.backtrace
end
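As a rough illustration of how #connect derives its connection values, the sketch below splits a beanstalk:// URI into host, port, and queue name. The helper name beanstalk_parts and the example URIs are hypothetical and not part of RServiceBus2. Unlike the method above, the helper uses the port from the URI when one is present and falls back to 11_300 otherwise, which is an assumption about the intended behaviour.

```ruby
require 'uri'

# Hypothetical helper (not part of RServiceBus2): extracts the pieces that
# #connect works with from a beanstalk:// URI string.
def beanstalk_parts(uri_string, default_port: 11_300)
  uri = URI.parse(uri_string)
  {
    host: uri.host,
    # Assumption: honour an explicit port, otherwise use the default.
    port: uri.port || default_port,
    # Drop the leading slash from the path to get the queue name.
    queue_name: uri.path.sub('/', '')
  }
end

parts = beanstalk_parts('beanstalk://localhost/orders')
puts "#{parts[:host]}:#{parts[:port]}"   # connection string
puts "beanstalk://#{parts[:host]}:#{parts[:port]}/#{parts[:queue_name]}"
```

Because the beanstalk scheme is unknown to Ruby's URI library, the parsed URI has no default port, so the fallback applies whenever the port is omitted.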
#deduce_timeout(uri) ⇒ Object
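Returns the reserve timeout in seconds: the first 'timeout1' query parameter of the URI if present, otherwise 5. A value read from the query string is returned as a String.

# File 'lib/rservicebus2/monitor/beanstalk.rb', line 9

def deduce_timeout(uri)
  return 5 if uri.query.nil?

  CGI.parse(uri.query)['timeout1']&.first || 5
end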
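A minimal sketch of the same lookup may help show how the timeout is read from the query string. The helper name timeout_from, its key and default parameters, and the Integer conversion are assumptions added for illustration. The sketch also uses URI.decode_www_form from the standard library in place of CGI.parse so that it depends on the uri library only.

```ruby
require 'uri'

# Hypothetical helper mirroring #deduce_timeout. The keyword parameters and
# the Integer conversion are illustrative assumptions.
def timeout_from(uri, key: 'timeout1', default: 5)
  return default if uri.query.nil?

  # decode_www_form returns [name, value] pairs; to_h keeps the last value
  # for each name.
  params = URI.decode_www_form(uri.query).to_h
  params.key?(key) ? Integer(params[key]) : default
end

puts timeout_from(URI.parse('beanstalk://localhost/orders'))
puts timeout_from(URI.parse('beanstalk://localhost/orders?timeout1=30'))
```

Converting to an Integer keeps the return type the same whether or not the parameter is supplied. The original method returns the raw query string value instead.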
#look ⇒ Object
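Reserves the next job from the watched queue, waiting up to @timeout, passes its body and a per-job URI to #send, then deletes the job and returns its body. A 'TIMED_OUT' error is treated as "no job available" and the method returns nil; any other error is re-raised.

# File 'lib/rservicebus2/monitor/beanstalk.rb', line 41

def look
  job = @beanstalk.reserve @timeout
  send(job.body, "#{@message_uri}/#{job.id}")
  job_body = job.body
  job.delete
  job_body
rescue StandardError => e
  return if e.message == 'TIMED_OUT'

  raise e
end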
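The reserve, send, delete sequence in #look can be sketched without a running Beanstalk server. FakeJob, FakeQueue, and poll_once below are hypothetical stand-ins written for this example. They are not the beanstalk client API, and the fake queue simply raises an error with the message 'TIMED_OUT' when it is empty.

```ruby
# Hypothetical stand-in for a reserved job.
FakeJob = Struct.new(:id, :body) do
  attr_reader :deleted

  def delete
    @deleted = true
  end
end

# Hypothetical stand-in for the queue connection.
class FakeQueue
  def initialize(jobs)
    @jobs = jobs
  end

  # Returns the next job, or raises when nothing is waiting.
  def reserve(_timeout)
    @jobs.shift || raise(StandardError, 'TIMED_OUT')
  end
end

# Same flow as #look: hand the body to the caller first, delete afterwards.
def poll_once(queue, timeout, message_uri)
  job = queue.reserve(timeout)
  yield job.body, "#{message_uri}/#{job.id}"
  job.delete
  job.body
rescue StandardError => e
  return nil if e.message == 'TIMED_OUT'

  raise
end

queue = FakeQueue.new([FakeJob.new(7, 'hello')])
poll_once(queue, 5, 'beanstalk://localhost:11300/orders') do |body, uri|
  puts "#{uri}: #{body}"
end
```

Because the job is deleted only after the block returns, an exception raised while handling the body leaves the job undeleted in this sketch.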