Class: FakeSQS::Queue
- Inherits:
-
Object
- Object
- FakeSQS::Queue
- Defined in:
- lib/fake_sqs/queue.rb
Constant Summary collapse
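- VISIBILITY_TIMEOUT = 30 (fallback returned by #default_visibility_timeout when the queue has no 'VisibilityTimeout' attribute)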
Instance Attribute Summary collapse
-
#arn ⇒ Object
readonly
Returns the value of attribute arn.
-
#message_factory ⇒ Object
readonly
Returns the value of attribute message_factory.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queue_attributes ⇒ Object
readonly
Returns the value of attribute queue_attributes.
Instance Method Summary collapse
- #add_queue_attributes(attrs) ⇒ Object
- #attributes ⇒ Object
- #change_message_visibility(receipt, visibility) ⇒ Object
- #default_visibility_timeout ⇒ Object
- #delete_message(receipt) ⇒ Object
- #expire ⇒ Object
- #generate_receipt ⇒ Object
-
#initialize(options = {}) ⇒ Queue
constructor
A new instance of Queue.
- #messages ⇒ Object
- #messages_in_flight ⇒ Object
- #receive_message(options = {}) ⇒ Object
- #reset ⇒ Object
- #reset_messages_in_flight ⇒ Object
- #send_message(options = {}) ⇒ Object
- #size ⇒ Object
- #timeout_messages! ⇒ Object
- #to_yaml ⇒ Object
- #with_lock ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Queue
Returns a new instance of Queue.
16 17 18 19 20 21 22 23 24 |
# File 'lib/fake_sqs/queue.rb', line 16

# Builds a queue from an options hash and puts it into its initial state
# by calling +reset+.
#
# @param options [Hash] must contain +:message_factory+ and +"QueueName"+;
#   +"Arn"+ and +"Attributes"+ are optional
# @raise [KeyError] if +:message_factory+ or +"QueueName"+ is missing
def initialize(options = {})
  @message_factory = options.fetch(:message_factory)
  @name = options.fetch("QueueName")
  # No ARN supplied: fabricate one, using a random hex string where the
  # account id would normally go.
  @arn = options.fetch("Arn") { "arn:aws:sqs:us-east-1:#{SecureRandom.hex}:#{@name}" }
  @queue_attributes = options.fetch("Attributes") { {} }
  # Monitor rather than Mutex: several locked methods call other locked
  # methods (e.g. change_message_visibility -> delete_message).
  @lock = Monitor.new
  reset
end
Instance Attribute Details
#arn ⇒ Object (readonly)
Returns the value of attribute arn.
14 15 16 |
# File 'lib/fake_sqs/queue.rb', line 14

# @return [Object] the value of the +arn+ attribute (read-only)
def arn
  @arn
end
#message_factory ⇒ Object (readonly)
Returns the value of attribute message_factory.
14 15 16 |
# File 'lib/fake_sqs/queue.rb', line 14

# @return [Object] the value of the +message_factory+ attribute (read-only)
def message_factory
  @message_factory
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
14 15 16 |
# File 'lib/fake_sqs/queue.rb', line 14

# @return [Object] the value of the +name+ attribute (read-only)
def name
  @name
end
#queue_attributes ⇒ Object (readonly)
Returns the value of attribute queue_attributes.
14 15 16 |
# File 'lib/fake_sqs/queue.rb', line 14

# @return [Object] the value of the +queue_attributes+ attribute (read-only)
def queue_attributes
  @queue_attributes
end
Instance Method Details
#add_queue_attributes(attrs) ⇒ Object
34 35 36 |
# File 'lib/fake_sqs/queue.rb', line 34

# Merges +attrs+ into the stored queue attributes in place; keys already
# present are overwritten.
#
# @param attrs [Hash] attributes to add or replace
# @return [Hash] the updated +queue_attributes+ hash
def add_queue_attributes(attrs)
  queue_attributes.merge!(attrs)
end
#attributes ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/fake_sqs/queue.rb', line 38

# Returns the stored queue attributes plus three computed entries:
# the queue ARN and the current counts of queued and in-flight messages.
# The stored +queue_attributes+ hash is not modified.
#
# @return [Hash] a new hash of attribute name => value
def attributes
  queue_attributes.merge(
    "QueueArn" => arn,
    "ApproximateNumberOfMessages" => @messages.size,
    "ApproximateNumberOfMessagesNotVisible" => @messages_in_flight.size
  )
end
#change_message_visibility(receipt, visibility) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/fake_sqs/queue.rb', line 102

# Changes the visibility of an in-flight message.
#
# A +visibility+ of 0 expires the message, puts it back on the queue and
# removes it from the in-flight set; any other value is passed to the
# message's +expire_at+.
#
# @param receipt [Object] receipt handle the message was received under
# @param visibility [Integer] new visibility value; compared against the
#   Integer 0, so a String "0" would take the +expire_at+ branch
# @raise [MessageNotInflight] if no in-flight message has this receipt
def change_message_visibility(receipt, visibility)
  with_lock do
    message = @messages_in_flight[receipt]
    raise MessageNotInflight unless message

    if visibility == 0
      message.expire!
      @messages << message
      delete_message(receipt)
    else
      message.expire_at(visibility)
    end
  end
end
#default_visibility_timeout ⇒ Object
78 79 80 81 82 83 84 |
# File 'lib/fake_sqs/queue.rb', line 78

# Visibility timeout applied to messages handed out by this queue.
#
# @return [Integer] the queue's 'VisibilityTimeout' attribute converted
#   with +to_i+, or VISIBILITY_TIMEOUT when that attribute is absent
def default_visibility_timeout
  configured = attributes['VisibilityTimeout']
  configured ? configured.to_i : VISIBILITY_TIMEOUT
end
#delete_message(receipt) ⇒ Object
118 119 120 121 122 |
# File 'lib/fake_sqs/queue.rb', line 118

# Removes the in-flight message stored under +receipt+. An unknown receipt
# is ignored.
#
# @param receipt [Object] receipt handle of the message to delete
# @return [Object, nil] the removed message, or nil if there was none
def delete_message(receipt)
  with_lock { @messages_in_flight.delete(receipt) }
end
#expire ⇒ Object
132 133 134 135 136 137 |
# File 'lib/fake_sqs/queue.rb', line 132

# Puts every in-flight message back on the queue and clears the in-flight
# set.
def expire
  with_lock do
    # Append in place instead of `@messages += ...`: `+=` would rebind
    # @messages to a new Array, while the view created in #reset was built
    # around the old one.
    # NOTE(review): presumably CollectionView reads through to the wrapped
    # array; confirm against fake_sqs/collection_view.
    @messages.concat(@messages_in_flight.values)
    # NOTE(review): the extracted listing lost one line here (source spans
    # lines 132-137); restored as the in-flight reset. Confirm against
    # lib/fake_sqs/queue.rb.
    reset_messages_in_flight
  end
end
#generate_receipt ⇒ Object
158 159 160 |
# File 'lib/fake_sqs/queue.rb', line 158

# Generates a fresh receipt handle.
#
# @return [String] a random hexadecimal string from SecureRandom.hex
def generate_receipt
  SecureRandom.hex
end
#messages ⇒ Object
146 147 148 |
# File 'lib/fake_sqs/queue.rb', line 146

# @return [Object] the view object built around the queued messages in
#   #reset (a FakeSQS::CollectionView), not the underlying array
def messages
  @messages_view
end
#messages_in_flight ⇒ Object
150 151 152 |
# File 'lib/fake_sqs/queue.rb', line 150

# @return [Object] the view object built around the in-flight messages in
#   #reset_messages_in_flight (a FakeSQS::CollectionView), not the
#   underlying hash
def messages_in_flight
  @messages_in_flight_view
end
#receive_message(options = {}) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fake_sqs/queue.rb', line 54

# Hands out up to "MaxNumberOfMessages" randomly chosen messages. Each one
# is removed from the queue, given the default visibility timeout, and
# recorded as in flight under a newly generated receipt.
#
# @param options [Hash] may contain "MaxNumberOfMessages" (defaults to "1")
# @return [Hash] receipt => message; empty when the queue has no messages
# @raise [ReadCountOutOfRange] if more than 10 messages are requested
# @raise [ArgumentError] if "MaxNumberOfMessages" is not an integer string
def receive_message(options = {})
  amount = Integer(options.fetch("MaxNumberOfMessages") { "1" })
  raise ReadCountOutOfRange, amount if amount > 10

  return {} if @messages.empty?

  received = {}
  with_lock do
    # Never try to take more messages than are currently queued.
    [amount, size].min.times do
      # Pick a random message rather than the oldest one.
      message = @messages.delete_at(rand(size))
      message.expire_at(default_visibility_timeout)
      receipt = generate_receipt
      @messages_in_flight[receipt] = message
      received[receipt] = message
    end
  end
  received
end
#reset ⇒ Object
124 125 126 127 128 129 130 |
# File 'lib/fake_sqs/queue.rb', line 124

# Empties the queue: installs a new, empty message array and a new view
# around it, then clears the in-flight set.
def reset
  with_lock do
    @messages = []
    @messages_view = FakeSQS::CollectionView.new(@messages)
    # NOTE(review): the extracted listing lost one line here (source spans
    # lines 124-130); restored as the in-flight reset, which is what
    # initializes @messages_in_flight for a new queue. Confirm against
    # lib/fake_sqs/queue.rb.
    reset_messages_in_flight
  end
end
#reset_messages_in_flight ⇒ Object
139 140 141 142 143 144 |
# File 'lib/fake_sqs/queue.rb', line 139

# Discards all in-flight messages: installs a new, empty in-flight hash and
# a new view around it. The discarded messages are not put back on the
# queue.
def reset_messages_in_flight
  with_lock do
    @messages_in_flight = {}
    @messages_in_flight_view = FakeSQS::CollectionView.new(@messages_in_flight)
  end
end
#send_message(options = {}) ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/fake_sqs/queue.rb', line 46

# Builds a message from +options+ with the queue's message factory and
# appends it to the queue.
#
# @param options [Hash] passed unchanged to +message_factory.new+
# @return [Object] the newly created message
#   NOTE(review): the return line was lost in the extracted listing
#   (source spans lines 46-52); returning the message is assumed. Confirm
#   against lib/fake_sqs/queue.rb.
def send_message(options = {})
  with_lock do
    message = message_factory.new(options)
    @messages << message
    message
  end
end
#size ⇒ Object
154 155 156 |
# File 'lib/fake_sqs/queue.rb', line 154

# Number of messages waiting on the queue; in-flight messages are not
# counted.
#
# NOTE(review): the receiver was lost in the extracted listing; +messages+
# (the view) is assumed. Confirm against lib/fake_sqs/queue.rb.
#
# @return [Integer]
def size
  messages.size
end
#timeout_messages! ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/fake_sqs/queue.rb', line 86

# Returns every in-flight message that reports +expired?+ to the queue:
# the message is expired, appended to the queue and removed from the
# in-flight set.
def timeout_messages!
  with_lock do
    # Collect first, then mutate: delete_message changes
    # @messages_in_flight, which must not happen while iterating it.
    expired = @messages_in_flight.select { |_receipt, message| message.expired? }

    expired.each do |receipt, message|
      message.expire!
      @messages << message
      delete_message(receipt)
    end
  end
end
#to_yaml ⇒ Object
26 27 28 29 30 31 32 |
# File 'lib/fake_sqs/queue.rb', line 26

# Returns the queue's persistent state as a plain Hash. Despite the method
# name, no YAML string is produced here. The keys match the string keys
# read by #initialize; messages are not included.
#
# @return [Hash] with keys "QueueName", "Arn" and "Attributes"
def to_yaml
  {
    "QueueName" => name,
    "Arn" => arn,
    "Attributes" => queue_attributes
  }
end
#with_lock ⇒ Object
162 163 164 165 166 |
# File 'lib/fake_sqs/queue.rb', line 162

# Runs the given block while holding the queue's lock.
#
# @yield the work to run under the lock
# @return [Object] whatever the block returns
def with_lock(&block)
  @lock.synchronize(&block)
end