Class: Pubsubstub::RedisPubSub
- Inherits:
- 
      Object
      
        - Object
- Pubsubstub::RedisPubSub
 
- Defined in:
- lib/pubsubstub/redis_pub_sub.rb
Constant Summary collapse
- EVENT_SCORE_THRESHOLD =
- 1000
- EXPIRE_THRESHOLD =
- 24 * 60 * 60 
Class Method Summary collapse
- .blocking_redis ⇒ Object
- .nonblocking_redis ⇒ Object
- .publish(channel_name, event) ⇒ Object
- .redis_url ⇒ Object
- .sub ⇒ Object
Instance Method Summary collapse
- 
  
    
      #initialize(channel_name)  ⇒ RedisPubSub 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    A new instance of RedisPubSub. 
- #publish(event) ⇒ Object
- #scrollback(since_event_id) ⇒ Object
- #subscribe(callback) ⇒ Object
- #unsubscribe(callback) ⇒ Object
Constructor Details
#initialize(channel_name) ⇒ RedisPubSub
Returns a new instance of RedisPubSub.
| 6 7 8 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 6 def initialize(channel_name) @channel_name = channel_name end | 
Class Method Details
.blocking_redis ⇒ Object
| 57 58 59 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 57 def blocking_redis @blocking_redis ||= Redis.new(url: redis_url) end | 
.nonblocking_redis ⇒ Object
| 61 62 63 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 61 def nonblocking_redis @nonblocking_redis ||= EM::Hiredis.connect(redis_url) end | 
.publish(channel_name, event) ⇒ Object
| 43 44 45 46 47 48 49 50 51 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 43 def publish(channel_name, event) scrollback = "#{channel_name}.scrollback" blocking_redis.pipelined do blocking_redis.publish("#{channel_name}.pubsub", event.to_json) blocking_redis.zadd(scrollback, event.id, event.to_json) blocking_redis.zremrangebyrank(scrollback, 0, -EVENT_SCORE_THRESHOLD) blocking_redis.expire(scrollback, EXPIRE_THRESHOLD) end end | 
.redis_url ⇒ Object
| 65 66 67 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 65 def redis_url ENV['REDIS_URL'] || "redis://localhost:6379" end | 
.sub ⇒ Object
| 53 54 55 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 53 def sub @sub ||= nonblocking_redis.pubsub end | 
Instance Method Details
#publish(event) ⇒ Object
| 18 19 20 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 18 def publish(event) self.class.publish(@channel_name, event) end | 
#scrollback(since_event_id) ⇒ Object
| 22 23 24 25 26 27 28 29 30 31 32 33 34 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 22 def scrollback(since_event_id) redis = if EventMachine.reactor_running? self.class.nonblocking_redis else self.class.blocking_redis end redis.zrangebyscore(key('scrollback'), "(#{since_event_id.to_i}", '+inf') do |events| events.each do |json| yield Pubsubstub::Event.from_json(json) end end end | 
#subscribe(callback) ⇒ Object
| 10 11 12 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 10 def subscribe(callback) self.class.sub.subscribe(key('pubsub'), callback) end | 
#unsubscribe(callback) ⇒ Object
| 14 15 16 | # File 'lib/pubsubstub/redis_pub_sub.rb', line 14 def unsubscribe(callback) self.class.sub.unsubscribe_proc(key('pubsub'), callback) end |