Class: RServiceBus2::MonitorBeanstalk

Inherits:
Monitor
  • Object
show all
Defined in:
lib/rservicebus2/monitor/beanstalk.rb

Overview

Monitor a Beanstalk queue for jobs

Instance Attribute Summary

Attributes inherited from Monitor

#bus

Instance Method Summary collapse

Methods inherited from Monitor

#_connect, #finished, #initialize, #reconnect, #send

Constructor Details

This class inherits a constructor from RServiceBus2::Monitor

Instance Method Details

#connect(uri) ⇒ Object

rubocop:disable Metrics/MethodLength,Metrics/AbcSize



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/rservicebus2/monitor/beanstalk.rb', line 16

# Connects to the beanstalk server described by +uri+ and starts watching the
# queue named by the URI path.
#
# Sets @uri, @timeout, @beanstalk and @message_uri. Connection errors are
# reported on stdout; when the error message is 'Beanstalk::NotConnected' the
# process is aborted, any other error is printed and swallowed.
#
# @param uri [URI::Generic] e.g. beanstalk://host:port/queue_name
def connect(uri)
  @uri = uri
  @timeout = deduce_timeout(uri)

  # The queue name is the URI path without its leading slash.
  queue_name = uri.path.sub('/', '')

  # Honour an explicit port in the URI; fall back to 11_300 when none is given.
  # (Previously `port ||= 11_300` ignored uri.port entirely.)
  port = uri.port || 11_300
  connection_string = "#{uri.host}:#{port}"
  @beanstalk = Beanstalk::Pool.new([connection_string])

  @beanstalk.watch(queue_name)
  @message_uri = "beanstalk://#{uri.host}:#{port}/#{queue_name}"
rescue StandardError => e
  # connection_string is nil here if the failure happened before it was built.
  puts "Error connecting to Beanstalk, Host string, #{connection_string}"
  if e.message == 'Beanstalk::NotConnected'
    # Double-quoted so that \n is a real line break rather than a literal "\n".
    puts "***Most likely, beanstalk is not running. Start beanstalk, and try running this again.\n" \
         "***If you still get this error, check beanstalk is running at, #{connection_string}"
    abort
  end

  puts e.message
  puts e.backtrace
end

#deduce_timeout(uri) ⇒ Object



9
10
11
12
13
# File 'lib/rservicebus2/monitor/beanstalk.rb', line 9

def deduce_timeout(uri)
  return 5 if uri.query.nil?

  CGI.parse(u.query)['timeout1']&.first || 5
end

#look ⇒ Object

rubocop:enable Metrics/MethodLength,Metrics/AbcSize



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rservicebus2/monitor/beanstalk.rb', line 41

# Reserves the next job (waiting up to @timeout), forwards its body via #send
# tagged with "<@message_uri>/<job id>", deletes the job and returns the body.
#
# @return [Object, nil] the job body, or nil when the reserve raised an error
#   whose message is 'TIMED_OUT'
# @raise [StandardError] any other error is re-raised unchanged
def look
  reserved = @beanstalk.reserve(@timeout)
  send(reserved.body, "#{@message_uri}/#{reserved.id}")

  # Capture the body first, then delete the job; the body is the return value.
  reserved.body.tap { reserved.delete }
rescue StandardError => e
  # A timeout just means nothing was waiting; fall through and return nil.
  raise e unless e.message == 'TIMED_OUT'
end