Class: MessageBus::DistributedCache::Manager
- Inherits:
-
Object
- Object
- MessageBus::DistributedCache::Manager
- Defined in:
- lib/message_bus/distributed_cache.rb
Instance Attribute Summary collapse
-
#app_version ⇒ Object
Returns the value of attribute app_version.
Instance Method Summary collapse
- #clear(hash) ⇒ Object
- #delete(hash, key) ⇒ Object
- #ensure_subscribe! ⇒ Object
-
#initialize(message_bus = nil) ⇒ Manager
constructor
A new instance of Manager.
- #process_message(message) ⇒ Object
- #publish(hash, message) ⇒ Object
- #register(hash) ⇒ Object
- #set(hash, key, value) ⇒ Object
- #subscribers ⇒ Object
Constructor Details
#initialize(message_bus = nil) ⇒ Manager
Returns a new instance of Manager.
19 20 21 22 23 24 |
# File 'lib/message_bus/distributed_cache.rb', line 19

# Builds a manager with an empty subscriber list and no active bus
# subscription.
#
# @param message_bus [Object, nil] bus object used for publish/subscribe;
#   when nil (or false) the MessageBus constant is used instead
def initialize(message_bus = nil)
  @subscribers = []
  @subscribed = false
  @lock = Mutex.new
  @message_bus = message_bus || MessageBus
end
Instance Attribute Details
#app_version ⇒ Object
Returns the value of attribute app_version.
17 18 19 |
# File 'lib/message_bus/distributed_cache.rb', line 17

# Returns the value of attribute app_version.
#
# @return [Object] the current @app_version (nil when never assigned)
def app_version
  @app_version
end
Instance Method Details
#clear(hash) ⇒ Object
92 93 94 |
# File 'lib/message_bus/distributed_cache.rb', line 92

# Publishes a :clear operation for the given hash.
#
# @param hash [Object] forwarded to #publish as the originating hash
# @return [Object] whatever #publish returns
def clear(hash)
  publish(hash, op: :clear)
end
#delete(hash, key) ⇒ Object
88 89 90 |
# File 'lib/message_bus/distributed_cache.rb', line 88

# Publishes a :delete operation for one key of the given hash.
#
# @param hash [Object] forwarded to #publish as the originating hash
# @param key [Object] key to delete
# @return [Object] whatever #publish returns
def delete(hash, key)
  publish(hash, op: :delete, key: key)
end
#ensure_subscribe! ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/message_bus/distributed_cache.rb', line 59

# Subscribes to CHANNEL_NAME on the bus once; later calls return immediately.
#
# Every message delivered to the subscription is handed to #process_message
# while holding @lock.
def ensure_subscribe!
  # Cheap check without the lock for the already-subscribed case.
  return if @subscribed

  @lock.synchronize do
    # Re-check under the lock: another thread may have subscribed meanwhile.
    return if @subscribed

    @message_bus.subscribe(CHANNEL_NAME) do |message|
      # NOTE(review): this takes the same @lock that is held around the
      # subscribe call; presumably the bus invokes the callback later and
      # not synchronously from inside #subscribe — confirm.
      @lock.synchronize do
        process_message(message)
      end
    end
    @subscribed = true
  end
end
#process_message(message) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/message_bus/distributed_cache.rb', line 30

# Applies one bus message to every registered subscriber it is addressed to.
#
# A subscriber is skipped when it is the message's origin, when its key
# differs from the payload's "hash_key", or when @app_version is set and
# differs from the payload's "app_version". Subscribers whose reference
# raises WeakRef::RefError are removed from @subscribers.
#
# @param message [Object] must respond to #data (a Hash with string keys
#   "origin", "hash_key", "op", and optionally "key", "value",
#   "marshalled", "app_version") and #site_id
def process_message(message)
  # Walk backwards so delete_at(i) never shifts an entry not yet visited.
  i = @subscribers.length - 1

  payload = message.data

  while i >= 0
    begin
      current = @subscribers[i]

      next if payload["origin"] == current.identity
      next if current.key != payload["hash_key"]
      next if @app_version && payload["app_version"] != @app_version

      hash = current.hash(message.site_id || DEFAULT_SITE_ID)

      case payload["op"]
      when "set"
        # NOTE(review): Marshal.load can instantiate arbitrary objects; this
        # is only safe if everything able to publish on the channel is
        # trusted — confirm.
        hash[payload["key"]] =
          if payload["marshalled"]
            Marshal.load(Base64.decode64(payload["value"]))
          else
            payload["value"]
          end
      when "delete"
        hash.delete(payload["key"])
      when "clear"
        hash.clear
      end
    rescue WeakRef::RefError
      # The referenced object is gone; drop its entry.
      @subscribers.delete_at(i)
    ensure
      # Runs on `next` and on rescue as well, so the loop always advances.
      i -= 1
    end
  end
end
#publish(hash, message) ⇒ Object
74 75 76 77 78 79 |
# File 'lib/message_bus/distributed_cache.rb', line 74

# Stamps the message with its origin and publishes it on CHANNEL_NAME.
#
# The passed message Hash is modified in place: :origin and :hash_key are
# always set, :app_version only when @app_version is set.
#
# @param hash [Object] must respond to #identity and #key
# @param message [Hash] operation payload (e.g. op:, key:, value:)
# @return [Object] whatever the bus's #publish returns
def publish(hash, message)
  message[:origin] = hash.identity
  message[:hash_key] = hash.key
  message[:app_version] = @app_version if @app_version

  @message_bus.publish(CHANNEL_NAME, message, user_ids: [-1])
end
#register(hash) ⇒ Object
96 97 98 99 100 |
# File 'lib/message_bus/distributed_cache.rb', line 96

# Adds the given hash to @subscribers, wrapped in a WeakRef, while holding
# @lock.
#
# @param hash [Object] object to register
# @return [Array] the @subscribers array after the append
def register(hash)
  @lock.synchronize do
    @subscribers << WeakRef.new(hash)
  end
end
#set(hash, key, value) ⇒ Object
81 82 83 84 85 86 |
# File 'lib/message_bus/distributed_cache.rb', line 81

# Publishes a :set operation for one key of the given hash.
#
# Set, Hash and Array values are sent as a Base64-encoded Marshal dump with
# marshalled: true; every other value is sent unchanged with
# marshalled: false.
#
# @param hash [Object] forwarded to #publish as the originating hash
# @param key [Object] key being written
# @param value [Object] value being written
# @return [Object] whatever #publish returns
def set(hash, key, value)
  # special support for set
  marshal = (Set === value || Hash === value || Array === value)
  value = Base64.encode64(Marshal.dump(value)) if marshal
  publish(hash, op: :set, key: key, value: value, marshalled: marshal)
end
#subscribers ⇒ Object
26 27 28 |
# File 'lib/message_bus/distributed_cache.rb', line 26

# Returns the internal subscriber list itself, not a copy.
#
# @return [Array] the @subscribers array
def subscribers
  @subscribers
end