Class: Pubsubstub::RedisPubSub

Inherits:
Object
  • Object
show all
Defined in:
lib/pubsubstub/redis_pub_sub.rb

Constant Summary collapse

EVENT_SCORE_THRESHOLD =
1000
EXPIRE_THRESHOLD =
24 * 60 * 60

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel_name) ⇒ RedisPubSub

Returns a new instance of RedisPubSub.



6
7
8
# File 'lib/pubsubstub/redis_pub_sub.rb', line 6

def initialize(channel_name)
  @channel_name = channel_name
end

Class Method Details

.blocking_redisObject



57
58
59
# File 'lib/pubsubstub/redis_pub_sub.rb', line 57

def blocking_redis
  @blocking_redis ||= Redis.new(url: redis_url)
end

.nonblocking_redisObject



61
62
63
# File 'lib/pubsubstub/redis_pub_sub.rb', line 61

def nonblocking_redis
  @nonblocking_redis ||= EM::Hiredis.connect(redis_url)
end

.publish(channel_name, event) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/pubsubstub/redis_pub_sub.rb', line 43

def publish(channel_name, event)
  scrollback = "#{channel_name}.scrollback"
  blocking_redis.pipelined do
    blocking_redis.publish("#{channel_name}.pubsub", event.to_json)
    blocking_redis.zadd(scrollback, event.id, event.to_json)
    blocking_redis.zremrangebyrank(scrollback, 0, -EVENT_SCORE_THRESHOLD)
    blocking_redis.expire(scrollback, EXPIRE_THRESHOLD)
  end
end

.redis_urlObject



65
66
67
# File 'lib/pubsubstub/redis_pub_sub.rb', line 65

def redis_url
  ENV['REDIS_URL'] || "redis://localhost:6379"
end

.subObject



53
54
55
# File 'lib/pubsubstub/redis_pub_sub.rb', line 53

def sub
  @sub ||= nonblocking_redis.pubsub
end

Instance Method Details

#publish(event) ⇒ Object



18
19
20
# File 'lib/pubsubstub/redis_pub_sub.rb', line 18

def publish(event)
  self.class.publish(@channel_name, event)
end

#scrollback(since_event_id) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/pubsubstub/redis_pub_sub.rb', line 22

def scrollback(since_event_id)
  redis = if EventMachine.reactor_running?
    self.class.nonblocking_redis
  else
    self.class.blocking_redis
  end

  redis.zrangebyscore(key('scrollback'), "(#{since_event_id.to_i}", '+inf') do |events|
    events.each do |json|
      yield Pubsubstub::Event.from_json(json)
    end
  end
end

#subscribe(callback) ⇒ Object



10
11
12
# File 'lib/pubsubstub/redis_pub_sub.rb', line 10

def subscribe(callback)
  self.class.sub.subscribe(key('pubsub'), callback)
end

#unsubscribe(callback) ⇒ Object



14
15
16
# File 'lib/pubsubstub/redis_pub_sub.rb', line 14

def unsubscribe(callback)
  self.class.sub.unsubscribe_proc(key('pubsub'), callback)
end