Class: RServiceBus2::MQBeanstalk

Inherits:
MQ
  • Object
show all
Defined in:
lib/rservicebus2/mq/beanstalk.rb

Overview

Beanstalk client implementation.

Instance Attribute Summary

Attributes inherited from MQ

#local_queue_name

Instance Method Summary collapse

Methods inherited from MQ

get, #initialize

Constructor Details

This class inherits a constructor from RServiceBus2::MQ

Instance Method Details

#ack ⇒ Object



57
58
59
60
# File 'lib/rservicebus2/mq/beanstalk.rb', line 57

# Acknowledge the job currently held in @job: delete it, then forget it.
#
# @job is cleared only after the delete call returns, so if the delete
# raises, the reference is still held.
#
# @return [nil]
def ack
  reserved = @job
  reserved.delete
  @job = nil
end

#connect(host, port) ⇒ Object

Connect to the broker.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rservicebus2/mq/beanstalk.rb', line 9

# Connect to the Beanstalk broker and record its max-job-size.
#
# Stores the connection pool in @beanstalk and the broker's reported
# 'max-job-size' stat in @max_job_size. Prints a warning when that limit is
# below 4 MiB (4_194_304). On any StandardError raised while connecting or
# reading stats, prints diagnostics and terminates the process via `abort`.
#
# @param host [String] broker host name or address
# @param port [Integer, nil] broker port; falls back to 11_300 when nil
def connect(host, port)
  port ||= 11_300
  string = "#{host}:#{port}"

  begin
    @beanstalk = Beanstalk::Pool.new([string])
    @max_job_size = @beanstalk.stats['max-job-size']
    if @max_job_size < 4_194_304
      # Messages are assembled from adjacent literals so the printed text is
      # a single line, without a raw newline and source indentation in it.
      puts '***WARNING: Lowest recommended max-job-size is 4m, current ' \
           "max-job-size, #{@max_job_size.to_f / (1024 * 1024)}m"
      puts '***WARNING: Set the job size with the -z switch, eg ' \
           '/usr/local/bin/beanstalkd -z 4194304'
    end
  rescue StandardError => e
    puts 'Error connecting to Beanstalk'
    puts "Host string, #{string}"
    # An exception raised without an explicit message reports its class name
    # as the message, which is what this comparison matches.
    if e.message == 'Beanstalk::NotConnected'
      puts '***Most likely, beanstalk is not running. Start beanstalk, ' \
           'and try running this again.'
      puts '***If you still get this error, check beanstalk is running ' \
           "at, #{string}"
    else
      puts e.message
      puts e.backtrace
    end
    abort
  end
end

#pop ⇒ Object

Get next msg from queue



43
44
45
46
47
48
49
50
51
# File 'lib/rservicebus2/mq/beanstalk.rb', line 43

# Get the next message from the queue.
#
# Reserves a job via @beanstalk (passing @timeout), keeps it in @job, and
# returns its body. @job is left untouched when the reserve call raises.
#
# @return [Object] the reserved job's body
# @raise [NoMsgToProcess] when reserve raises an error whose message is
#   'TIMED_OUT'
# @raise [StandardError] any other error from reserve is re-raised as-is
def pop
  @job = begin
    @beanstalk.reserve(@timeout)
  rescue StandardError => e
    raise NoMsgToProcess if e.message == 'TIMED_OUT'

    raise
  end
  @job.body
end

#return_to_queue ⇒ Object



53
54
55
# File 'lib/rservicebus2/mq/beanstalk.rb', line 53

# Release the job held in @job.
#
# @job itself is not cleared by this method.
#
# @return [Object] whatever the job's #release returns
def return_to_queue
  reserved = @job
  reserved.release
end

#send(queue_name, msg) ⇒ Object



62
63
64
65
66
67
68
69
70
71
# File 'lib/rservicebus2/mq/beanstalk.rb', line 62

# Put a message on the named queue.
#
# The message is measured in bytes, not characters, before being sent:
# beanstalkd's max-job-size is a byte limit, so a multibyte string can
# exceed it even when its character count does not.
#
# @param queue_name [String] queue to select via @beanstalk.use
# @param msg [String] message body to put on the queue
# @return [Object] result of @beanstalk.put
# @raise [JobTooBigError] when the message's byte size exceeds @max_job_size
def send(queue_name, msg)
  msg_size = msg.bytesize
  if msg_size > @max_job_size
    puts '***Attempting to send a msg which will not fit on queue.'
    puts "***Msg size, #{msg_size}, max msg size, #{@max_job_size}."
    raise JobTooBigError,
          "Msg size, #{msg_size}, max msg size, #{@max_job_size}"
  end
  @beanstalk.use(queue_name)
  @beanstalk.put(msg)
end

#subscribe(queuename) ⇒ Object



38
39
40
# File 'lib/rservicebus2/mq/beanstalk.rb', line 38

# Start watching the named queue on the Beanstalk connection.
#
# @param queuename [String] name of the queue to watch
# @return [Object] result of @beanstalk.watch
def subscribe(queuename)
  tube = queuename
  @beanstalk.watch(tube)
end