Class: Workling::Clients::SqsClient

Inherits:
BrokerBase show all
Defined in:
lib/workling/clients/sqs_client.rb

Constant Summary collapse

AWS_MAX_QUEUE_NAME =
80
DEFAULT_MESSAGES_PER_REQ =

Note that 10 is the maximum number of messages that can be retrieved in a single request.

10
DEFAULT_VISIBILITY_TIMEOUT =
30
DEFAULT_VISIBILITY_RESERVE =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from BrokerBase

#dispatch, #subscribe

Methods inherited from Base

#dispatch, installed?, load, #subscribe

Instance Attribute Details

#messages_per_reqObject (readonly)

Returns the value of attribute messages_per_req.



43
44
45
# File 'lib/workling/clients/sqs_client.rb', line 43

def messages_per_req
  @messages_per_req
end

#sqs_optionsObject (readonly)

Mainly exposed for testing purposes



42
43
44
# File 'lib/workling/clients/sqs_client.rb', line 42

def sqs_options
  @sqs_options
end

#visibility_timeoutObject (readonly)

Returns the value of attribute visibility_timeout.



44
45
46
# File 'lib/workling/clients/sqs_client.rb', line 44

def visibility_timeout
  @visibility_timeout
end

Instance Method Details

#closeObject

No need for explicit closing, since there is no persistent connection to SQS.



73
74
75
# File 'lib/workling/clients/sqs_client.rb', line 73

def close
  true
end

#connectObject

Starts the client.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/workling/clients/sqs_client.rb', line 47

def connect
  @sqs_options = Workling.config[:sqs_options]

  # Make sure that required options were specified
  unless (@sqs_options.include?('aws_access_key_id') &&
         @sqs_options.include?('aws_secret_access_key'))
    raise WorklingError, 'Unable to start SqsClient due to missing SQS options'
  end
  
  # Optional settings
  @messages_per_req = @sqs_options['messages_per_req'] || DEFAULT_MESSAGES_PER_REQ
  @visibility_timeout = @sqs_options['visibility_timeout'] || DEFAULT_VISIBILITY_TIMEOUT
  @visibility_reserve = @sqs_options['visibility_reserve'] || DEFAULT_VISIBILITY_RESERVE
  
  begin
    @sqs = RightAws::SqsGen2.new(
      @sqs_options['aws_access_key_id'],
      @sqs_options['aws_secret_access_key'],
      :multi_thread => true)
  rescue => e
    raise WorklingError, "Unable to connect to SQS. Error: #{e}"
  end
end

#queue_for_key(key) ⇒ Object

Returns the queue that corresponds to the specified key. Creates the queue if it doesn’t exist yet.



137
138
139
140
# File 'lib/workling/clients/sqs_client.rb', line 137

def queue_for_key(key)
  # Use thread local for storing queues, for the same reason as for buffers
  Thread.current["queue_#{key}"] ||= @sqs.queue(queue_name(key), true, @visibility_timeout)
end

#queue_name(key) ⇒ Object

Returns the queue name for the specified key. The name consists of an optional prefix, followed by the environment and the key itself. Note that with a long worker class / method name, the name could exceed the 80 character maximum for SQS queue names. We truncate the name until it fits, but there’s still the danger of this not being unique any more. Might need to implement a more robust naming scheme…



148
149
150
# File 'lib/workling/clients/sqs_client.rb', line 148

def queue_name(key)
  "#{@sqs_options['prefix'] || ''}#{env}_#{key}"[0, AWS_MAX_QUEUE_NAME]
end

#request(key, value) ⇒ Object

Request work.



126
127
128
129
130
131
132
133
# File 'lib/workling/clients/sqs_client.rb', line 126

def request(key, value)
  begin
    queue_for_key(key).send_message(value.to_json)
  rescue => e
    logger.error "SQS Client: Error sending msg for key: #{key}, value: #{value.inspect}; Error: #{e}"
    raise WorklingError, "Error sending msg for key: #{key}, value: #{value.inspect}; Error: #{e}"
  end
end

#retrieve(key) ⇒ Object

Retrieve work.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/workling/clients/sqs_client.rb', line 78

def retrieve(key)
  begin
    # We're using a buffer per key to retrieve several messages at once,
    # then return them one at a time until the buffer is empty.
    # Workling seems to create one thread per worker class, each with its own
    # client. But to be sure (and to be less dependent on workling internals),
    # we store each buffer in a thread local variable.
    buffer = Thread.current["buffer_#{key}"]
    if buffer.nil? || buffer.empty?
      Thread.current["buffer_#{key}"] = buffer = queue_for_key(key).receive_messages(
        @messages_per_req, @visibility_timeout)
    end

    if buffer.empty?
      nil
    else
      msg = buffer.shift

      # We need to protect against the case that processing one of the
      # messages in the buffer took so much time that the visibility
      # timeout for the remaining messages has expired. To be on the
      # safe side (since we need to leave enough time to delete the
      # message), we drop it if more than half of the visibility timeout
      # has elapsed.
      if msg.received_at < (Time.now - (@visibility_timeout - @visibility_reserve))
        nil
      else
        # Need to wrap in HashWithIndifferentAccess, as JSON serialization
        # loses symbol keys.
        parsed_msg = HashWithIndifferentAccess.new(JSON.parse(msg.body))
      
        # Delete the msg from SQS, so we don't re-retrieve it after the
        # visibility timeout. Ideally we would defer deleting a msg until
        # after Workling has successfully processed it, but it currently
        # doesn't provide the necessary hooks for this.
        msg.delete
      
        parsed_msg
      end
    end

  rescue => e
    logger.error "Error retrieving msg for key: #{key}; Error: #{e}\n#{e.backtrace.join("\n")}"
  end
  
end