Class: MessageBus::DistributedCache::Manager
- Inherits:
-
Object
- Object
- MessageBus::DistributedCache::Manager
- Defined in:
- lib/message_bus/distributed_cache.rb
Instance Attribute Summary collapse
-
#app_version ⇒ Object
Returns the value of attribute app_version.
Instance Method Summary collapse
- #clear(hash) ⇒ Object
- #delete(hash, key) ⇒ Object
- #ensure_subscribe! ⇒ Object
-
#initialize(message_bus = nil, publish_queue_in_memory: true) ⇒ Manager
constructor
A new instance of Manager.
- #process_message(message) ⇒ Object
- #publish(hash, message) ⇒ Object
- #register(hash) ⇒ Object
- #set(hash, key, value) ⇒ Object
- #subscribers ⇒ Object
Constructor Details
#initialize(message_bus = nil, publish_queue_in_memory: true) ⇒ Manager
Returns a new instance of Manager.
19 20 21 22 23 24 25 |
# File 'lib/message_bus/distributed_cache.rb', line 19

# Builds a manager with no registered caches and no active subscription.
#
# @param message_bus [Object, nil] bus used for subscribing and publishing;
#   when nil (or false) the MessageBus constant is used instead
# @param publish_queue_in_memory [Boolean] stored as-is and later forwarded
#   as the +queue_in_memory:+ option of every publish call
def initialize(message_bus = nil, publish_queue_in_memory: true)
  @subscribers = []
  @subscribed = false
  @lock = Mutex.new
  @message_bus = message_bus || MessageBus
  @publish_queue_in_memory = publish_queue_in_memory
end
Instance Attribute Details
#app_version ⇒ Object
Returns the value of attribute app_version.
17 18 19 |
# File 'lib/message_bus/distributed_cache.rb', line 17

# Returns the value of attribute app_version.
#
# @return [Object, nil] the current @app_version; nil when it was never set
def app_version
  @app_version
end
Instance Method Details
#clear(hash) ⇒ Object
97 98 99 |
# File 'lib/message_bus/distributed_cache.rb', line 97

# Publishes a +:clear+ operation for the given cache.
#
# @param hash [Object] the cache the operation refers to; handed to #publish
# @return [Object] whatever #publish returns
def clear(hash)
  publish(hash, op: :clear)
end
#delete(hash, key) ⇒ Object
93 94 95 |
# File 'lib/message_bus/distributed_cache.rb', line 93

# Publishes a +:delete+ operation for one key of the given cache.
#
# @param hash [Object] the cache the operation refers to; handed to #publish
# @param key [Object] the key to delete
# @return [Object] whatever #publish returns
def delete(hash, key)
  publish(hash, op: :delete, key: key)
end
#ensure_subscribe! ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/message_bus/distributed_cache.rb', line 60

# Subscribes to CHANNEL_NAME on the message bus, at most once per manager.
#
# @subscribed is checked once without the lock (cheap exit for the common
# case) and again after acquiring it, so only one caller performs the
# subscription. Every message delivered to the subscription is handed to
# #process_message while holding @lock.
#
# @return [nil, true] nil when already subscribed, otherwise the value of
#   the final assignment
def ensure_subscribe!
  return if @subscribed

  @lock.synchronize do
    # Re-check under the lock: @subscribed may have been set by another
    # caller between the first check and acquiring the lock.
    return if @subscribed

    @message_bus.subscribe(CHANNEL_NAME) do |message|
      @lock.synchronize do
        process_message(message)
      end
    end
    @subscribed = true
  end
end
#process_message(message) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/message_bus/distributed_cache.rb', line 31

