Class: LSQS::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/lsqs/queue.rb

Constant Summary collapse

DEFAULT_TIMEOUT =
30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, params = {}) ⇒ Queue

Returns a new instance of Queue.



7
8
9
10
11
12
13
14
15
16
# File 'lib/lsqs/queue.rb', line 7

# Builds a queue that starts with no pending and no in-flight messages.
#
# @param name [Object] stored as the queue name (presumably a String - confirm with callers)
# @param params [Hash] optional settings; only the 'Attributes' key is read here,
#   falling back to an empty Hash when it is absent
def initialize(name, params = {})
  @messages   = []
  @in_flight  = {}
  @monitor    = Monitor.new
  @timeout    = true
  @name       = name
  @attributes = params.fetch('Attributes') { {} }

  # Defined outside this view; presumably starts the visibility-timeout check - confirm.
  check_timeout
end

Instance Attribute Details

#attributesObject

Returns the value of attribute attributes.



3
4
5
# File 'lib/lsqs/queue.rb', line 3

# Reader for the queue attributes: the value found under 'Attributes' in the
# constructor params, or an empty Hash when that key was absent.
#
# @return [Object] the current value of @attributes
def attributes
  @attributes
end

#in_flightObject

Returns the value of attribute in_flight.



3
4
5
# File 'lib/lsqs/queue.rb', line 3

# Reader for the in-flight messages, kept as a Hash keyed by receipt.
#
# @return [Hash] receipt => message for every message currently in flight
def in_flight
  @in_flight
end

#messagesObject

Returns the value of attribute messages.



3
4
5
# File 'lib/lsqs/queue.rb', line 3

# Reader for the messages waiting in the queue (not in flight).
#
# @return [Array] the pending messages
def messages
  @messages
end

#monitorObject

Returns the value of attribute monitor.



3
4
5
# File 'lib/lsqs/queue.rb', line 3

# Reader for the Monitor instance created in the constructor.
#
# @return [Monitor]
def monitor
  @monitor
end

#nameObject

Returns the value of attribute name.



3
4
5
# File 'lib/lsqs/queue.rb', line 3

# Reader for the queue name given to the constructor.
#
# @return [Object] the name passed to #initialize
def name
  @name
end

Instance Method Details

#change_message_visibility(receipt, seconds) ⇒ Object

Changes the visibility timeout of a message that is in flight. If the timeout is set to 0, the message is put back in the queue.

Parameters:

  • receipt (String)
  • seconds (Fixnum)


119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/lsqs/queue.rb', line 119

# Adjusts the visibility of an in-flight message.
#
# A value of 0 expires the message, appends it to the pending messages and
# removes it from the in-flight set; any other value is handed to the
# message's #expire_in.
#
# @param receipt [String] receipt under which the message is tracked in flight
# @param seconds [Fixnum] new visibility timeout
# @raise [RuntimeError] 'MessageNotInflight' when no message is tracked under +receipt+
def change_message_visibility(receipt, seconds)
  lock do
    target = @in_flight[receipt] || raise('MessageNotInflight')

    # Non-zero timeout: only reschedule the expiry, the message stays in flight.
    next target.expire_in(seconds) unless seconds == 0

    target.expire
    @messages << target
    delete_message(receipt)
  end
end

#create_message(options = {}) ⇒ Message

Creates a new message in the queue.

Parameters:

  • options (Hash) (defaults to: {})

Returns:



36
37
38
39
40
41
42
# File 'lib/lsqs/queue.rb', line 36

# Builds a Message from +options+ and appends it to the pending messages
# while holding the queue lock.
#
# @param options [Hash] passed unchanged to Message.new
# @return [Message] the newly created message
def create_message(options = {})
  created = nil

  lock do
    created = Message.new(options)
    @messages << created
  end

  created
end

#delete_message(receipt) ⇒ Object

Deletes a message from the messages that are in-flight.

Parameters:

  • receipt (String)


78
79
80
81
82
# File 'lib/lsqs/queue.rb', line 78

