Class: Eventboss::LongPoller

Inherits:
Object
  • Object
show all
Includes:
Logging, SafeThread
Defined in:
lib/eventboss/long_poller.rb

Overview

LongPoller fetches messages from SQS using long polling (see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html). It starts one thread per queue (handled by Launcher).

Constant Summary collapse

TIME_WAIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from SafeThread

#handle_exception, #safe_thread

Methods included from Logging

#logger

Constructor Details

#initialize(launcher, bus, client, queue, listener) ⇒ LongPoller

Returns a new instance of LongPoller.



13
14
15
16
17
18
19
20
21
22
# File 'lib/eventboss/long_poller.rb', line 13

# Builds a poller for a single queue. No thread is created here; the
# thread handle starts out as nil and the stop flag as false.
#
# @param launcher [Object] notified through #poller_stopped when the loop ends
# @param bus [Object] receives one unit of work per fetched message (via <<)
# @param client [Object] handed through to every unit of work
# @param queue [#name] queue to poll; its name becomes part of the poller id
# @param listener [Object] handed through to every unit of work
def initialize(launcher, bus, client, queue, listener)
  @id = "poller-#{queue.name}"
  @launcher, @bus, @client, @queue, @listener = launcher, bus, client, queue, listener
  @thread = nil
  @stop = false
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



11
12
13
# File 'lib/eventboss/long_poller.rb', line 11

# @return [String] poller identifier, "poller-<queue name>" as set in #initialize
def id; @id; end

#listener ⇒ Object (readonly)

Returns the value of attribute listener.



11
12
13
# File 'lib/eventboss/long_poller.rb', line 11

# @return [Object] the listener given to #initialize; passed on to each unit of work
def listener; @listener; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



11
12
13
# File 'lib/eventboss/long_poller.rb', line 11

# @return [Object] the queue given to #initialize; passed on to each unit of work
def queue; @queue; end

Instance Method Details

#fetch_and_dispatch ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/eventboss/long_poller.rb', line 44

# Fetches one batch of messages and pushes a UnitOfWork for each onto the bus.
#
# A ClosedQueueError raised while handling a message is logged at info level
# and that message is skipped; the remaining messages are still attempted.
#
# @return [Object] whatever `fetch_messages.each` returns
def fetch_and_dispatch
  fetch_messages.each do |msg|
    begin
      logger.debug(id) { "enqueueing message #{msg.message_id}" }
      unit = UnitOfWork.new(@client, queue, listener, msg)
      @bus << unit
    rescue ClosedQueueError
      logger.info(id) { "skip message #{msg.message_id} enqueuing due to closed queue" }
    end
  end
end

#kill(wait = false) ⇒ Object



34
35
36
37
38
39
40
41
42
# File 'lib/eventboss/long_poller.rb', line 34

# Forcefully stops the poller.
#
# Sets the stop flag and then raises Eventboss::Shutdown inside the poller
# thread, so a loop that is stuck gets interrupted instead of being waited on.
#
# The exception is raised BEFORE any waiting. Joining the thread first would
# block until the loop had already finished by itself, so the forced shutdown
# could never reach a stuck thread.
#
# @param wait [Boolean] when true, block until the poller thread has finished
# @return [Object, nil] the thread's value when +wait+ is true, otherwise nil
def kill(wait = false)
  @stop = true
  return unless @thread

  # Force shutdown of poller, in case the loop is stuck
  @thread.raise Eventboss::Shutdown
  @thread.value if wait
end

#run ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/eventboss/long_poller.rb', line 53

# Main polling loop: calls #fetch_and_dispatch repeatedly until the stop flag
# is set, then tells the launcher that this poller has stopped.
#
# How each failure is handled:
# * Eventboss::Shutdown - the launcher is notified, nothing is reported.
# * Aws::SQS::Errors::NonExistentQueue - reported via #handle_exception, then
#   the launcher is notified without a restart request.
# * any other StandardError - reported, followed by a TIME_WAIT sleep, and the
#   launcher is asked to restart the poller unless a stop was requested.
def run
  until @stop
    fetch_and_dispatch
  end
  @launcher.poller_stopped(self)
rescue Eventboss::Shutdown
  @launcher.poller_stopped(self)
rescue Aws::SQS::Errors::NonExistentQueue => missing_queue
  handle_exception(missing_queue, poller_id: id)
  @launcher.poller_stopped(self)
rescue StandardError => error
  handle_exception(error, poller_id: id)
  # Give a chance for temporary AWS errors to be resolved
  # Sleep guarantees against repeating fast failure errors
  sleep TIME_WAIT
  should_restart = (@stop == false)
  @launcher.poller_stopped(self, restart: should_restart)
end

#start ⇒ Object



24
25
26
# File 'lib/eventboss/long_poller.rb', line 24

# Starts the polling loop by handing #run to safe_thread, labelled with the
# poller id, and keeps whatever safe_thread returns as the thread handle.
#
# @return [Object] the value returned by safe_thread
def start
  runner = method(:run)
  @thread = safe_thread(id, &runner)
end

#terminate(wait = false) ⇒ Object



28
29
30
31
32
# File 'lib/eventboss/long_poller.rb', line 28

# Requests a graceful stop by setting the stop flag that the polling loop
# checks. The thread itself is not interrupted.
#
# @param wait [Boolean] when true and a thread exists, block until it finishes
# @return [Object, nil] the thread's value when waited on, otherwise nil
def terminate(wait = false)
  @stop = true
  @thread.value if @thread && wait
end