Class: Deimos::Consumer

Inherits:
Object
  • Object
show all
Includes:
SharedConfig, Phobos::Handler
Defined in:
lib/deimos/consumer.rb

Overview

Parent consumer class.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.decoder ⇒ AvroDataDecoder

Returns:

  • (AvroDataDecoder)



20
21
22
23
# File 'lib/deimos/consumer.rb', line 20

# Returns the payload decoder, building it on first use and caching it in
# `@decoder` for subsequent calls.
# @return [AvroDataDecoder] decoder built from `config[:schema]` and
#   `config[:namespace]`
def decoder
  return @decoder if @decoder

  @decoder = AvroDataDecoder.new(
    schema: config[:schema],
    namespace: config[:namespace]
  )
end

.key_decoder ⇒ AvroDataDecoder

Returns:

  • (AvroDataDecoder)



26
27
28
29
# File 'lib/deimos/consumer.rb', line 26

# Returns the key decoder, building it on first use and caching it in
# `@key_decoder` for subsequent calls.
# @return [AvroDataDecoder] decoder built from `config[:key_schema]` and
#   `config[:namespace]`
def key_decoder
  return @key_decoder if @key_decoder

  @key_decoder = AvroDataDecoder.new(
    schema: config[:key_schema],
    namespace: config[:namespace]
  )
end

Instance Method Details

#around_consume(payload, metadata) ⇒ Object

:nodoc:



33
34
35
36
37
38
39
# File 'lib/deimos/consumer.rb', line 33

# Wraps the consumption of a single message.
#
# Calls `_received_message`, then runs the caller's block inside
# `_with_error_span` while timing it, and finally passes the elapsed
# wall-clock seconds to `_handle_success`. If the block raises and the error
# propagates, `_handle_success` is not reached.
#
# @param payload [Object] the message payload, forwarded to the helpers
# @param metadata [Hash] the message metadata, forwarded to the helpers
# @yield performs the actual consumption of the message
# @return [Object] whatever `_handle_success` returns
# :nodoc:
def around_consume(payload, metadata)
  _received_message(payload, metadata)
  benchmark = Benchmark.measure do
    _with_error_span(payload, metadata) { yield }
  end
  _handle_success(benchmark.real, payload, metadata)
end

#before_consume(payload, metadata) ⇒ Object

:nodoc:



42
43
44
45
46
47
48
49
# File 'lib/deimos/consumer.rb', line 42

# Prepares a message before it is consumed.
#
# Inside `_with_error_span`, replaces `metadata[:key]` with its decoded form
# when the class config has a `:key_schema` or `:key_field`, then decodes the
# payload with the class-level decoder.
#
# @param payload [Object] the raw message payload
# @param metadata [Hash] the message metadata; `:key` is overwritten in place
#   when key decoding is configured
# @return [Object, nil] the result of the `_with_error_span` block —
#   presumably the decoded payload, or nil when the payload is not present
#   (NOTE(review): depends on `_with_error_span` returning its block's value)
# :nodoc:
def before_consume(payload, metadata)
  _with_error_span(payload, metadata) do
    if self.class.config[:key_schema] || self.class.config[:key_field]
      metadata[:key] = decode_key(metadata[:key])
    end
    # `present?` is not core Ruby; presumably supplied by ActiveSupport.
    self.class.decoder.decode(payload) if payload.present?
  end
end

#consume(_payload, _metadata) ⇒ Object

Consume incoming messages.

Parameters:

  • _payload (String)
  • _metadata (Hash)

Raises:

  • (NotImplementedError)


75
76
77
# File 'lib/deimos/consumer.rb', line 75

# Consume incoming messages. This base implementation always raises;
# subclasses are presumably expected to override it.
# @param _payload [String] the message payload (unused here)
# @param _metadata [Hash] the message metadata (unused here)
# @raise [NotImplementedError] always
def consume(_payload, _metadata)
  raise NotImplementedError
end

#decode_key(key) ⇒ Object

Helper method to decode an Avro-encoded key.

Parameters:

  • key (String)

Returns:

  • (Object)

    the decoded key.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/deimos/consumer.rb', line 54

# Helper method to decode an Avro-encoded key according to the class config.
#
# With `:key_field` set, the key is decoded by the payload decoder for that
# field. Otherwise, with `:key_schema` set, it is decoded by the key decoder.
# With neither, the key is returned untouched.
#
# @param key [String] the raw key; nil is passed straight through
# @return [Object] the decoded key.
# @raise [RuntimeError] if `:encode_key` is set but neither `:key_field` nor
#   `:key_schema` is configured
def decode_key(key)
  return if key.nil?

  settings = self.class.config
  key_config_missing = settings[:encode_key] &&
                       settings[:key_field].nil? &&
                       settings[:key_schema].nil?
  if key_config_missing
    raise 'No key config given - if you are not decoding keys, please use `key_config plain: true`'
  end

  return self.class.decoder.decode_key(key, settings[:key_field]) if settings[:key_field]
  return self.class.key_decoder.decode(key, schema: settings[:key_schema]) if settings[:key_schema]

  key # no encoding
end