Class: Chore::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/chore/consumer.rb

Overview

Base class for a Chore Consumer. Provides the interface that a Chore::Consumer implementation should adhere to.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, opts = {}) ⇒ Consumer



15
16
17
18
# File 'lib/chore/consumer.rb', line 15

def initialize(queue_name, opts={})
  @queue_name = queue_name
  @running = true
end

Instance Attribute Details

#queue_nameObject

Returns the value of attribute queue_name.



11
12
13
# File 'lib/chore/consumer.rb', line 11

def queue_name
  @queue_name
end

Class Method Details

.reset_connection!Object

Causes the underlying connection for all consumers of this class to be reset. Useful for the case where the consumer is being used across a fork. Should be overriden in consumers (but is not required).



22
23
# File 'lib/chore/consumer.rb', line 22

def self.reset_connection!
end

Instance Method Details

#complete(message_id, receipt_handle) ⇒ Object

Complete should mark a message as finished.

Raises:

  • (NotImplementedError)


46
47
48
# File 'lib/chore/consumer.rb', line 46

def complete(message_id, receipt_handle)
  raise NotImplementedError
end

#consume(&handler) ⇒ Object

Consume takes a block with an arity of two. The two params are |message_id,message_body| where message_id is any object that the consumer will need to be able to act on a message later (reject, complete, etc)

Raises:

  • (NotImplementedError)


30
31
32
# File 'lib/chore/consumer.rb', line 30

def consume(&handler)
  raise NotImplementedError
end

#dupe_detectorDuplicateDetector

Instance of duplicate detection implementation class



83
84
85
86
# File 'lib/chore/consumer.rb', line 83

def dupe_detector
  @dupes ||= DuplicateDetector.new({:servers => Chore.config.dedupe_servers,
                                    :dupe_on_cache_failure => false})
end

#duplicate_message?(dedupe_key, klass, queue_timeout) ⇒ TrueClass, FalseClass

Determine whether or not we have already seen this message



76
77
78
# File 'lib/chore/consumer.rb', line 76

def duplicate_message?(dedupe_key, klass, queue_timeout)
  dupe_detector.found_duplicate?(:id=>dedupe_key, :queue=>klass.to_s, :visibility_timeout=>queue_timeout)
end

#provide_work(n) ⇒ Object

Returns up to n work

Raises:

  • (NotImplementedError)


65
66
67
# File 'lib/chore/consumer.rb', line 65

def provide_work(n)
  raise NotImplementedError
end

#reject(message_id) ⇒ Object

Reject should put a message back on a queue to be processed again later. It takes a message_id as returned via consume.

Raises:

  • (NotImplementedError)


38
39
40
# File 'lib/chore/consumer.rb', line 38

def reject(message_id)
  raise NotImplementedError
end

#running?TrueClass, FalseClass

Returns true if the Consumer is currently running



58
59
60
# File 'lib/chore/consumer.rb', line 58

def running?
  @running
end

#stopObject

Perform any shutdown behavior and stop consuming messages



51
52
53
# File 'lib/chore/consumer.rb', line 51

def stop
  @running = false
end