Class: Liebre::Actor::Consumer
- Inherits:
-
Object
- Object
- Liebre::Actor::Consumer
show all
- Includes:
- Concurrent::Async
- Defined in:
- lib/liebre/actor/consumer.rb,
lib/liebre/actor/consumer/core.rb,
lib/liebre/actor/consumer/callback.rb,
lib/liebre/actor/consumer/reporter.rb,
lib/liebre/actor/consumer/resources.rb,
lib/liebre/actor/consumer/resources/config.rb
Defined Under Namespace
Classes: Callback, Core, Reporter, Resources
Constant Summary
collapse
- OPTS =
{:block => false, :manual_ack => true}
Instance Method Summary
collapse
-
#__ack__(info, opts) ⇒ Object
-
#__clean__ ⇒ Object
-
#__consume__(info, meta, payload) ⇒ Object
-
#__failed__(info, error) ⇒ Object
-
#__nack__(info, opts) ⇒ Object
-
#__reject__(info, opts) ⇒ Object
-
#__start__ ⇒ Object
-
#__stop__ ⇒ Object
-
#ack(info, opts = {}) ⇒ Object
-
#clean ⇒ Object
-
#consume(info, meta, payload) ⇒ Object
-
#failed(info, error) ⇒ Object
-
#initialize(context) ⇒ Consumer
constructor
A new instance of Consumer.
-
#nack(info, opts = {}) ⇒ Object
-
#reject(info, opts = {}) ⇒ Object
-
#start ⇒ Object
-
#stop ⇒ Object
Constructor Details
#initialize(context) ⇒ Consumer
Returns a new instance of Consumer.
15
16
17
18
19
|
# File 'lib/liebre/actor/consumer.rb', line 15
# Builds a consumer around the given context.
#
# `super()` is invoked with explicit empty parentheses so that no arguments
# are forwarded up the ancestor chain.
#
# @param context the object stored in @context (not inspected here)
def initialize(context)
  super()
  @context = context
end
|
Instance Method Details
#__ack__(info, opts) ⇒ Object
44
45
46
|
# File 'lib/liebre/actor/consumer.rb', line 44
# Runs core.ack(info, opts) inside the block handed to reporter.on_ack.
#
# @param info passed through unchanged to core.ack
# @param opts passed through unchanged to core.ack
# @return whatever reporter.on_ack returns
def __ack__(info, opts)
  reporter.on_ack do
    core.ack(info, opts)
  end
end
|
#__clean__ ⇒ Object
58
59
60
|
# File 'lib/liebre/actor/consumer.rb', line 58
# Runs core.clean inside the block handed to reporter.on_clean.
#
# @return whatever reporter.on_clean returns
def __clean__
  reporter.on_clean do
    core.clean
  end
end
|
#__consume__(info, meta, payload) ⇒ Object
40
41
42
|
# File 'lib/liebre/actor/consumer.rb', line 40
# Runs core.consume(info, meta, payload) inside the block handed to
# reporter.on_consume.
#
# @param info passed through unchanged to core.consume
# @param meta passed through unchanged to core.consume
# @param payload passed through unchanged to core.consume
# @return whatever reporter.on_consume returns
def __consume__(info, meta, payload)
  reporter.on_consume do
    core.consume(info, meta, payload)
  end
end
|
#__failed__(info, error) ⇒ Object
54
55
56
|
# File 'lib/liebre/actor/consumer.rb', line 54
# Runs core.failed(info, error) inside the block handed to
# reporter.on_failed; the error is also given to the reporter directly.
#
# @param info passed through unchanged to core.failed
# @param error passed to both reporter.on_failed and core.failed
# @return whatever reporter.on_failed returns
def __failed__(info, error)
  reporter.on_failed(error) do
    core.failed(info, error)
  end
end
|
#__nack__(info, opts) ⇒ Object
47
48
49
|
# File 'lib/liebre/actor/consumer.rb', line 47
# Runs core.nack(info, opts) inside the block handed to reporter.on_nack.
#
# @param info passed through unchanged to core.nack
# @param opts passed through unchanged to core.nack
# @return whatever reporter.on_nack returns
def __nack__(info, opts)
  reporter.on_nack do
    core.nack(info, opts)
  end
