Class: MessageBus::DistributedCache::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/distributed_cache.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(message_bus = nil, publish_queue_in_memory: true) ⇒ Manager

Returns a new instance of Manager.



19
20
21
22
23
24
25
26
# File 'lib/message_bus/distributed_cache.rb', line 19

def initialize(message_bus = nil, publish_queue_in_memory: true)
  @subscribers = []
  @subscribed = false
  @lock = Mutex.new
  @message_bus = message_bus || MessageBus
  @publish_queue_in_memory = publish_queue_in_memory
  @app_version = nil
end

Instance Attribute Details

#app_versionObject

Returns the value of attribute app_version.



17
18
19
# File 'lib/message_bus/distributed_cache.rb', line 17

def app_version
  @app_version
end

Instance Method Details

#clear(hash) ⇒ Object



99
100
101
# File 'lib/message_bus/distributed_cache.rb', line 99

def clear(hash)
  publish(hash, op: :clear)
end

#delete(hash, key) ⇒ Object



95
96
97
# File 'lib/message_bus/distributed_cache.rb', line 95

def delete(hash, key)
  publish(hash, op: :delete, key: key)
end

#ensure_subscribe!Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/message_bus/distributed_cache.rb', line 62

def ensure_subscribe!
  return if @subscribed

  @lock.synchronize do
    return if @subscribed

    @message_bus.subscribe(CHANNEL_NAME) do |message|
      @lock.synchronize do
        process_message(message)
      end
    end
    @subscribed = true
  end
end

#process_message(message) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/message_bus/distributed_cache.rb', line 32

def process_message(message)
  i = @subscribers.length - 1

  payload = message.data

  while i >= 0
    begin
      current = @subscribers[i]

      next if payload["origin"] == current.identity
      next if current.key != payload["hash_key"]

      next if @app_version && payload["app_version"] != @app_version

      hash = current.hash(message.site_id || DEFAULT_SITE_ID)

      case payload["op"]
      # TODO: consider custom marshal support with a restricted set
      when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"] # rubocop:disable Security/MarshalLoad
      when "delete" then hash.delete(payload["key"])
      when "clear"  then hash.clear
      end
    rescue WeakRef::RefError
      @subscribers.delete_at(i)
    ensure
      i -= 1
    end
  end
end

#publish(hash, message) ⇒ Object



77
78
79
80
81
82
83
84
85
86
# File 'lib/message_bus/distributed_cache.rb', line 77

def publish(hash, message)
  message[:origin] = hash.identity
  message[:hash_key] = hash.key
  message[:app_version] = @app_version if @app_version

  @message_bus.publish(CHANNEL_NAME, message,
    user_ids: [-1],
    queue_in_memory: @publish_queue_in_memory
  )
end

#register(hash) ⇒ Object



103
104
105
106
107
# File 'lib/message_bus/distributed_cache.rb', line 103

def register(hash)
  @lock.synchronize do
    @subscribers << WeakRef.new(hash)
  end
end

#set(hash, key, value) ⇒ Object



88
89
90
91
92
93
# File 'lib/message_bus/distributed_cache.rb', line 88

def set(hash, key, value)
  # special support for set
  marshal = (Set === value || Hash === value || Array === value)
  value = Base64.encode64(Marshal.dump(value)) if marshal
  publish(hash, op: :set, key: key, value: value, marshalled: marshal)
end

#subscribersObject



28
29
30
# File 'lib/message_bus/distributed_cache.rb', line 28

def subscribers
  @subscribers
end