Class: FakeSQS::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/fake_sqs/queue.rb

Constant Summary collapse

# Fallback returned by #default_visibility_timeout when the queue has no
# 'VisibilityTimeout' attribute (presumably seconds, as in SQS — confirm).
VISIBILITY_TIMEOUT =
30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Queue

Returns a new instance of Queue.



16
17
18
19
20
21
22
23
24
# File 'lib/fake_sqs/queue.rb', line 16

# Builds a queue from an options hash.
#
# @param options [Hash] must contain :message_factory (symbol key) and
#   "QueueName" (string key); "Arn" and "Attributes" are optional
# @raise [KeyError] when :message_factory or "QueueName" is missing
def initialize(options = {})
  @message_factory = options.fetch(:message_factory)
  @name = options.fetch("QueueName")

  # Without an explicit ARN, fabricate one with a random hex account segment.
  @arn = options.fetch("Arn") do
    "arn:aws:sqs:us-east-1:#{SecureRandom.hex}:#{@name}"
  end
  @queue_attributes = options.fetch("Attributes") { Hash.new }

  # The lock has to exist before #reset, which runs inside #with_lock.
  @lock = Monitor.new
  reset
end

Instance Attribute Details

#arn ⇒ Object (readonly)

Returns the value of attribute arn.



14
15
16
# File 'lib/fake_sqs/queue.rb', line 14

# Reader for the queue's ARN.
#
# @return [Object] the value stored in @arn by #initialize: the "Arn" option
#   when supplied, otherwise a generated "arn:aws:sqs:us-east-1:..." string
def arn
  @arn
end

#message_factory ⇒ Object (readonly)

Returns the value of attribute message_factory.



14
15
16
# File 'lib/fake_sqs/queue.rb', line 14

# Reader for the message factory.
#
# @return [Object] the :message_factory option given to #initialize;
#   #send_message calls +new(options)+ on it to build each message
def message_factory
  @message_factory
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



14
15
16
# File 'lib/fake_sqs/queue.rb', line 14

# Reader for the queue name.
#
# @return [Object] the "QueueName" option given to #initialize
def name
  @name
end

#queue_attributes ⇒ Object (readonly)

Returns the value of attribute queue_attributes.



14
15
16
# File 'lib/fake_sqs/queue.rb', line 14

# Reader for the stored queue attributes.
#
# @return [Object] the "Attributes" option given to #initialize, or an empty
#   Hash when none was supplied; #add_queue_attributes mutates it in place
def queue_attributes
  @queue_attributes
end

Instance Method Details

#add_queue_attributes(attrs) ⇒ Object



34
35
36
# File 'lib/fake_sqs/queue.rb', line 34

# Merges the given attributes into the stored queue attributes in place;
# keys already present are overwritten.
#
# @param attrs [Hash] attributes to add or overwrite
# @return [Hash] the updated #queue_attributes hash itself
def add_queue_attributes(attrs)
  stored = queue_attributes
  stored.update(attrs)
end

#attributes ⇒ Object



38
39
40
41
42
43
44
# File 'lib/fake_sqs/queue.rb', line 38

# Combines the stored queue attributes with values computed on each call.
# The computed keys win over stored entries of the same name, and the stored
# hash itself is left untouched.
#
# @return [Hash] stored attributes plus "QueueArn",
#   "ApproximateNumberOfMessages" and "ApproximateNumberOfMessagesNotVisible"
def attributes
  computed = {
    "QueueArn" => arn,
    "ApproximateNumberOfMessages" => @messages.size,
    "ApproximateNumberOfMessagesNotVisible" => @messages_in_flight.size,
  }
  queue_attributes.merge(computed)
end

#change_message_visibility(receipt, visibility) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fake_sqs/queue.rb', line 102

