Class: Firehose::Server::Channel
- Inherits:
-
Object
- Object
- Firehose::Server::Channel
- Defined in:
- lib/firehose/server/channel.rb
Overview
Connects to a specific channel on Redis and listens for messages to notify subscribers.
Instance Attribute Summary collapse
-
#channel_key ⇒ Object
readonly
Returns the value of attribute channel_key.
-
#list_key ⇒ Object
readonly
Returns the value of attribute list_key.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
-
#sequence_key ⇒ Object
readonly
Returns the value of attribute sequence_key.
-
#subscriber ⇒ Object
readonly
Returns the value of attribute subscriber.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(channel_key, redis = self.class.redis, subscriber = self.class.subscriber) ⇒ Channel
constructor
A new instance of Channel.
- #next_message(last_sequence = nil, options = {}) ⇒ Object
- #unsubscribe(deferrable) ⇒ Object
Constructor Details
#initialize(channel_key, redis = self.class.redis, subscriber = self.class.subscriber) ⇒ Channel
Returns a new instance of Channel.
15 16 17 18 |
# File 'lib/firehose/server/channel.rb', line 15 def initialize(channel_key, redis=self.class.redis, subscriber=self.class.subscriber) @channel_key, @redis, @subscriber = channel_key, redis, subscriber @list_key, @sequence_key = Server.key(channel_key, :list), Server.key(channel_key, :sequence) end |
Instance Attribute Details
#channel_key ⇒ Object (readonly)
Returns the value of attribute channel_key.
5 6 7 |
# File 'lib/firehose/server/channel.rb', line 5 def channel_key @channel_key end |
#list_key ⇒ Object (readonly)
Returns the value of attribute list_key.
5 6 7 |
# File 'lib/firehose/server/channel.rb', line 5 def list_key @list_key end |
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
5 6 7 |
# File 'lib/firehose/server/channel.rb', line 5 def redis @redis end |
#sequence_key ⇒ Object (readonly)
Returns the value of attribute sequence_key.
5 6 7 |
# File 'lib/firehose/server/channel.rb', line 5 def sequence_key @sequence_key end |
#subscriber ⇒ Object (readonly)
Returns the value of attribute subscriber.
5 6 7 |
# File 'lib/firehose/server/channel.rb', line 5 def subscriber @subscriber end |
Class Method Details
.redis ⇒ Object
7 8 9 |
# File 'lib/firehose/server/channel.rb', line 7 def self.redis @redis ||= EM::Hiredis.connect end |
.subscriber ⇒ Object
11 12 13 |
# File 'lib/firehose/server/channel.rb', line 11 def self.subscriber @subscriber ||= Server::Subscriber.new(EM::Hiredis.connect) end |
Instance Method Details
#next_message(last_sequence = nil, options = {}) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/firehose/server/channel.rb', line 20 def (last_sequence=nil, ={}) last_sequence = last_sequence.to_i deferrable = EM::DefaultDeferrable.new # TODO - Think this through a little harder... maybe some tests ol buddy! deferrable.errback {|e| EM.next_tick { raise e } unless [:timeout, :disconnect].include?(e) } # TODO: Use HSET so we don't have to pull 100 messages back every time. redis.multi redis.get(sequence_key). errback {|e| deferrable.fail e } redis.lrange(list_key, 0, Server::Publisher::MAX_MESSAGES). errback {|e| deferrable.fail e } redis.exec.callback do |(sequence, )| Firehose.logger.debug "exec returned: `#{sequence}` and `#{.inspect}`" sequence = sequence.to_i if sequence.nil? || (diff = sequence - last_sequence) <= 0 Firehose.logger.debug "No message available yet, subscribing. sequence: `#{sequence}`" # Either this resource has never been seen before or we are all caught up. # Subscribe and hope something gets published to this end-point. subscribe(deferrable, [:timeout]) elsif last_sequence > 0 && diff < Server::Publisher::MAX_MESSAGES # The client is kinda-sorta running behind, but has a chance to catch # up. Catch them up FTW. # But we won't "catch them up" if last_sequence was zero/nil because # that implies the client is connecting for the 1st time. = [diff-1] Firehose.logger.debug "Sending old message `#{}` and sequence `#{sequence}` to client directly. Client is `#{diff}` behind, at `#{last_sequence}`." deferrable.succeed , last_sequence + 1 else # The client is hopelessly behind and underwater. Just reset # their whole world with the lastest message. = [0] Firehose.logger.debug "Sending latest message `#{}` and sequence `#{sequence}` to client directly." deferrable.succeed , sequence end end.errback {|e| deferrable.fail e } deferrable end |
#unsubscribe(deferrable) ⇒ Object
62 63 64 |
# File 'lib/firehose/server/channel.rb', line 62 def unsubscribe(deferrable) subscriber.unsubscribe channel_key, deferrable end |