Class: Oni::Daemons::SQS

Inherits:
Oni::Daemon
Defined in:
lib/oni/daemons/sqs.rb

Overview

The SQS daemon is a basic daemon skeleton that can be used to process jobs from an Amazon SQS queue.

Basic usage:

class MyDaemon < Oni::Daemons::SQS
  set :queue_name, 'my_queue'
end

The following options can be set:

  • queue_name (required): the name of the queue to poll as a String.
  • poll_options: a Hash of options to pass to the poll method of the SQS queue poller. See the documentation of Aws::SQS::QueuePoller#poll for the available options.
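As a rough illustration of how the two options fit together, the sketch below sets both on a subclass. FakeSQSDaemon is a hypothetical stand-in that only imitates `set`, `option` and `poll_options` so the example runs without the oni gem; in real code the parent class would be Oni::Daemons::SQS. The option names wait_time_seconds and idle_timeout are taken from Aws::SQS::QueuePoller#poll, and the values are arbitrary.

```ruby
# Hypothetical stand-in for Oni::Daemons::SQS so the sketch runs without the
# oni gem. Only the option handling is imitated; the real class does more.
class FakeSQSDaemon
  def self.options
    @options ||= {}
  end

  def self.set(name, value)
    options[name] = value
  end

  def option(name, default = nil)
    self.class.options.fetch(name, default)
  end

  # Mirrors the documented #poll_options: the configured Hash, or {}.
  def poll_options
    option(:poll_options, {})
  end
end

class MyDaemon < FakeSQSDaemon
  set :queue_name, 'my_queue'

  # Long-poll for up to 20 seconds and stop polling after 60 idle seconds.
  set :poll_options, { wait_time_seconds: 20, idle_timeout: 60 }
end

daemon = MyDaemon.new
puts daemon.option(:queue_name)
puts daemon.poll_options.inspect
```

When poll_options is left unset, the stand-in returns an empty Hash, matching the default shown in #poll_options.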

Constant Summary

Constants inherited from Oni::Daemon

Oni::Daemon::DEFAULT_THREAD_AMOUNT, Oni::Daemon::DEFAULT_WORKER_AMOUNT, Oni::Daemon::DEFAULT_WORKER_TIMEOUT

Instance Attribute Summary

Attributes inherited from Oni::Daemon

#daemon_workers, #workers

Instance Method Summary

Methods inherited from Oni::Daemon

#complete, #create_mapper, #error, #initialize, #process, #run_thread, #run_worker, #spawn_thread, #spawn_worker, #standard_worker, #start, #stop, #threads, #worker_timeout

Methods included from Configurable

included, #option, #require_option!

Constructor Details

This class inherits a constructor from Oni::Daemon

Instance Method Details

#after_initialize ⇒ Object

Checks if the queue_name option is set.

# File 'lib/oni/daemons/sqs.rb', line 27

def after_initialize
  require_option!(:queue_name)
end
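A minimal sketch of what this check means for a subclass that forgets to set queue_name. ConfigurableSketch and DaemonSketch are hypothetical stand-ins, not the oni gem's code. They assume that the constructor calls after_initialize and that require_option! raises ArgumentError when the option is unset; the real Oni::Daemon and Configurable may differ in both the error class and the message.

```ruby
# Hypothetical stand-in for Oni's Configurable module, so the sketch runs
# without the oni gem. Assumption: a missing option raises ArgumentError.
module ConfigurableSketch
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def options
      @options ||= {}
    end

    def set(name, value)
      options[name] = value
    end
  end

  def option(name, default = nil)
    self.class.options.fetch(name, default)
  end

  def require_option!(name)
    raise ArgumentError, "The option #{name} is required" unless option(name)
  end
end

# Hypothetical stand-in for the daemon base class.
# Assumption: the constructor invokes after_initialize.
class DaemonSketch
  include ConfigurableSketch

  def initialize
    after_initialize
  end

  def after_initialize
    require_option!(:queue_name)
  end
end

class Configured < DaemonSketch
  set :queue_name, 'my_queue'
end

class Unconfigured < DaemonSketch
end

Configured.new # passes the check

begin
  Unconfigured.new
rescue ArgumentError => error
  warn error.message
end
```

Failing at construction time surfaces a missing queue name before any worker or thread starts polling.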

#poll_options ⇒ Hash

Returns a Hash containing the options to use for the poll method of the SQS queue.

Returns:

  • (Hash)

# File 'lib/oni/daemons/sqs.rb', line 52

def poll_options
  return option(:poll_options, {})
end

#queue ⇒ Aws::SQS::QueuePoller

Returns a new queue poller to use for the current thread.

Returns:

  • (Aws::SQS::QueuePoller)

# File 'lib/oni/daemons/sqs.rb', line 61

def queue
  return Aws::SQS::QueuePoller.new(queue_url)
end

#queue_url ⇒ String

Looks up the URL of the queue named by the queue_name option.

Returns:

  • (String)

# File 'lib/oni/daemons/sqs.rb', line 68

def queue_url
  sqs      = Aws::SQS::Client.new
  response = sqs.get_queue_url(:queue_name => option(:queue_name))

  return response.queue_url
end

#receive ⇒ Object

Polls the SQS queue for messages, up to 10 per request, and yields each message to the given block.

# File 'lib/oni/daemons/sqs.rb', line 34

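def receive
  # Build a local copy so the configured poll_options Hash is not mutated
  # and the merged options are the ones actually handed to #poll.
  options = poll_options.merge(max_number_of_messages: 10)

  queue.poll(options) do |messages|
    # A single message is yielded as-is; a batch is yielded one by one.
    next yield messages unless messages.is_a?(Array)

    messages.each do |message|
      yield message
    end
  end
end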
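The dispatch inside #receive can be tried in isolation with a fake poller. FakePoller and receive_from below are hypothetical names introduced only for this sketch. It assumes the poller yields either a single message or an Array of messages, which is how Aws::SQS::QueuePoller#poll is understood to behave depending on max_number_of_messages.

```ruby
# Hypothetical poller that yields each prepared batch once, imitating a
# poller that hands over either one message or an Array of messages.
class FakePoller
  def initialize(batches)
    @batches = batches
  end

  def poll(_options = {})
    @batches.each { |batch| yield batch }
  end
end

# Same dispatch logic as #receive, with the poller passed in explicitly.
def receive_from(poller, options = {})
  poller.poll(options) do |messages|
    # A lone message goes straight to the caller's block.
    next yield messages unless messages.is_a?(Array)

    # A batch is unpacked so the caller always sees one message at a time.
    messages.each { |message| yield message }
  end
end

seen = []
poller = FakePoller.new([%w[a b], 'c'])
receive_from(poller, max_number_of_messages: 10) { |message| seen << message }
puts seen.inspect
```

Because of this unpacking, the block given to #receive only ever handles one message at a time, whatever batch size the poller uses.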