Class: ActiveMessaging::Adapters::AmazonSqs::Connection

Inherits:
BaseConnection
  • Object
show all
Defined in:
lib/activemessaging/adapters/asqs.rb

Constant Summary collapse

QUEUE_NAME_LENGTH =
1..80
MESSAGE_SIZE =
1..(8 * 1024)
VISIBILITY_TIMEOUT =
0..(24 * 60 * 60)
NUMBER_OF_MESSAGES =
1..255
GET_QUEUE_ATTRIBUTES =
['All', 'ApproximateNumberOfMessages', 'VisibilityTimeout']
SET_QUEUE_ATTRIBUTES =
['VisibilityTimeout']

Instance Attribute Summary collapse

Attributes inherited from BaseConnection

#reliable

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg) ⇒ Connection

generic init method needed by a13g



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/activemessaging/adapters/asqs.rb', line 31

def initialize cfg
  raise "Must specify a access_key_id" if (cfg[:access_key_id].nil? || cfg[:access_key_id].empty?)
  raise "Must specify a secret_access_key" if (cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?)

  @access_key_id=cfg[:access_key_id]
  @secret_access_key=cfg[:secret_access_key]
  @request_expires = cfg[:requestExpires]         || 10
  @request_retry_count = cfg[:requestRetryCount]  || 5
  @aws_version = cfg[:aws_version]                || '2008-01-01'
  @content_type = cfg[:content_type]              || 'text/plain'
  @host = cfg[:host]                              || 'queue.amazonaws.com'
  @port = cfg[:port]                              || 80
  @protocol = cfg[:protocol]                      || 'http'
  @poll_interval = cfg[:poll_interval]            || 1
  @reconnect_delay = cfg[:reconnectDelay]         || 5
  @aws_url="#{@protocol}://#{@host}"

  @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list]
  @reliable =         cfg[:reliable].nil?         ? true : cfg[:reliable]

  #initialize the subscriptions and queues
  @subscriptions = {}
  @current_subscription = 0
  queues
end

Instance Attribute Details

#access_key_idObject

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

def access_key_id
  @access_key_id
end

#aws_versionObject

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

def aws_version
  @aws_version
end

#cache_queue_listObject

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

def cache_queue_list
  @cache_queue_list
end

#content_typeObject

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

def content_type
  @content_type
end

#hostObject

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

def host
  @host
end

#poll_intervalObject

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

def poll_interval
  @poll_interval
end

#portObject

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

def port
  @port
end

#reconnectDelayObject

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

def reconnectDelay
  @reconnectDelay
end

#secret_access_keyObject

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

def secret_access_key
  @secret_access_key
end

Instance Method Details

#disconnectObject



57
58
59
60
# File 'lib/activemessaging/adapters/asqs.rb', line 57

def disconnect
  #it's an http request - there is no disconnect - ha!
  return true
end

#receiveObject

receive a single message from any of the subscribed queues check each queue once, then sleep for poll_interval



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/activemessaging/adapters/asqs.rb', line 92

def receive
  raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?)
  start = @current_subscription
  while true
    # puts "calling receive..."
    @current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0)
    sleep poll_interval if (@current_subscription == start)
    queue_name = @subscriptions.keys.sort[@current_subscription]
    queue = queues[queue_name]
    subscription = @subscriptions[queue_name]
    unless queue.nil?
      messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout]
      return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?)
    end
  end
end

#received(message, headers = {}) ⇒ Object



109
110
111
112
113
114
115
116
# File 'lib/activemessaging/adapters/asqs.rb', line 109

def received message, headers={}
  begin
    delete_message message
  rescue Object=>exception
    logger.error "Exception in ActiveMessaging::Adapters::AmazonSWS::Connection.received() logged and ignored: "
    logger.error exception
  end
end

#send(queue_name, message_body, message_headers = {}) ⇒ Object

queue_name string, body string, headers hash send a single message to a queue



85
86
87
88
# File 'lib/activemessaging/adapters/asqs.rb', line 85

def send queue_name, message_body, message_headers={}
  queue = get_or_create_queue queue_name
  send_messsage queue, message_body
end

#subscribe(queue_name, message_headers = {}) ⇒ Object

queue_name string, headers hash for sqs, make sure queue exists, if not create, then add to list of polled queues



64
65
66
67
68
69
70
71
72
# File 'lib/activemessaging/adapters/asqs.rb', line 64

def subscribe queue_name, message_headers={}
  # look at the existing queues, create any that are missing
  queue = get_or_create_queue queue_name
  if @subscriptions.has_key? queue.name
    @subscriptions[queue.name].add
  else
    @subscriptions[queue.name] = Subscription.new(queue.name, message_headers)
  end
end

#unreceive(message, headers = {}) ⇒ Object

do nothing; by not deleting the message will eventually become visible again



119
120
121
# File 'lib/activemessaging/adapters/asqs.rb', line 119

def unreceive message, headers={}
  return true
end

#unsubscribe(queue_name, message_headers = {}) ⇒ Object

queue_name string, headers hash for sqs, attempt delete the queues, won’t work if not empty, that’s ok



76
77
78
79
80
81
# File 'lib/activemessaging/adapters/asqs.rb', line 76

def unsubscribe queue_name, message_headers={}
  if @subscriptions[queue_name]
    @subscriptions[queue_name].remove
    @subscriptions.delete(queue_name) if @subscriptions[queue_name].count <= 0
  end
end