Class: Deimos::Consumer
- Inherits: Object
  - Object
  - Deimos::Consumer
- Includes: SharedConfig, Phobos::Handler
- Defined in: lib/deimos/consumer.rb
Overview
Parent consumer class.
Direct Known Subclasses
Class Method Summary
Instance Method Summary
- #around_consume(payload, metadata) ⇒ Object
  :nodoc:.
- #before_consume(payload, metadata) ⇒ Object
  :nodoc:.
- #consume(_payload, _metadata) ⇒ Object
  Consume incoming messages.
- #decode_key(key) ⇒ Object
  Helper method to decode an Avro-encoded key.
Class Method Details
.decoder ⇒ AvroDataDecoder
# File 'lib/deimos/consumer.rb', line 20
def decoder
  @decoder ||= AvroDataDecoder.new(schema: config[:schema],
                                   namespace: config[:namespace])
end
.key_decoder ⇒ AvroDataDecoder
# File 'lib/deimos/consumer.rb', line 26
def key_decoder
  @key_decoder ||= AvroDataDecoder.new(schema: config[:key_schema],
                                       namespace: config[:namespace])
end
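Both class methods rely on `||=` against a class-level instance variable, so the decoder is built on the first call and reused afterwards. The following is a minimal sketch of that pattern only. `FakeDecoder`, `SketchConsumer`, and the hard-coded config values are hypothetical stand-ins, not part of Deimos.

```ruby
# Hypothetical stand-in for AvroDataDecoder; it counts how often it is built.
class FakeDecoder
  @built = 0

  class << self
    attr_accessor :built
  end

  attr_reader :schema, :namespace

  def initialize(schema:, namespace:)
    self.class.built += 1
    @schema = schema
    @namespace = namespace
  end
end

# Hypothetical consumer with a hard-coded config hash (an assumption; the
# real config comes from SharedConfig).
class SketchConsumer
  class << self
    def config
      { schema: 'MySchema', namespace: 'com.example' }
    end

    # Built on the first call, then reused: @decoder lives on the class.
    def decoder
      @decoder ||= FakeDecoder.new(schema: config[:schema],
                                   namespace: config[:namespace])
    end
  end
end

first = SketchConsumer.decoder
second = SketchConsumer.decoder
puts first.equal?(second)
puts FakeDecoder.built
```

Because the instance variable belongs to the class object, each consumer class in this sketch would hold its own decoder.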
Instance Method Details
#around_consume(payload, metadata) ⇒ Object
:nodoc:
# File 'lib/deimos/consumer.rb', line 33
def around_consume(payload, metadata)
  (payload, metadata)
  benchmark = Benchmark.measure do
    _with_error_span(payload, metadata) { yield }
  end
  _handle_success(benchmark.real, payload, metadata)
end
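The timing step in `#around_consume` wraps the yielded block in `Benchmark.measure` and passes the wall-clock time (`benchmark.real`) on to a success handler. A rough sketch of just that step follows. `TimingSketch` and `last_duration` are hypothetical, and unlike the listing above this version returns the block's result so the effect is easy to inspect.

```ruby
require 'benchmark'

# Hypothetical stand-in that mirrors only the timing step.
class TimingSketch
  attr_reader :last_duration

  # payload and metadata are accepted to match the documented signature,
  # but this sketch does not inspect them.
  def around_consume(_payload, _metadata)
    result = nil
    benchmark = Benchmark.measure do
      result = yield
    end
    # Benchmark::Tms#real is the elapsed wall-clock time in seconds.
    @last_duration = benchmark.real
    result
  end
end

sketch = TimingSketch.new
value = sketch.around_consume({ 'id' => 1 }, { key: 'abc' }) { :handled }
puts value.inspect
puts sketch.last_duration.class
```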
#before_consume(payload, metadata) ⇒ Object
:nodoc:
# File 'lib/deimos/consumer.rb', line 42
def before_consume(payload, metadata)
  _with_error_span(payload, metadata) do
    if self.class.config[:key_schema] || self.class.config[:key_field]
      metadata[:key] = decode_key(metadata[:key])
    end
    self.class.decoder.decode(payload) if payload.present?
  end
end
#consume(_payload, _metadata) ⇒ Object
Consume incoming messages.
# File 'lib/deimos/consumer.rb', line 75
def consume(_payload, _metadata)
  raise NotImplementedError
end
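Since the parent implementation raises `NotImplementedError`, subclasses are expected to override `#consume`. The sketch below illustrates that contract with stand-in classes. `BaseConsumerSketch` and `OrderConsumerSketch` are hypothetical, and a real consumer would inherit from `Deimos::Consumer` and be configured through its own setup, which is not shown here.

```ruby
# Hypothetical stand-in for the parent class; only #consume is mirrored.
class BaseConsumerSketch
  def consume(_payload, _metadata)
    raise NotImplementedError
  end
end

# Hypothetical subclass that records each message it receives.
class OrderConsumerSketch < BaseConsumerSketch
  attr_reader :seen

  def initialize
    super
    @seen = []
  end

  # payload: the decoded message; metadata: a hash that includes :key.
  def consume(payload, metadata)
    @seen << [metadata[:key], payload]
  end
end

consumer = OrderConsumerSketch.new
consumer.consume({ 'total' => 5 }, { key: 'order-1' })
puts consumer.seen.inspect

begin
  BaseConsumerSketch.new.consume({}, {})
rescue NotImplementedError
  # NotImplementedError descends from ScriptError, so a bare `rescue`
  # (which only catches StandardError) would not catch it.
  puts 'base class does not implement consume'
end
```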
#decode_key(key) ⇒ Object
Helper method to decode an Avro-encoded key.
# File 'lib/deimos/consumer.rb', line 54
def decode_key(key)
  return nil if key.nil?

  config = self.class.config
  if config[:encode_key] && config[:key_field].nil? &&
     config[:key_schema].nil?
    raise 'No key config given - if you are not decoding keys, please use `key_config plain: true`'
  end

  if config[:key_field]
    self.class.decoder.decode_key(key, config[:key_field])
  elsif config[:key_schema]
    self.class.key_decoder.decode(key, schema: config[:key_schema])
  else # no encoding
    key
  end
end
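The branch order in `#decode_key` can be exercised in isolation. The sketch below restates the same decisions as a free-standing method, assuming two lambdas in place of the Avro decoders. `decode_key_sketch`, `field_decoder`, and `schema_decoder` are hypothetical names, the error message is shortened, and the lambdas' return values are invented purely to make each branch visible.

```ruby
# Hypothetical free-standing version of the branch logic. The two callables
# stand in for the Avro decoders and are not part of Deimos.
def decode_key_sketch(key, config, field_decoder:, schema_decoder:)
  return nil if key.nil?

  if config[:encode_key] && config[:key_field].nil? && config[:key_schema].nil?
    raise 'No key config given'
  end

  if config[:key_field]
    field_decoder.call(key, config[:key_field])
  elsif config[:key_schema]
    schema_decoder.call(key, config[:key_schema])
  else
    key # plain key, returned unchanged
  end
end

# Invented decoders: each returns a value that shows which branch ran.
field_decoder = ->(key, field) { { field => key } }
schema_decoder = ->(key, schema) { "#{schema}:#{key}" }
decoders = { field_decoder: field_decoder, schema_decoder: schema_decoder }

puts decode_key_sketch(nil, { key_field: 'id' }, **decoders).inspect
puts decode_key_sketch('7', { key_field: 'id' }, **decoders).inspect
puts decode_key_sketch('7', { key_schema: 'KeySchema' }, **decoders).inspect
puts decode_key_sketch('7', {}, **decoders).inspect
```

Note that `key_field` takes precedence over `key_schema` when both are set, and that a `nil` key short-circuits before any configuration check.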