# Applies one bus message to every registered cache it is addressed to.
#
# The payload (+message.data+) is read with string keys: "origin",
# "hash_key", "app_version", "op", "key", "value" and "marshalled".
# A subscriber is skipped when it is the origin of the message, when its
# key differs from "hash_key", or when @app_version is set and differs
# from the payload's "app_version".
#
# @param message [Object] object responding to +data+ and +site_id+
# @return [nil]
def process_message(message)
  # Walk backwards so that delete_at(i) never shifts an index that has not
  # been visited yet.
  i = @subscribers.length - 1

  payload = message.data

  while i >= 0
    begin
      current = @subscribers[i]

      # `next` still runs the ensure clause below, so i is decremented.
      next if payload["origin"] == current.identity
      next if current.key != payload["hash_key"]
      next if @app_version && payload["app_version"] != @app_version

      hash = current.hash(message.site_id || DEFAULT_SITE_ID)

      case payload["op"]
      when "set"
        # NOTE(review): Marshal.load deserializes arbitrary objects; this is
        # only safe if every publisher on the bus is trusted - confirm.
        hash[payload["key"]] =
          if payload["marshalled"]
            Marshal.load(Base64.decode64(payload["value"]))
          else
            payload["value"]
          end
      when "delete" then hash.delete(payload["key"])
      when "clear" then hash.clear
      end
    rescue WeakRef::RefError
      # The weakly referenced subscriber is gone; drop its slot.
      @subscribers.delete_at(i)
    ensure
      i -= 1
    end
  end
end
#publish(hash, message) ⇒ Object
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/message_bus/distributed_cache.rb', line 75

# Stamps a message with its origin and publishes it on CHANNEL_NAME.
#
# The passed +message+ hash is modified in place: +:origin+ and +:hash_key+
# are always set from the cache, +:app_version+ only when @app_version is
# set.
#
# @param hash [Object] the originating cache; must respond to +identity+
#   and +key+
# @param message [Hash] operation payload (e.g. +op:+, +key:+, +value:+)
# @return [Object] whatever the bus's +publish+ returns
def publish(hash, message)
  message[:origin] = hash.identity
  message[:hash_key] = hash.key
  message[:app_version] = @app_version if @app_version

  # NOTE(review): user_ids: [-1] presumably targets no real user so the
  # message stays server-side - confirm against the MessageBus docs.
  @message_bus.publish(
    CHANNEL_NAME,
    message,
    user_ids: [-1],
    queue_in_memory: @publish_queue_in_memory
  )
end
#register(hash) ⇒ Object
101 102 103 104 105 |
# File 'lib/message_bus/distributed_cache.rb', line 101

# Adds a cache to the subscriber list while holding @lock.
#
# The cache is wrapped in a WeakRef, so the list itself does not keep it
# from being garbage collected.
#
# @param hash [Object] the cache to register
# @return [Array] the subscriber list after the append
def register(hash)
  @lock.synchronize do
    @subscribers << WeakRef.new(hash)
  end
end
#set(hash, key, value) ⇒ Object
86 87 88 89 90 91 |
# File 'lib/message_bus/distributed_cache.rb', line 86

# Publishes a +:set+ operation for one key of the given cache.
#
# Set, Hash and Array values are serialized with Marshal and Base64-encoded
# before publishing, and the message is flagged +marshalled: true+; any
# other value is published unchanged with +marshalled: false+.
#
# @param hash [Object] the cache the operation refers to; handed to #publish
# @param key [Object] the key being written
# @param value [Object] the value being written
# @return [Object] whatever #publish returns
def set(hash, key, value)
  # special support for set
  marshal = (Set === value || Hash === value || Array === value)
  value = Base64.encode64(Marshal.dump(value)) if marshal
  publish(hash, op: :set, key: key, value: value, marshalled: marshal)
end
#subscribers ⇒ Object
27 28 29 |
# File 'lib/message_bus/distributed_cache.rb', line 27

# Returns the manager's subscriber list.
#
# This is the internal array itself, not a copy, and it is returned
# without taking @lock.
#
# @return [Array] the @subscribers array
def subscribers
  @subscribers
end