Class: Deimos::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/message.rb

Overview

Basically a struct to hold the message as it’s processed.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(payload, producer, topic: nil, key: nil, headers: nil, partition_key: nil) ⇒ Message

Returns a new instance of Message.

Parameters:

  • payload (Hash)
  • producer (Class)
  • topic (String) (defaults to: nil)
  • key (String, Integer, Hash) (defaults to: nil)
  • headers (Hash) (defaults to: nil)
  • partition_key (Integer) (defaults to: nil)


28
29
30
31
32
33
34
35
# File 'lib/deimos/message.rb', line 28

# Builds a message from a decoded payload and its routing metadata.
# @param payload [Hash, nil] decoded payload; converted with
#   with_indifferent_access when present.
# @param producer [Class, nil] only the result of calling #name on it is kept.
# @param topic [String, nil]
# @param key [String, Integer, Hash, nil]
# @param headers [Hash, nil] converted with with_indifferent_access when present.
# @param partition_key [Integer, nil]
def initialize(payload, producer, topic: nil, key: nil, headers: nil, partition_key: nil)
  # Hash-like inputs are normalised; nil is passed through by the safe navigation.
  @payload = payload&.with_indifferent_access
  @headers = headers&.with_indifferent_access

  # Only the producer's name is retained, not the producer itself.
  @producer_name = producer&.name

  # Routing information is stored exactly as supplied.
  @topic = topic
  @key = key
  @partition_key = partition_key
end

Instance Attribute Details

#encoded_key ⇒ String

Returns:

  • (String)


15
16
17
# File 'lib/deimos/message.rb', line 15

# Reader for the encoded form of the message key.
# @return [String]
def encoded_key
  @encoded_key
end

#encoded_payload ⇒ String

Returns:

  • (String)


17
18
19
# File 'lib/deimos/message.rb', line 17

# Reader for the encoded form of the message payload.
# @return [String]
def encoded_payload
  @encoded_payload
end

#headers ⇒ Hash

Returns:

  • (Hash)


11
12
13
# File 'lib/deimos/message.rb', line 11

# Reader for the message headers.
# @return [Hash]
def headers
  @headers
end

#key ⇒ Hash, String, Integer

Returns:

  • (Hash, String, Integer)


9
10
11
# File 'lib/deimos/message.rb', line 9

# Reader for the (unencoded) message key.
# @return [Hash, String, Integer]
def key
  @key
end

#partition_key ⇒ Integer

Returns:

  • (Integer)


13
14
15
# File 'lib/deimos/message.rb', line 13

# Reader for the explicitly assigned partition key.
# @return [Integer]
def partition_key
  @partition_key
end

#payload ⇒ Hash

Returns:

  • (Hash)


7
8
9
# File 'lib/deimos/message.rb', line 7

# Reader for the decoded message payload.
# @return [Hash]
def payload
  @payload
end

#producer_name ⇒ String

Returns:

  • (String)


21
22
23
# File 'lib/deimos/message.rb', line 21

# Reader for the name of the producer this message was created with.
# @return [String]
def producer_name
  @producer_name
end

#topic ⇒ String

Returns:

  • (String)


19
20
21
# File 'lib/deimos/message.rb', line 19

# Reader for the topic this message is addressed to.
# @return [String]
def topic
  @topic
end

Instance Method Details

#==(other) ⇒ Boolean

Parameters:

  • other (Message)

Returns:

  • (Boolean)


92
93
94
# File 'lib/deimos/message.rb', line 92

# Compares this message with another object through their #to_h forms.
# An object that cannot be converted with #to_h is never equal to a message;
# previously such a comparison (e.g. against an Integer or Symbol) raised
# NoMethodError instead of returning false.
# @param other [Object] typically another message or a Hash.
# @return [Boolean]
def ==(other)
  return false unless other.respond_to?(:to_h)

  to_h == other.to_h
end

#add_fields(fields) ⇒ void

This method returns an undefined value.

Add message_id and timestamp default values if they are in the schema and don’t already have values.

Parameters:

  • fields (Array<String>)

    names of the fields that exist in the schema.



41
42
43
44
45
46
47
48
49
50
# File 'lib/deimos/message.rb', line 41

# Adds message_id and timestamp default values when those fields are in the
# schema and the payload does not already have a truthy value for them.
# Does nothing when the payload is nil, or when it is blank once the
# :payload_key and :partition_key entries are ignored.
# @param fields [Array<String>] names of the fields that exist in the schema.
# @return [void]
def add_fields(fields)
  # A nil payload has nothing to fill in; without this guard the #except call
  # below raises NoMethodError on nil.
  return if @payload.nil?
  return if @payload.except(:payload_key, :partition_key).blank?

  @payload['message_id'] ||= SecureRandom.uuid if fields.include?('message_id')
  @payload['timestamp'] ||= Time.now.in_time_zone.to_s if fields.include?('timestamp')
end

#coerce_fields(encoder) ⇒ void

This method returns an undefined value.

Parameters:

  • encoder (schema encoder; must respond to #coerce)



54
55
56
57
58
# File 'lib/deimos/message.rb', line 54

# Replaces the payload with the result of encoder.coerce(payload).
# A nil payload is left untouched and the encoder is not called.
# @param encoder [#coerce] object used to coerce the payload.
# @return [void]
def coerce_fields(encoder)
  @payload = encoder.coerce(@payload) unless payload.nil?
end

#encoded_hash ⇒ Hash

Returns:

  • (Hash)


61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/deimos/message.rb', line 61

# Hash form of the message using the *encoded* key and payload.
# The partition key falls back to the encoded key when none was set, and the
# :headers entry is omitted entirely when the headers are nil.
# @return [Hash]
def encoded_hash
  metadata = { decoded_payload: @payload, producer_name: @producer_name }
  result = {
    topic: @topic,
    key: @encoded_key,
    headers: @headers,
    partition_key: @partition_key || @encoded_key,
    payload: @encoded_payload,
    metadata: metadata
  }
  # A missing headers value is dropped rather than emitted as nil.
  result.delete(:headers) if @headers.nil?
  result
end

#to_h ⇒ Hash

Returns:

  • (Hash)


76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/deimos/message.rb', line 76

# Hash form of the message using the *unencoded* key and payload.
# The partition key falls back to the key when none was set, and the
# :headers entry is omitted entirely when the headers are nil.
# @return [Hash]
def to_h
  metadata = { decoded_payload: @payload, producer_name: @producer_name }
  result = {
    topic: @topic,
    key: @key,
    headers: @headers,
    partition_key: @partition_key || @key,
    payload: @payload,
    metadata: metadata
  }
  # A missing headers value is dropped rather than emitted as nil.
  result.delete(:headers) if @headers.nil?
  result
end

#tombstone? ⇒ Boolean

Returns true if this message is a tombstone.

Returns:

  • (Boolean)

    true if this message is a tombstone



97
98
99
# File 'lib/deimos/message.rb', line 97

# A message counts as a tombstone when its payload is nil.
# @return [Boolean] true if this message is a tombstone
def tombstone?
  payload.nil?
end