Class: RightAMQP::HABrokerClient::Published

Inherits:
Object
Defined in:
lib/right_amqp/ha_client/ha_broker_client.rb

Overview

Cache for the context of recently published messages, for use when handling message returns. Applies LRU ordering to manage cache size, but deletes an entry only once it is older than MAX_AGE seconds.

Constant Summary

MAX_AGE = 60

Number of seconds since a cache entry was last used after which it becomes eligible for deletion

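Instance Method Summary

#fetch(message) ⇒ Object

Fetch context of previously published message

#identify(message) ⇒ Object

Obtain a unique identifier for this message

#initialize ⇒ Published (constructor)

Initialize cache

#store(message, context) ⇒ Object

Store message context in cache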

Constructor Details

#initialize ⇒ Published

Initialize cache



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 1107

def initialize
  @cache = {}
  @lru = []
end

Instance Method Details

#fetch(message) ⇒ Object

Fetch context of previously published message

Parameters

message (String)

Serialized message that was published

Returns

(Context|nil)

Context of message, or nil if not found in cache



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 1141

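def fetch(message)
  key = identify(message)
  if (entry = @cache[key])
    entry[0] = Time.now.to_i      # refresh last-used time
    @lru.push(@lru.delete(key))   # move key to the most recently used end
    entry[1]                      # stored context
  end                             # evaluates to nil when the key is not cached
end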
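The `@lru.push(@lru.delete(key))` line in the listing above relies on `Array#delete` returning the removed element, so the key is re-appended at the most recently used end of the list. A minimal sketch of that idiom in isolation follows; the `touch` helper is a hypothetical name used only for illustration and is not part of the library.

```ruby
# Hypothetical helper isolating the "move to most recently used" step.
# Array#delete removes the element and returns it; push appends it again.
def touch(lru, key)
  lru.push(lru.delete(key))
end

lru = %w[a b c]   # least recently used first
touch(lru, "a")
p lru             # prints ["b", "c", "a"]
```

`Array#delete` returns nil when the element is absent, which is why the listing performs this step only after confirming that the key is present in `@cache`.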

#identify(message) ⇒ Object

Obtain a unique identifier for this message

Parameters

message (String)

Serialized message that was published

Returns

(String)

Unique id for message



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 1157

def identify(message)
  Digest::MD5.hexdigest(message)
end
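Because the identifier is an MD5 digest of the serialized message, byte-identical messages map to the same cache key. A small standalone sketch of that property follows; the top-level `identify` method copies the listing above, and the sample payloads are made up for illustration.

```ruby
require 'digest/md5'

# Standalone copy of the identify logic shown above
def identify(message)
  Digest::MD5.hexdigest(message)
end

first  = identify('{"type":"request","token":"abc"}')
second = identify('{"type":"request","token":"abc"}')
other  = identify('{"type":"request","token":"xyz"}')

puts first == second   # prints true: identical payloads share one key
puts first == other    # prints false: different payloads get different keys
puts first.length      # prints 32: MD5 hex digests are 32 characters long
```

One consequence is that publishing the same serialized message twice yields a single cache entry rather than two.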

#store(message, context) ⇒ Object

Store message context in cache

Parameters

message (String)

Serialized message that was published

context (Context)

Message publishing context

Returns

true

Always returns true



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 1120

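def store(message, context)
  key = identify(message)
  now = Time.now.to_i
  if (entry = @cache[key])
    # Already cached: refresh last-used time and LRU position; the stored context is kept
    entry[0] = now
    @lru.push(@lru.delete(key))
  else
    @cache[key] = [now, context]
    @lru.push(key)
    # Evict from the least recently used end while entries are older than MAX_AGE;
    # the entry just added has age 0, so the loop always terminates
    @cache.delete(@lru.shift) while (now - @cache[@lru.first][0]) > MAX_AGE
  end
  true
end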
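The eviction behaviour can be easier to follow with a controllable clock. The sketch below is a hypothetical standalone copy of the cache, not the library class: the name `PublishedSketch`, the injectable `clock` argument, and the `size` reader are assumptions added so that time can be advanced without sleeping. It illustrates that stale entries are removed only when a new message is stored, and that fetching alone does not evict.

```ruby
require 'digest/md5'

# Hypothetical standalone copy of the cache for illustration only.
# The clock lambda and #size are additions that the original class lacks.
class PublishedSketch
  MAX_AGE = 60

  def initialize(clock = -> { Time.now.to_i })
    @clock = clock
    @cache = {}   # key => [last_used_time, context]
    @lru = []     # keys, least recently used first
  end

  def size
    @cache.size
  end

  def store(message, context)
    key = Digest::MD5.hexdigest(message)
    now = @clock.call
    if (entry = @cache[key])
      entry[0] = now
      @lru.push(@lru.delete(key))
    else
      @cache[key] = [now, context]
      @lru.push(key)
      @cache.delete(@lru.shift) while (now - @cache[@lru.first][0]) > MAX_AGE
    end
    true
  end

  def fetch(message)
    key = Digest::MD5.hexdigest(message)
    if (entry = @cache[key])
      entry[0] = @clock.call
      @lru.push(@lru.delete(key))
      entry[1]
    end
  end
end

now = 1_000
cache = PublishedSketch.new(-> { now })

cache.store("msg-a", :context_a)   # stored at t = 1000
now += 30
cache.store("msg-b", :context_b)   # stored at t = 1030
now += 40                          # t = 1070: msg-a is 70s old, msg-b is 40s old
p cache.size                       # prints 2: nothing is evicted until the next new store

cache.store("msg-c", :context_c)   # new key triggers eviction of msg-a (older than 60s)
p cache.size                       # prints 2: msg-b and msg-c remain
p cache.fetch("msg-a")             # prints nil
p cache.fetch("msg-b")             # prints :context_b
```

Since eviction is driven purely by age, the number of entries is bounded by how many distinct messages are stored within any MAX_AGE window rather than by a fixed capacity.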