Class: MessageBus::DistributedCache::Manager
- Inherits: Object
- Inheritance hierarchy: Object → MessageBus::DistributedCache::Manager
 
- Defined in:
- lib/message_bus/distributed_cache.rb
Instance Attribute Summary collapse
- #app_version ⇒ Object — Returns the value of attribute app_version.
Instance Method Summary collapse
- #clear(hash) ⇒ Object
- #delete(hash, key) ⇒ Object
- #ensure_subscribe! ⇒ Object
- #initialize(message_bus = nil) ⇒ Manager (constructor) — A new instance of Manager.
- #process_message(message) ⇒ Object
- #publish(hash, message) ⇒ Object
- #register(hash) ⇒ Object
- #set(hash, key, value) ⇒ Object
- #subscribers ⇒ Object
Constructor Details
#initialize(message_bus = nil) ⇒ Manager
Returns a new instance of Manager.
| 19 20 21 22 23 24 | # File 'lib/message_bus/distributed_cache.rb', line 19 def initialize(message_bus = nil) @subscribers = [] @subscribed = false @lock = Mutex.new @message_bus = message_bus || MessageBus end |
Instance Attribute Details
#app_version ⇒ Object
Returns the value of attribute app_version.
| 17 18 19 | # File 'lib/message_bus/distributed_cache.rb', line 17 def app_version @app_version end | 
Instance Method Details
#clear(hash) ⇒ Object
| 92 93 94 | # File 'lib/message_bus/distributed_cache.rb', line 92 def clear(hash) publish(hash, op: :clear) end | 
#delete(hash, key) ⇒ Object
| 88 89 90 | # File 'lib/message_bus/distributed_cache.rb', line 88 def delete(hash, key) publish(hash, op: :delete, key: key) end | 
#ensure_subscribe! ⇒ Object
| 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | # File 'lib/message_bus/distributed_cache.rb', line 59 def ensure_subscribe! return if @subscribed @lock.synchronize do return if @subscribed @message_bus.subscribe(CHANNEL_NAME) do |message| @lock.synchronize do process_message(message) end end @subscribed = true end end |
#process_message(message) ⇒ Object
| 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | # File 'lib/message_bus/distributed_cache.rb', line 30 def process_message(message) i = @subscribers.length - 1 payload = message.data while i >= 0 begin current = @subscribers[i] next if payload["origin"] == current.identity next if current.key != payload["hash_key"] next if @app_version && payload["app_version"] != @app_version hash = current.hash(message.site_id || DEFAULT_SITE_ID) case payload["op"] when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"] when "delete" then hash.delete(payload["key"]) when "clear" then hash.clear end rescue WeakRef::RefError @subscribers.delete_at(i) ensure i -= 1 end end end |
#publish(hash, message) ⇒ Object
| 74 75 76 77 78 79 | # File 'lib/message_bus/distributed_cache.rb', line 74 def publish(hash, message) message[:origin] = hash.identity message[:hash_key] = hash.key message[:app_version] = @app_version if @app_version @message_bus.publish(CHANNEL_NAME, message, user_ids: [-1]) end |
#register(hash) ⇒ Object
| 96 97 98 99 100 | # File 'lib/message_bus/distributed_cache.rb', line 96 def register(hash) @lock.synchronize do @subscribers << WeakRef.new(hash) end end | 
#set(hash, key, value) ⇒ Object
| 81 82 83 84 85 86 | # File 'lib/message_bus/distributed_cache.rb', line 81 def set(hash, key, value) # special support for set marshal = (Set === value || Hash === value || Array === value) value = Base64.encode64(Marshal.dump(value)) if marshal publish(hash, op: :set, key: key, value: value, marshalled: marshal) end | 
#subscribers ⇒ Object
| 26 27 28 | # File 'lib/message_bus/distributed_cache.rb', line 26 def subscribers @subscribers end |