Class: Qurd::Listener

Inherits:
Object
Includes:
Mixins::AwsClients, Mixins::Configuration
Defined in:
lib/qurd/listener.rb

Overview

Provide an interface for interacting with configured queues and AWS.

Instance Attribute Summary

Instance Method Summary

Methods included from Mixins::Configuration

#qurd_config, #qurd_configuration, #qurd_logger, #qurd_logger!

Methods included from Mixins::AwsClients

#aws_client, #aws_retryable

Constructor Details

#initialize(attrs = {}) ⇒ Listener

Returns a new instance of Listener.

Parameters:

  • attrs (Hash) (defaults to: {})

Options Hash (attrs):

  • :aws_credentials (Aws::Credentials)
  • :name (String)
  • :queues (Array<String|Regexp>)

    An array of SQS names and Regexps

  • :region (String)
  • :visibility_timeout (String)
  • :wait_time (String)


# File 'lib/qurd/listener.rb', line 44

def initialize(attrs = {})
  @aws_credentials = attrs[:aws_credentials]
  @name = attrs[:name]
  @region = attrs[:region]
  @visibility_timeout = attrs[:visibility_timeout]
  @wait_time = attrs[:wait_time]
  @queues = convert_queues attrs[:queues]
  configure_queues
end
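
The :queues option accepts both queue names and Regexps. convert_queues and configure_queues are not shown on this page, so the following is only a sketch of how a mixed array might be matched against queue URLs. The helper select_queue_urls, the account number, and the queue names are hypothetical and are not part of Qurd.

```ruby
# Hypothetical helper, not part of Qurd: keep the queue URLs whose final
# path segment equals a String pattern or matches a Regexp pattern.
def select_queue_urls(patterns, urls)
  urls.select do |url|
    queue_name = url[%r{[^/]+\z}] # last path segment, e.g. "billing"
    patterns.any? do |pattern|
      pattern.is_a?(Regexp) ? pattern.match?(queue_name) : pattern == queue_name
    end
  end
end

# Placeholder URLs in the usual SQS shape; the account id is made up.
urls = [
  'https://sqs.us-west-2.amazonaws.com/123456789012/scale-up',
  'https://sqs.us-west-2.amazonaws.com/123456789012/scale-down',
  'https://sqs.us-west-2.amazonaws.com/123456789012/billing'
]

puts select_queue_urls(['billing', /\Ascale-up\z/], urls)
```

Strings are compared exactly here while Regexps may match several queues; whether Qurd applies the same rule is an assumption.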

Instance Attribute Details

#aws_credentials ⇒ Aws::Credentials (readonly)

The AWS credentials for the account

Returns:

  • (Aws::Credentials)


# File 'lib/qurd/listener.rb', line 28

def aws_credentials
  @aws_credentials
end

#message ⇒ Qurd::Message (readonly)

The message received

Returns:

  • (Qurd::Message)

# File 'lib/qurd/listener.rb', line 28

attr_reader :aws_credentials,
:message,
:name,
:queues,
:region,
:visibility_timeout,
:wait_time

#name ⇒ String (readonly)

The name of the executor

Returns:

  • (String)

# File 'lib/qurd/listener.rb', line 28

attr_reader :aws_credentials,
:message,
:name,
:queues,
:region,
:visibility_timeout,
:wait_time

#queues ⇒ Array<String> (readonly)

An array of AWS SQS URLs for the account

Returns:

  • (Array<String>)

# File 'lib/qurd/listener.rb', line 28

attr_reader :aws_credentials,
:message,
:name,
:queues,
:region,
:visibility_timeout,
:wait_time

#region ⇒ String (readonly)

The AWS region for the account

Returns:

  • (String)

# File 'lib/qurd/listener.rb', line 28

attr_reader :aws_credentials,
:message,
:name,
:queues,
:region,
:visibility_timeout,
:wait_time

#visibility_timeout ⇒ String (readonly)

Returns:

  • (String)

# File 'lib/qurd/listener.rb', line 28

attr_reader :aws_credentials,
:message,
:name,
:queues,
:region,
:visibility_timeout,
:wait_time

#wait_time ⇒ Object (readonly)

Returns the value of attribute wait_time.



# File 'lib/qurd/listener.rb', line 28

attr_reader :aws_credentials,
:message,
:name,
:queues,
:region,
:visibility_timeout,
:wait_time

Instance Method Details

#inspect ⇒ Object



# File 'lib/qurd/listener.rb', line 97

def inspect
  format('<Qurd::Listener:%x name:%s>', object_id, name)
end

#listen ⇒ Array<Thread>

Create one thread per queue. Each thread receives messages from its queue in a loop and processes every message received.

Returns:

  • (Array<Thread>)


# File 'lib/qurd/listener.rb', line 73

def listen
  queue_threads do |qurl, _context|
    loop do
      begin
        msgs = aws_client(:SQS).receive_message(
          queue_url: qurl,
          wait_time_seconds: wait_time,
          visibility_timeout: visibility_timeout
        )
        threads = process_messages(qurl, msgs)
        joins = threads.map do |thread|
          thread.join(qurd_configuration.listen_timeout)
        end
        if joins.compact.count != threads.count
          qurd_logger.warn('Some threads timed out')
        end
      rescue Aws::Errors::ServiceError => e
        qurd_logger.error("Aws raised #{e}")
      end
    end
  end
end
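
The timeout check in #listen relies on Thread#join(limit) returning nil when the limit expires and the thread itself otherwise, so comparing the compacted results with the thread count reveals stragglers. A minimal sketch of that pattern using plain threads follows; count_timed_out and the sleep durations are illustrative assumptions and not part of Qurd.

```ruby
# Illustrative helper, not part of Qurd: join each thread for at most
# `timeout` seconds and return how many had not finished by then.
# Thread#join(limit) returns nil on timeout, the thread otherwise.
def count_timed_out(threads, timeout)
  joins = threads.map { |thread| thread.join(timeout) }
  threads.count - joins.compact.count
end

fast = Thread.new { :done }   # finishes immediately
slow = Thread.new { sleep 5 } # outlives the 0.2 second limit

timed_out = count_timed_out([fast, slow], 0.2)
puts "#{timed_out} thread(s) timed out"

slow.kill # stop the straggler so the script exits promptly
```

Note that a nil from join does not stop the thread. In the source above, timed-out threads are only logged with a warning.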

#queue_threads(&_block) {|url, ctx| ... } ⇒ Object

Create a thread for each queue URL. Each thread builds a logging context that carries the listener name and the queue name, then yields the queue URL and that context to the block.

Parameters:

  • _block (Proc)

    the proc each thread should run

Yield Parameters:

  • url (String)

    the url of the queue

  • ctx (Cabin::Context)

    the logging context



# File 'lib/qurd/listener.rb', line 59

def queue_threads(&_block)
  queues.map do |qurl|
    qurd_logger.debug("Creating thread for #{qurl}")
    Thread.new(qurl) do |url|
      ctx = qurd_config.get_context(name: name, queue_name: url[/([^\/]+)$/])
      qurd_logger.debug('Thread running')
      yield url, ctx
    end
  end
end
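
The thread-per-queue pattern can be tried without AWS or Cabin. The sketch below is a stand-in, not Qurd's code: spawn_queue_threads is a hypothetical name, a plain Hash replaces the Cabin::Context, and the URLs are placeholders. It shows that a yield inside Thread.new still reaches the block passed to the enclosing method, and how the queue name is taken from the last path segment of the URL.

```ruby
# Hypothetical stand-in for queue_threads: one thread per URL, each
# yielding the URL and a Hash used here in place of a Cabin::Context.
def spawn_queue_threads(listener_name, urls)
  urls.map do |qurl|
    Thread.new(qurl) do |url|
      # Last path segment of the URL, e.g. "orders"
      ctx = { name: listener_name, queue_name: url[%r{[^/]+\z}] }
      yield url, ctx
    end
  end
end

# Placeholder URLs; the account id is made up.
urls = [
  'https://sqs.us-west-2.amazonaws.com/123456789012/orders',
  'https://sqs.us-west-2.amazonaws.com/123456789012/refunds'
]

seen = Queue.new # thread-safe collector for results
threads = spawn_queue_threads('staging', urls) do |_url, ctx|
  seen << ctx[:queue_name]
end
threads.each(&:join)

names = Array.new(seen.size) { seen.pop }.sort
puts names.inspect
```

Passing qurl as an argument to Thread.new gives each thread its own copy of the reference, which is the same reason the original passes it explicitly.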