Class: RServiceBus2::MQAWS

Inherits:
MQ
  • Object
show all
Defined in:
lib/rservicebus2/mq/aws.rb

Overview

AWS SQS client implementation.

Instance Attribute Summary

Attributes inherited from MQ

#local_queue_name

Instance Method Summary collapse

Methods inherited from MQ

#get, #initialize

Constructor Details

This class inherits a constructor from RServiceBus2::MQ

Instance Method Details

#ack ⇒ Object



51
52
53
54
# File 'lib/rservicebus2/mq/aws.rb', line 51

# Acknowledge the current job: delete its message from the subscribed queue
# and forget the job.
#
# @return [nil]
def ack
  handle = @job.receipt_handle
  deletion = { queue_url: @queue_url, receipt_handle: handle }
  @sqs_client.delete_message(deletion)

  # Clear the reference so the deleted message is no longer the current job.
  @job = nil
end

#connect(region, _port) ⇒ Object

Connect to the broker



11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/rservicebus2/mq/aws.rb', line 11

# Record the region and look up the AWS account id of the calling
# credentials; both are interpolated into the queue URLs built by
# #subscribe and #send.
#
# @param region [String] AWS region name, passed to the STS client
# @param _port [Object] unused by this implementation
# @return [String] the account id reported by STS
def connect(region, _port)
  # NOTE(review): SQS documents a smaller per-message limit than 4 MiB —
  # confirm this cap against the current SQS quotas.
  @max_job_size = 4_194_304
  @region = region

  sts_client = Aws::STS::Client.new(region: region)
  # The original line ended in a dangling `.`; the account id is the member
  # of the GetCallerIdentity response that the queue URL needs.
  @caller_identity_account = sts_client.get_caller_identity.account
rescue StandardError => e
  # A failed identity lookup is treated as fatal: report it and terminate.
  puts 'Error connecting to AWS'
  puts "Host string, #{region}"
  puts e.message
  puts e.backtrace
  abort
end

#pop ⇒ Object

Get next msg from queue



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/rservicebus2/mq/aws.rb', line 33

# Get the next message from the subscribed queue and remember it as the
# current job, so that #ack can delete it afterwards.
#
# @return [String] body of the received message
# @raise [NoMsgToProcess] when the queue returned no messages
def pop
  response = @sqs_client.receive_message(queue_url: @queue_url, max_number_of_messages: 1)
  messages = response.messages

  # Raise before touching @job: reading @job.body in an `ensure` clause would
  # run on this path too and, with no current job, replace NoMsgToProcess
  # with a NoMethodError.
  raise NoMsgToProcess if messages.empty?

  @job = messages.last
  # Return the body explicitly; the value of an `ensure` clause is discarded,
  # so it cannot be used to supply the method's result.
  @job.body
end

#return_to_queue ⇒ Object



47
48
49
# File 'lib/rservicebus2/mq/aws.rb', line 47

# Give up the current job without acknowledging it.
#
# Only the local reference is dropped; no request is sent to the queue
# from this method.
#
# @return [nil]
def return_to_queue
  @job = nil
end

#send(queue_name, msg) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/rservicebus2/mq/aws.rb', line 56

# Send a message to the named queue in the connected region and account.
#
# @param queue_name [String] name of the destination queue
# @param msg [String] message body
# @return [Object] whatever the SQS client's send_message returns
# @raise [JobTooBigError] when the message exceeds @max_job_size bytes
def send(queue_name, msg)
  # Measure bytes, not characters: a multibyte payload occupies more bytes
  # than String#length reports and could slip past the guard.
  msg_size = msg.bytesize
  if msg_size > @max_job_size
    puts '***Attempting to send a msg which will not fit on queue.' \
         "***Msg size, #{msg_size}, max msg size, #{@max_job_size}."
    raise JobTooBigError, "Msg size, #{msg_size}, max msg size, #{@max_job_size}"
  end

  queue_url = "https://sqs.#{@region}.amazonaws.com/#{@caller_identity_account}/#{queue_name}"

  @sqs_client.send_message(
    queue_url: queue_url,
    message_body: msg
  )
end

#subscribe(queuename) ⇒ Object



25
26
27
28
29
30
# File 'lib/rservicebus2/mq/aws.rb', line 25

# Point this instance at a queue: store the queue URL and create the SQS
# client used by the other operations.
#
# @param queuename [String] name of the queue to read from
# @return [Aws::SQS::Client] the newly created client
def subscribe(queuename)
  # Resulting URL looks like:
  # 'https://sqs.us-east-1.amazonaws.com/111111111111/my-queue'
  @queue_url = format('https://sqs.%s.amazonaws.com/%s/%s', @region, @caller_identity_account, queuename)
  @sqs_client = Aws::SQS::Client.new(region: @region)
end