Class: Liebre::Actor::Consumer

Inherits:
Object
  • Object
show all
Includes:
Concurrent::Async
Defined in:
lib/liebre/actor/consumer.rb,
lib/liebre/actor/consumer/core.rb,
lib/liebre/actor/consumer/callback.rb,
lib/liebre/actor/consumer/reporter.rb,
lib/liebre/actor/consumer/resources.rb,
lib/liebre/actor/consumer/resources/config.rb

Defined Under Namespace

Classes: Callback, Core, Reporter, Resources

Constant Summary collapse

# Option hash with blocking disabled and manual acknowledgement enabled.
# NOTE(review): presumably passed to the queue subscription call in Core — confirm.
OPTS = { block: false, manual_ack: true }

Instance Method Summary collapse

Constructor Details

#initialize(context) ⇒ Consumer

Returns a new instance of Consumer.



15
16
17
18
19
# File 'lib/liebre/actor/consumer.rb', line 15

# Builds a consumer that keeps a reference to the given context.
#
# `super()` is called with explicit empty parentheses so that no arguments
# are forwarded to the parent initializer.
#
# @param context [Object] stored as-is in `@context`
def initialize(context)
  super()
  @context = context
end

Instance Method Details

#__ack__(info, opts) ⇒ Object



44
45
46
# File 'lib/liebre/actor/consumer.rb', line 44

# Runs `core.ack(info, opts)` inside the block handed to `reporter.on_ack`.
#
# @param info [Object] forwarded unchanged to `core.ack`
# @param opts [Object] forwarded unchanged to `core.ack`
# @return [Object] whatever `reporter.on_ack` returns
def __ack__(info, opts)
  reporter.on_ack do
    core.ack(info, opts)
  end
end

#__clean__ ⇒ Object



58
59
60
# File 'lib/liebre/actor/consumer.rb', line 58

# Runs `core.clean` inside the block handed to `reporter.on_clean`.
#
# @return [Object] whatever `reporter.on_clean` returns
def __clean__
  reporter.on_clean do
    core.clean
  end
end

#__consume__(info, meta, payload) ⇒ Object



40
41
42
# File 'lib/liebre/actor/consumer.rb', line 40

# Runs `core.consume(info, meta, payload)` inside the block handed to
# `reporter.on_consume`.
#
# @param info [Object] forwarded unchanged to `core.consume`
# @param meta [Object] forwarded unchanged to `core.consume`
# @param payload [Object] forwarded unchanged to `core.consume`
# @return [Object] whatever `reporter.on_consume` returns
def __consume__(info, meta, payload)
  reporter.on_consume do
    core.consume(info, meta, payload)
  end
end

#__failed__(info, error) ⇒ Object



54
55
56
# File 'lib/liebre/actor/consumer.rb', line 54

# Passes `error` to `reporter.on_failed` and, inside its block, runs
# `core.failed(info, error)`.
#
# @param info [Object] forwarded unchanged to `core.failed`
# @param error [Object] given to both `reporter.on_failed` and `core.failed`
# @return [Object] whatever `reporter.on_failed` returns
def __failed__(info, error)
  reporter.on_failed(error) do
    core.failed(info, error)
  end
end

#__nack__(info, opts) ⇒ Object



47
48
49
# File 'lib/liebre/actor/consumer.rb', line 47

# Runs `core.nack(info, opts)` inside the block handed to `reporter.on_nack`.
#
# @param info [Object] forwarded unchanged to `core.nack`
# @param opts [Object] forwarded unchanged to `core.nack`
# @return [Object] whatever `reporter.on_nack` returns
def __nack__(info, opts)
  reporter.on_nack do
    core.nack(info, opts)
  end
end

#__reject__(info, opts) ⇒ Object



50
51
52
# File 'lib/liebre/actor/consumer.rb', line 50

# Runs `core.reject(info, opts)` inside the block handed to
# `reporter.on_reject`.
#
# @param info [Object] forwarded unchanged to `core.reject`
# @param opts [Object] forwarded unchanged to `core.reject`
# @return [Object] whatever `reporter.on_reject` returns
def __reject__(info, opts)
  reporter.on_reject do
    core.reject(info, opts)
  end
end

#__start__ ⇒ Object



33
34
35
# File 'lib/liebre/actor/consumer.rb', line 33

# Runs `core.start` inside the block handed to `reporter.on_start`.
#
# @return [Object] whatever `reporter.on_start` returns
def __start__
  reporter.on_start do
    core.start
  end
end

#__stop__ ⇒ Object



36
37
38
# File 'lib/liebre/actor/consumer.rb', line 36

# Runs `core.stop` inside the block handed to `reporter.on_stop`.
#
# @return [Object] whatever `reporter.on_stop` returns
def __stop__
  reporter.on_stop do
    core.stop
  end
end

#ack(info, opts = {}) ⇒ Object



26
# File 'lib/liebre/actor/consumer.rb', line 26

# Forwards to `__ack__` through the object returned by `async`.
#
# @param info [Object] passed through unchanged
# @param opts [Hash] passed through unchanged; defaults to an empty hash
# @return [Object] whatever `async.__ack__` returns
def ack(info, opts = {})
  async.__ack__(info, opts)
end

#clean ⇒ Object



31
# File 'lib/liebre/actor/consumer.rb', line 31

# Forwards to `__clean__` through the object returned by `async`.
#
# @return [Object] whatever `async.__clean__` returns
def clean
  async.__clean__
end

#consume(info, meta, payload) ⇒ Object



24
# File 'lib/liebre/actor/consumer.rb', line 24

# Forwards to `__consume__` through the object returned by `async`.
#
# @param info [Object] passed through unchanged
# @param meta [Object] passed through unchanged
# @param payload [Object] passed through unchanged
# @return [Object] whatever `async.__consume__` returns
def consume(info, meta, payload)
  async.__consume__(info, meta, payload)
end

#failed(info, error) ⇒ Object



29
# File 'lib/liebre/actor/consumer.rb', line 29

# Forwards to `__failed__` through the object returned by `async`.
#
# @param info [Object] passed through unchanged
# @param error [Object] passed through unchanged
# @return [Object] whatever `async.__failed__` returns
def failed(info, error)
  async.__failed__(info, error)
end

#nack(info, opts = {}) ⇒ Object



27
# File 'lib/liebre/actor/consumer.rb', line 27

# Forwards to `__nack__` through the object returned by `async`.
#
# @param info [Object] passed through unchanged
# @param opts [Hash] passed through unchanged; defaults to an empty hash
# @return [Object] whatever `async.__nack__` returns
def nack(info, opts = {})
  async.__nack__(info, opts)
end

#reject(info, opts = {}) ⇒ Object



28
# File 'lib/liebre/actor/consumer.rb', line 28

# Forwards to `__reject__` through the object returned by `async`.
#
# @param info [Object] passed through unchanged
# @param opts [Hash] passed through unchanged; defaults to an empty hash
# @return [Object] whatever `async.__reject__` returns
def reject(info, opts = {})
  async.__reject__(info, opts)
end

#start ⇒ Object



21
# File 'lib/liebre/actor/consumer.rb', line 21

# Forwards to `__start__` through the object returned by `async`.
#
# @return [Object] whatever `async.__start__` returns
def start
  async.__start__
end

#stop ⇒ Object



22
# File 'lib/liebre/actor/consumer.rb', line 22

# Forwards to `__stop__` through the object returned by `async`.
#
# @return [Object] whatever `async.__stop__` returns
def stop
  async.__stop__
end