# Updates the visibility of an in-flight message identified by its receipt.
#
# A visibility of 0 expires the message right away: it is put back on the
# queue and removed from the in-flight table. Any other value is handed to
# the message's #expire_at.
#
# @param receipt [Object] key of the message in the in-flight table
# @param visibility [Integer] new visibility value; compared against 0
# @raise [MessageNotInflight] when no in-flight message has this receipt
def change_message_visibility(receipt, visibility)
  with_lock do
    in_flight = @messages_in_flight[receipt] || raise(MessageNotInflight)

    # Non-zero: only reschedule; the message stays in flight.
    next in_flight.expire_at(visibility) unless visibility == 0

    # Zero: make the message visible again immediately.
    in_flight.expire!
    @messages << in_flight
    delete_message(receipt)
  end
end

#default_visibility_timeout ⇒ Object



78
79
80
81
82
83
84
# File 'lib/fake_sqs/queue.rb', line 78

# Visibility timeout applied to messages handed out by this queue.
#
# @return [Integer] the 'VisibilityTimeout' attribute converted with #to_i
#   when it is set, otherwise VISIBILITY_TIMEOUT
def default_visibility_timeout
  configured = attributes['VisibilityTimeout']
  return VISIBILITY_TIMEOUT unless configured

  configured.to_i
end

#delete_message(receipt) ⇒ Object



118
119
120
121
122
# File 'lib/fake_sqs/queue.rb', line 118

# Removes the in-flight message stored under the given receipt.
#
# @param receipt [Object] key in the in-flight table
# @return [Object, nil] the removed message, or nil when the receipt is unknown
def delete_message(receipt)
  with_lock { @messages_in_flight.delete(receipt) }
end

#expire ⇒ Object



132
133
134
135
136
137
# File 'lib/fake_sqs/queue.rb', line 132

# Moves every in-flight message back onto the queue and clears the in-flight
# table, without checking the messages' individual timeouts.
#
# @return [Object] whatever #reset_messages_in_flight returns
def expire
  with_lock do
    # Append in place instead of `@messages += ...`: reassigning @messages
    # would leave @messages_view (and therefore #messages and #size) wrapping
    # the previous array, so the returned messages would not be counted.
    @messages.concat(@messages_in_flight.values)
    reset_messages_in_flight
  end
end

#generate_receipt ⇒ Object



158
159
160
# File 'lib/fake_sqs/queue.rb', line 158

# Produces a fresh random receipt handle.
#
# @return [String] 32 hexadecimal characters (16 random bytes)
def generate_receipt
  # 16 is SecureRandom.hex's default byte count, spelled out explicitly.
  SecureRandom.hex(16)
end

#messages ⇒ Object



146
147
148
# File 'lib/fake_sqs/queue.rb', line 146

# Accessor for the queued messages as exposed to callers.
#
# @return [FakeSQS::CollectionView] the view that #reset builds around the
#   @messages array (the raw array itself is not returned)
def messages
  @messages_view
end

#messages_in_flight ⇒ Object



150
151
152
# File 'lib/fake_sqs/queue.rb', line 150

# Accessor for the in-flight messages as exposed to callers.
#
# @return [FakeSQS::CollectionView] the view that #reset_messages_in_flight
#   builds around the @messages_in_flight hash (receipt => message)
def messages_in_flight
  @messages_in_flight_view
end

#receive_message(options = {}) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fake_sqs/queue.rb', line 54

# Hands out up to "MaxNumberOfMessages" randomly chosen queued messages and
# moves them to the in-flight table under freshly generated receipts.
#
# @param options [Hash] request parameters; "MaxNumberOfMessages" (default
#   "1") is converted with Integer()
# @return [Hash] receipt => message for every message handed out; empty when
#   the queue has no messages
# @raise [ReadCountOutOfRange] when the requested amount is outside 1..10
# @raise [ArgumentError] when "MaxNumberOfMessages" is not a valid integer
def receive_message(options = {})
  amount = Integer options.fetch("MaxNumberOfMessages") { "1" }

  # SQS accepts 1..10 only; zero and negative counts are rejected like
  # counts above ten instead of silently returning nothing.
  fail ReadCountOutOfRange, amount unless amount.between?(1, 10)

  result = {}

  with_lock do
    # Check emptiness under the lock so a concurrent receiver cannot drain
    # the queue between the check and the removal below.
    next if @messages.empty?

    # The same timeout applies to the whole batch; look it up once.
    visibility = default_visibility_timeout

    # Count the array that delete_at operates on, so the bound and the random
    # index always refer to the same collection.
    [amount, @messages.size].min.times do
      message = @messages.delete_at(rand(@messages.size))
      message.expire_at(visibility)
      receipt = generate_receipt
      @messages_in_flight[receipt] = message
      result[receipt] = message
    end
  end

  result