end
|
#__reject__(info, opts) ⇒ Object
50
51
52
|
# File 'lib/liebre/actor/consumer.rb', line 50
# Runs core.reject(info, opts) inside the block handed to reporter.on_reject.
#
# @param info passed through unchanged to core.reject
# @param opts passed through unchanged to core.reject
# @return whatever reporter.on_reject returns
def __reject__(info, opts)
  reporter.on_reject do
    core.reject(info, opts)
  end
end
|
#__start__ ⇒ Object
33
34
35
|
# File 'lib/liebre/actor/consumer.rb', line 33
# Runs core.start inside the block handed to reporter.on_start.
#
# @return whatever reporter.on_start returns
def __start__
  reporter.on_start do
    core.start
  end
end
|
#__stop__ ⇒ Object
36
37
38
|
# File 'lib/liebre/actor/consumer.rb', line 36
# Runs core.stop inside the block handed to reporter.on_stop.
#
# @return whatever reporter.on_stop returns
def __stop__
  reporter.on_stop do
    core.stop
  end
end
|
#ack(info, opts = {}) ⇒ Object
26
|
# File 'lib/liebre/actor/consumer.rb', line 26
# Public entry point: invokes __ack__(info, opts) on the object returned by
# `async` (presumably the Concurrent::Async proxy, since the class includes
# that module -- confirm against concurrent-ruby).
#
# @param info forwarded to __ack__
# @param opts forwarded to __ack__; defaults to an empty Hash
def ack(info, opts = {})
  async.__ack__(info, opts)
end
|
#clean ⇒ Object
31
|
# File 'lib/liebre/actor/consumer.rb', line 31
# Public entry point: invokes __clean__ on the object returned by `async`
# (presumably the Concurrent::Async proxy -- confirm against concurrent-ruby).
def clean
  async.__clean__
end
|
#consume(info, meta, payload) ⇒ Object
24
|
# File 'lib/liebre/actor/consumer.rb', line 24
# Public entry point: invokes __consume__(info, meta, payload) on the object
# returned by `async` (presumably the Concurrent::Async proxy -- confirm
# against concurrent-ruby).
#
# @param info forwarded to __consume__
# @param meta forwarded to __consume__
# @param payload forwarded to __consume__
def consume(info, meta, payload)
  async.__consume__(info, meta, payload)
end
|
#failed(info, error) ⇒ Object
29
|
# File 'lib/liebre/actor/consumer.rb', line 29
# Public entry point: invokes __failed__(info, error) on the object returned
# by `async` (presumably the Concurrent::Async proxy -- confirm against
# concurrent-ruby).
#
# @param info forwarded to __failed__
# @param error forwarded to __failed__
def failed(info, error)
  async.__failed__(info, error)
end
|
#nack(info, opts = {}) ⇒ Object
27
|
# File 'lib/liebre/actor/consumer.rb', line 27
# Public entry point: invokes __nack__(info, opts) on the object returned by
# `async` (presumably the Concurrent::Async proxy -- confirm against
# concurrent-ruby).
#
# @param info forwarded to __nack__
# @param opts forwarded to __nack__; defaults to an empty Hash
def nack(info, opts = {})
  async.__nack__(info, opts)
end
|
#reject(info, opts = {}) ⇒ Object
28
|
# File 'lib/liebre/actor/consumer.rb', line 28
# Public entry point: invokes __reject__(info, opts) on the object returned
# by `async` (presumably the Concurrent::Async proxy -- confirm against
# concurrent-ruby).
#
# @param info forwarded to __reject__
# @param opts forwarded to __reject__; defaults to an empty Hash
def reject(info, opts = {})
  async.__reject__(info, opts)
end
|
#start ⇒ Object
21
|
# File 'lib/liebre/actor/consumer.rb', line 21
# Public entry point: invokes __start__ on the object returned by `async`
# (presumably the Concurrent::Async proxy -- confirm against concurrent-ruby).
def start
  async.__start__
end
|
#stop ⇒ Object
22
|
# File 'lib/liebre/actor/consumer.rb', line 22
# Public entry point: invokes __stop__ on the object returned by `async`
# (presumably the Concurrent::Async proxy -- confirm against concurrent-ruby).
def stop
  async.__stop__
end
|