Class: MessageBus::DistributedCache::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/distributed_cache.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(message_bus = nil, publish_queue_in_memory: true) ⇒ Manager

Returns a new instance of Manager.



19
20
21
22
23
24
25
# File 'lib/message_bus/distributed_cache.rb', line 19

def initialize(message_bus = nil, publish_queue_in_memory: true)
  @subscribers = []
  @subscribed = false
  @lock = Mutex.new
  @message_bus = message_bus || MessageBus
  @publish_queue_in_memory = publish_queue_in_memory
end

Instance Attribute Details

#app_versionObject

Returns the value of attribute app_version.



17
18
19
# File 'lib/message_bus/distributed_cache.rb', line 17

def app_version
  @app_version
end

Instance Method Details

#clear(hash) ⇒ Object



97
98
99
# File 'lib/message_bus/distributed_cache.rb', line 97

def clear(hash)
  publish(hash, op: :clear)
end

#delete(hash, key) ⇒ Object



93
94
95
# File 'lib/message_bus/distributed_cache.rb', line 93

def delete(hash, key)
  publish(hash, op: :delete, key: key)
end

#ensure_subscribe!Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/message_bus/distributed_cache.rb', line 60

def ensure_subscribe!
  return if @subscribed

  @lock.synchronize do
    return if @subscribed

    @message_bus.subscribe(CHANNEL_NAME) do |message|
      @lock.synchronize do
        process_message(message)
      end
    end
    @subscribed = true
  end
end

#process_message(message) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/message_bus/distributed_cache.rb', line 31

def process_message(message)
  i = @subscribers.length - 1

  payload = message.data

  while i >= 0
    begin
      current = @subscribers[i]

      next if payload["origin"] == current.identity
      next if current.key != payload["hash_key"]

      next if @app_version && payload["app_version"] != @app_version

      hash = current.hash(message.site_id || DEFAULT_SITE_ID)

      case payload["op"]
      when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"]
      when "delete" then hash.delete(payload["key"])
      when "clear"  then hash.clear
      end
    rescue WeakRef::RefError
      @subscribers.delete_at(i)
    ensure
      i -= 1
    end
  end
end

#publish(hash, message) ⇒ Object



75
76
77
78
79
80
81
82
83
84
# File 'lib/message_bus/distributed_cache.rb', line 75

def publish(hash, message)
  message[:origin] = hash.identity
  message[:hash_key] = hash.key
  message[:app_version] = @app_version if @app_version

  @message_bus.publish(CHANNEL_NAME, message,
    user_ids: [-1],
    queue_in_memory: @publish_queue_in_memory
  )
end

#register(hash) ⇒ Object



101
102
103
104
105
# File 'lib/message_bus/distributed_cache.rb', line 101

def register(hash)
  @lock.synchronize do
    @subscribers << WeakRef.new(hash)
  end
end

#set(hash, key, value) ⇒ Object



86
87
88
89
90
91
# File 'lib/message_bus/distributed_cache.rb', line 86

def set(hash, key, value)
  # special support for set
  marshal = (Set === value || Hash === value || Array === value)
  value = Base64.encode64(Marshal.dump(value)) if marshal
  publish(hash, op: :set, key: key, value: value, marshalled: marshal)
end

#subscribersObject



27
28
29
# File 'lib/message_bus/distributed_cache.rb', line 27

def subscribers
  @subscribers
end