end

#reset ⇒ Object



124
125
126
127
128
129
130
# File 'lib/fake_sqs/queue.rb', line 124

# Empties the queue: installs a new message array with a matching view and
# clears the in-flight table.
#
# @return [Object] whatever #reset_messages_in_flight returns
def reset
  with_lock do
    fresh = []
    @messages = fresh
    # The view wraps the very array stored in @messages.
    @messages_view = FakeSQS::CollectionView.new(fresh)
    reset_messages_in_flight
  end
end

#reset_messages_in_flight ⇒ Object



139
140
141
142
143
144
# File 'lib/fake_sqs/queue.rb', line 139

# Replaces the in-flight table with an empty hash and builds a new view
# around it.
#
# @return [FakeSQS::CollectionView] the newly created in-flight view
def reset_messages_in_flight
  with_lock do
    table = {}
    @messages_in_flight = table
    # The view wraps the very hash stored in @messages_in_flight.
    @messages_in_flight_view = FakeSQS::CollectionView.new(table)
  end
end

#send_message(options = {}) ⇒ Object



46
47
48
49
50
51
52
# File 'lib/fake_sqs/queue.rb', line 46

# Builds a message from the given options via the message factory and
# appends it to the queue.
#
# @param options [Hash] passed unchanged to +message_factory.new+
# @return [Object] the newly created message
def send_message(options = {})
  with_lock do
    message_factory.new(options).tap { |built| @messages << built }
  end
end

#size ⇒ Object



154
155
156
# File 'lib/fake_sqs/queue.rb', line 154

# Number of queued messages, as reported by the #messages view.
#
# @return [Object] the result of +messages.size+ (in-flight messages are
#   kept in a separate table and are not part of this count)
def size
  messages.size
end

#timeout_messages! ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/fake_sqs/queue.rb', line 86

# Returns to the queue every in-flight message whose #expired? is truthy.
# Each such message gets #expire! called, is appended to the queue and is
# removed from the in-flight table.
#
# @return [Hash] receipt => message for the messages that were moved back
def timeout_messages!
  with_lock do
    # Snapshot the overdue entries first so the in-flight table is not
    # modified while it is being iterated.
    overdue = @messages_in_flight.select { |_receipt, message| message.expired? }

    overdue.each_pair do |receipt, message|
      message.expire!
      @messages << message
      delete_message(receipt)
    end
  end
end

#to_yaml ⇒ Object



26
27
28
29
30
31
32
# File 'lib/fake_sqs/queue.rb', line 26

# Plain-hash snapshot of the queue's persistent state. Despite the name, the
# method returns a Hash, not a YAML string; its keys are the same string keys
# that #initialize reads.
#
# @return [Hash] "QueueName", "Arn" and "Attributes", in that order
def to_yaml
  snapshot = {}
  snapshot["QueueName"] = name
  snapshot["Arn"] = arn
  snapshot["Attributes"] = queue_attributes
  snapshot
end

#with_lock ⇒ Object



162
163
164
165
166
# File 'lib/fake_sqs/queue.rb', line 162

# Runs the given block while holding the queue's lock. The lock is the
# Monitor created in #initialize, so a thread already holding it may call
# #with_lock again (as several methods here do).
#
# @yield the work to perform under the lock
# @return [Object] the block's return value
def with_lock
  @lock.synchronize { yield }
end