Class: RightAMQP::HABrokerClient::Published

Inherits:
Object
Defined in:
lib/right_amqp/ha_client/ha_broker_client.rb

Overview

Cache for the context of recently published messages, for use with message returns. Applies LRU ordering to manage cache size, but deletes an entry only once it has gone unused for more than MAX_AGE seconds.

Constant Summary

MAX_AGE = 60

Number of seconds since a cache entry was last used before it is deleted

Constructor Details

#initialize ⇒ Published

Initialize cache



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 1147

def initialize
  @cache = {}
  @lru = []
end

Instance Method Details

#fetch(message) ⇒ Object

Fetch context of previously published message, marking its cache entry as most recently used

Parameters

message(String)

Serialized message that was published

Returns

(Context|nil)

Context of message, or nil if not found in cache



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 1181

def fetch(message)
  key = identify(message)
  if entry = @cache[key]
    entry[0] = Time.now.to_i
    @lru.push(@lru.delete(key))
    entry[1]
  end
end
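
The `@lru.push(@lru.delete(key))` line is what moves a key to the most recently used end of the list. As a minimal standalone sketch of that idiom, here it is on a plain array; the variable name and the keys are illustrative and not taken from the library:

```ruby
# Illustrative only: a plain array standing in for the @lru list.
lru = ["a", "b", "c"]   # front = least recently used, back = most recent

# Array#delete removes the matching element and returns it,
# so pushing the result re-appends the key at the back.
lru.push(lru.delete("a"))

p lru  # ["b", "c", "a"]
```

Array#delete returns nil when the element is absent, which is why the method above only performs this step after a cache hit.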

#identify(message) ⇒ Object

Obtain an identifier for this message: the MD5 hex digest of its serialized form

Parameters

message(String)

Serialized message that was published

Returns

(String)

Unique id for message



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 1197

def identify(message)
  Digest::MD5.hexdigest(message)
end
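
Because the key is a digest of the serialized message, byte-identical messages map to the same cache entry. A short sketch of that property, using made-up payload strings in place of real serialized messages:

```ruby
require 'digest/md5'

# Hypothetical payloads standing in for serialized messages.
first  = Digest::MD5.hexdigest("payload-1")
again  = Digest::MD5.hexdigest("payload-1")
second = Digest::MD5.hexdigest("payload-2")

puts first.length     # 32 hex characters
puts first == again   # true: same bytes, same key
puts first == second  # false for these two inputs
```

The id is unique only up to MD5 collisions. Two publishes of the same serialized bytes are indistinguishable to the cache.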

#store(message, context) ⇒ Object

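Store message context in cache. If the message is already cached, only its timestamp and LRU position are refreshed, and the previously stored context is kept.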

Parameters

message(String)

Serialized message that was published

context(Context)

Message publishing context

Returns

(true)

Always returns true



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 1160

def store(message, context)
  key = identify(message)
  now = Time.now.to_i
  if entry = @cache[key]
    entry[0] = now
    @lru.push(@lru.delete(key))
  else
    @cache[key] = [now, context]
    @lru.push(key)
    @cache.delete(@lru.shift) while (now - @cache[@lru.first][0]) > MAX_AGE
  end
  true
end
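
To make the eviction rule concrete, the following is a self-contained sketch that mirrors the three methods above. It is not the library class: `PublishedSketch`, its `clock` argument, and its `size` helper are hypothetical additions so that time can be faked in the example.

```ruby
require 'digest/md5'

# Sketch of the documented cache, NOT the library class. The clock
# argument is a hypothetical addition so the example can control time.
class PublishedSketch
  MAX_AGE = 60

  def initialize(clock)
    @clock = clock
    @cache = {}
    @lru = []
  end

  def store(message, context)
    key = Digest::MD5.hexdigest(message)
    now = @clock.call
    if (entry = @cache[key])
      entry[0] = now
      @lru.push(@lru.delete(key))
    else
      @cache[key] = [now, context]
      @lru.push(key)
      # Drop from the least recently used end while entries are too old.
      @cache.delete(@lru.shift) while (now - @cache[@lru.first][0]) > MAX_AGE
    end
    true
  end

  def fetch(message)
    key = Digest::MD5.hexdigest(message)
    if (entry = @cache[key])
      entry[0] = @clock.call
      @lru.push(@lru.delete(key))
      entry[1]
    end
  end

  # Hypothetical helper, used only to inspect the sketch.
  def size
    @cache.size
  end
end

now = 1_000
cache = PublishedSketch.new(-> { now })

cache.store("msg-a", :ctx_a)  # stored at t = 1000
now += 60
cache.store("msg-b", :ctx_b)  # msg-a is exactly 60s old: kept
size_at_60 = cache.size       # 2
now += 1
cache.store("msg-c", :ctx_c)  # msg-a is now 61s old: deleted
```

In this sketch, as in the listing above, pruning runs only when a new key is stored. Fetching an entry or re-storing an existing message refreshes that entry but never deletes anything. The age comparison is strict, so an entry that is exactly MAX_AGE seconds old survives.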