Class: MarchHare::BaseConsumer

Inherits:
DefaultConsumer
  • Object
show all
Defined in:
lib/march_hare/consumers/base.rb

Direct Known Subclasses

CallbackConsumer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, queue, opts) ⇒ BaseConsumer

Returns a new instance of BaseConsumer.



10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/march_hare/consumers/base.rb', line 10

# Builds a consumer bound to the given channel and queue.
#
# @param channel the channel this consumer operates on; it is also passed to
#   the superclass constructor
# @param queue   the queue to consume from (its #name is read when recovering)
# @param opts    options hash; :on_cancellation is read by #handleCancel
def initialize(channel, queue, opts)
  super(channel)

  @channel, @queue, @opts = channel, queue, opts
  @auto_ack = true

  # Lifecycle flags, created in the order cancelling, cancelled, terminated.
  # NOTE(review): presumably java.util.concurrent.atomic.AtomicBoolean, whose
  # no-argument constructor starts at false — confirm the JavaConcurrent alias.
  @cancelling, @cancelled, @terminated =
    Array.new(3) { JavaConcurrent::AtomicBoolean.new }
end

Instance Attribute Details

#auto_ack ⇒ Object

Returns the value of attribute auto_ack.



8
9
10
# File 'lib/march_hare/consumers/base.rb', line 8

# Reads the @auto_ack instance variable.
#
# The constructor sets it to true, and #recover_from_network_failure passes it
# as the second argument to @channel.basic_consume.
#
# @return [Object] the current value of @auto_ack
def auto_ack
  @auto_ack
end

#consumer_tag ⇒ Object

Returns the value of attribute consumer_tag.



7
8
9
# File 'lib/march_hare/consumers/base.rb', line 7

# Reads the @consumer_tag instance variable.
#
# In the code visible here the variable is only assigned by
# #recover_from_network_failure; it may also be assigned elsewhere (for
# example by subclasses) — not verifiable from this file alone.
#
# @return [Object, nil] the current value of @consumer_tag
def consumer_tag
  @consumer_tag
end

Instance Method Details

#active? ⇒ Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/march_hare/consumers/base.rb', line 75

# Tells whether this consumer has not been terminated yet.
#
# @return [Boolean] false once #terminated? reports true, true otherwise
def active?
  return false if terminated?

  true
end

#cancelled? ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/march_hare/consumers/base.rb', line 71

# Tells whether cancellation is in progress or has already completed.
#
# @return [Boolean] the @cancelling flag when it is set, otherwise the
#   @cancelled flag
def cancelled?
  cancellation_pending = @cancelling.get
  return cancellation_pending if cancellation_pending

  # Only consulted when no cancellation is in flight.
  @cancelled.get
end

#deliver(headers, message) ⇒ Object

Raises:

  • (NotImplementedError)


67
68
69
# File 'lib/march_hare/consumers/base.rb', line 67

# Hook that receives each message; #handleDelivery calls it with the headers
# object it builds and the message body. This base version always raises.
#
# @param headers the headers object built for the delivery
# @param message the message body
# @raise [NotImplementedError] always
def deliver(headers, message)
  explanation = 'To be implemented by a subclass'
  raise NotImplementedError, explanation
end

#gracefully_shut_down ⇒ Object



63
64
65
# File 'lib/march_hare/consumers/base.rb', line 63

# Shutdown hook. This base implementation does nothing.
#
# @return [nil]
def gracefully_shut_down
  # no-op
  nil
end

#handleCancel(consumer_tag) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/march_hare/consumers/base.rb', line 30

# Handles cancellation of this consumer: marks it cancelled, unregisters it
# from the channel, runs the optional :on_cancellation callback and finally
# marks the consumer terminated.
#
# NOTE(review): presumably the RabbitMQ Java client's Consumer#handleCancel
# callback — confirm against the superclass.
#
# The callback receives arguments according to its arity:
#   0         -> no arguments
#   1         -> (consumer)
#   2         -> (channel, consumer)
#   otherwise -> (channel, consumer, consumer_tag)
#
# @param consumer_tag the tag of the consumer being cancelled
# @raise whatever the :on_cancellation callback raises; the consumer is still
#   marked terminated in that case
def handleCancel(consumer_tag)
  @cancelled.set(true)
  @channel.unregister_consumer(consumer_tag)

  begin
    callback = @opts[:on_cancellation]
    if callback
      callback_args =
        case callback.arity
        when 0 then []
        when 1 then [self]
        when 2 then [@channel, self]
        else [@channel, self, consumer_tag]
        end

      callback.call(*callback_args)
    end
  ensure
    # The consumer is already cancelled and unregistered at this point, so it
    # must be reported as terminated even when the callback raises.
    @terminated.set(true)
  end
end

#handleCancelOk(consumer_tag) ⇒ Object



52
53
54
55
56
57
# File 'lib/march_hare/consumers/base.rb', line 52

# Finalises a cancellation: flags the consumer as cancelled, removes it from
# the channel's consumer registry, then flags it as terminated.
#
# NOTE(review): presumably the RabbitMQ Java client's Consumer#handleCancelOk
# callback — confirm against the superclass.
#
# @param consumer_tag the tag of the consumer that was cancelled
def handleCancelOk(consumer_tag)
  @cancelled.set(true)
  @channel.unregister_consumer(consumer_tag)
  # Terminated is flagged last, after the channel no longer tracks this consumer.
  @terminated.set(true)
end

#handleDelivery(consumer_tag, envelope, properties, bytes) ⇒ Object



23
24
25
26
27
28
# File 'lib/march_hare/consumers/base.rb', line 23

# Converts a raw delivery into a Ruby string body plus a Headers object and
# passes both to #deliver.
#
# @param consumer_tag the tag of the consumer the delivery is for
# @param envelope     delivery envelope, forwarded to Headers.new
# @param properties   message properties, forwarded to Headers.new
# @param bytes        message payload as Java bytes
# @return whatever #deliver returns
def handleDelivery(consumer_tag, envelope, properties, bytes)
  # The payload is decoded before the headers are built.
  payload  = String.from_java_bytes(bytes)
  metadata = Headers.new(channel, consumer_tag, envelope, properties)

  deliver(metadata, payload)
end

#recover_from_network_failure ⇒ Object



84
85
86
87
88
89
90
# File 'lib/march_hare/consumers/base.rb', line 84

# Re-subscribes this consumer: clears the terminated and cancelled flags and
# issues a new basic_consume on the channel for the same queue, passing the
# previous consumer tag along.
#
# NOTE(review): @cancelling is not reset here — confirm that is intended.
#
# @return the consumer tag returned by @channel.basic_consume, which is also
#   stored in @consumer_tag
def recover_from_network_failure
  [@terminated, @cancelled].each { |flag| flag.set(false) }

  queue_name = @queue.name
  # The assignment's value is the new tag, so it doubles as the return value.
  @consumer_tag = @channel.basic_consume(queue_name, @auto_ack, @consumer_tag, self)
end

#start ⇒ Object



59
60
61
# File 'lib/march_hare/consumers/base.rb', line 59

# Start hook. This base implementation does nothing.
#
# @return [nil]
def start
  # no-op
  nil
end

#terminated? ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/march_hare/consumers/base.rb', line 79

# Reports the current state of the @terminated flag.
#
# The flag is set to true at the end of #handleCancel and #handleCancelOk and
# reset to false by #recover_from_network_failure.
#
# @return [Boolean] the value returned by @terminated.get
def terminated?
  @terminated.get
end