# Removes the entry stored under +receipt+ from the in-flight messages while
# holding the queue lock. An unknown receipt leaves the set untouched.
#
# @param receipt [String] receipt of the in-flight message to drop
def delete_message(receipt)
  lock { in_flight.delete(receipt) }
end

#generate_receiptString

Generates a hex receipt for the message

Returns:

  • (String)


108
109
110
# File 'lib/lsqs/queue.rb', line 108

# Produces a random receipt: 16 random bytes rendered as 32 lowercase hex
# characters.
#
# @return [String]
def generate_receipt
  SecureRandom.random_bytes(16).unpack('H*').first
end

#get_messages(options = {}) ⇒ Hash

Gets a number of messages based on the MaxNumberOfMessages field.

Parameters:

  • options (Hash) (defaults to: {})

Returns:

  • (Hash)


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/lsqs/queue.rb', line 51

# Moves up to MaxNumberOfMessages pending messages into flight and returns them.
#
# Each picked message is chosen at random from the pending messages, given the
# queue's visibility timeout via #expire_in, and tracked in the in-flight set
# under a freshly generated receipt. When fewer messages are pending than
# requested, only the pending ones are returned.
#
# @param options [Hash] request parameters; only 'MaxNumberOfMessages' is read
#   (defaults to 1, converted with #to_i)
# @return [Hash] receipt => message for every message handed out
# @raise [RuntimeError] 'ReadCountOutOfRange' when the requested count is not
#   within 1..10
def get_messages(options = {})
  number_of_messages = options.fetch('MaxNumberOfMessages') { 1 }.to_i

  # Reject both ends of the range; previously 0 and negative counts slipped
  # through and silently produced an empty result.
  raise 'ReadCountOutOfRange' unless number_of_messages.between?(1, 10)

  result = {}

  lock do
    # Never try to hand out more messages than are currently pending.
    [number_of_messages, size].min.times do
      message = messages.delete_at(rand(size))
      message.expire_in(visibility_timeout)
      receipt             = generate_receipt
      @in_flight[receipt] = message
      result[receipt]     = message
    end
  end

  result
end

#purgeObject

Deletes all messages in queue and in flight.



87
88
89
90
91
92
# File 'lib/lsqs/queue.rb', line 87

# Drops every message, pending and in flight, by swapping in fresh empty
# collections while holding the queue lock.
def purge
  lock do
    @messages, @in_flight = [], {}
    @in_flight # block value stays the new in-flight hash
  end
end

#sizeFixnum

Returns the amount of messages in the queue.

Returns:

  • (Fixnum)


99
100
101
# File 'lib/lsqs/queue.rb', line 99

# Number of messages currently pending in the queue; in-flight messages are
# not counted.
#
# @return [Fixnum]
def size
  messages.length
end

#timeout_messagesObject

Checks whether in-flight messages need to be put back in the queue.



137
138
139
140
141
142
143
144
145
146
147
# File 'lib/lsqs/queue.rb', line 137

# Returns expired in-flight messages to the pending messages.
#
# Every in-flight message whose #expired? is truthy is expired, appended to
# the pending messages and removed from the in-flight set. The whole pass runs
# under the queue lock.
def timeout_messages
  lock do
    in_flight.each_pair do |receipt, candidate|
      next unless candidate.expired?

      candidate.expire
      @messages << candidate
      delete_message(receipt)
    end
  end
end

#visibility_timeoutFixnum

Returns the visibility timeout of the queue. It takes the value from the attributes if it is set; otherwise it uses the `DEFAULT_TIMEOUT` constant.

Returns:

  • (Fixnum)


25
26
27
# File 'lib/lsqs/queue.rb', line 25

# Visibility timeout applied to messages handed out by this queue.
#
# Uses the 'VisibilityTimeout' entry of the queue attributes when it is
# truthy, otherwise DEFAULT_TIMEOUT. The attribute value is returned as
# stored, without conversion.
#
# @return [Fixnum]
def visibility_timeout
  configured = attributes['VisibilityTimeout']
  return configured if configured

  DEFAULT_TIMEOUT
end