Class: AzureClient::BufferedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/azure_client/buffered_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queues, container, retry_policy = ExponentialRetryPolicy.new(5,1,2)) ⇒ BufferedQueue

Returns a new instance of BufferedQueue.



5
6
7
8
9
10
# File 'lib/azure_client/buffered_queue.rb', line 5

# Sets up a buffered queue over one or more queues and a blob container.
#
# @param queues a single queue object or an Array of them; a lone queue is
#   wrapped in a one-element Array
# @param container object stored as @container for later blob operations
# @param retry_policy policy stored as the instance default for later calls
def initialize(queues, container, retry_policy = ExponentialRetryPolicy.new(5,1,2))
  @retry_policy = retry_policy
  @container = container
  @queues = if queues.is_a?(Array)
              queues
            else
              [queues]
            end
  # The buffered queue takes its name from the first queue in the list.
  @name = @queues.first.name
end

Instance Attribute Details

#name ⇒ Object

Returns the value of attribute name.



3
4
5
# File 'lib/azure_client/buffered_queue.rb', line 3

# Reader for the queue name.
#
# @return the value held in @name
def name
  @name
end

Instance Method Details

#add_message(content, metadata = "", options = {}, retry_policy = @retry_policy) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/azure_client/buffered_queue.rb', line 12

# Enqueues a message on one of the underlying queues.
#
# The content and metadata are serialized to JSON. When
# is_allowed_payload_size accepts that payload it is enqueued directly.
# Otherwise the raw content is stored in @container under a generated blob
# name and a small JSON reference to that blob is enqueued instead.
#
# @param content the message body
# @param metadata extra data carried with the message (defaults to "")
# @param options passed through to @container.store_blob on the blob path
# @param retry_policy policy forwarded to the queue and container calls
# @return the result of the queue's add_message call
def add_message(content, metadata = "", options = {}, retry_policy = @retry_policy)
  payload = {"content" => content, "metadata" => metadata}.to_json
  queue = select_queue
  return queue.add_message(payload, retry_policy) if is_allowed_payload_size(payload)

  # Blob name is the queue name followed by nine random lowercase letters.
  letters = ('a'..'z').to_a
  blob_name = name + Array.new(9) { letters.sample }.join
  reference = {"type" => "azure_blob_reference", "name" => blob_name, "metadata" => metadata}

  # Store the blob before enqueueing the reference that points at it.
  @container.store_blob(blob_name, content, options, retry_policy)
  queue.add_message(reference.to_json, retry_policy)
end

#delete(retry_policy = @retry_policy) ⇒ Object



33
34
35
36
37
38
# File 'lib/azure_client/buffered_queue.rb', line 33

# Deletes every underlying queue and then the blob container.
#
# @param retry_policy policy forwarded to each delete call; defaults to the
#   instance's @retry_policy
# @return the result of @container.delete
def delete(retry_policy = @retry_policy)
  @queues.each { |q| q.delete(retry_policy) }
  @container.delete(retry_policy)
end

#get_message(retry_policy = LinearRetryPolicy.new(1,1)) ⇒ Object

The retry policy defaults to a single try.



27
28
29
30
31
# File 'lib/azure_client/buffered_queue.rb', line 27

# Fetches the next message from one of the underlying queues.
#
# @param retry_policy policy passed to the queue's get_message call
# @return [BufferedQueueMessage, nil] the fetched message wrapped together
#   with @container and the instance's @retry_policy, or nil when the queue
#   returned nothing
def get_message(retry_policy = LinearRetryPolicy.new(1,1))
  raw = select_queue.get_message(retry_policy)
  return nil unless raw

  # The wrapper gets the instance-level policy, not the one passed in here.
  BufferedQueueMessage.new(raw, @container, @retry_policy)
end