Class: Liebre::Actor::Consumer::Core

Inherits:
Object
  • Object
show all
Defined in:
lib/liebre/actor/consumer/core.rb

Constant Summary collapse

OPTS =
{:block => false, :manual_ack => true}

Instance Method Summary collapse

Constructor Details

#initialize(consumer, resources, context, callback_class) ⇒ Core

Returns a new instance of Core.



8
9
10
11
12
13
# File 'lib/liebre/actor/consumer/core.rb', line 8

def initialize consumer, resources, context, callback_class
  @consumer       = consumer
  @resources      = resources
  @context        = context
  @callback_class = callback_class
end

Instance Method Details

#ack(info, opts) ⇒ Object



34
# File 'lib/liebre/actor/consumer/core.rb', line 34

def ack(info, opts)    queue.ack(info, opts);    end

#cleanObject



42
43
44
45
46
47
# File 'lib/liebre/actor/consumer/core.rb', line 42

def clean
  queue.delete
  exchange.delete
  dead_queue.delete
  dead_exchange.delete
end

#consume(info, meta, payload) ⇒ Object



26
27
28
29
30
31
32
# File 'lib/liebre/actor/consumer/core.rb', line 26

def consume info, meta, payload
  callback = callback_class.new(consumer, info)

  handler.call(payload, meta, callback) do |error|
    callback.failed(error)
  end
end

#failed(info, error) ⇒ Object



38
39
40
# File 'lib/liebre/actor/consumer/core.rb', line 38

def failed info, error
  queue.reject(info, {})
end

#nack(info, opts) ⇒ Object



35
# File 'lib/liebre/actor/consumer/core.rb', line 35

def nack(info, opts)   queue.nack(info, opts);   end

#reject(info, opts) ⇒ Object



36
# File 'lib/liebre/actor/consumer/core.rb', line 36

def reject(info, opts) queue.reject(info, opts); end

#startObject



15
16
17
18
19
20
# File 'lib/liebre/actor/consumer/core.rb', line 15

def start
  queue.subscribe(OPTS) do |info, meta, payload|
    consumer.consume(info, meta, payload)
  end
  dead_queue
end

#stopObject



22
23
24
# File 'lib/liebre/actor/consumer/core.rb', line 22

def stop
  chan.close
end