Class: EventMachine::Protocols::PubSubRedis

Inherits:
Connection
  • Object
show all
Includes:
Redis
Defined in:
lib/htttee/server/pubsub_redis.rb

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Redis

#multi, #pipeline

Constructor Details

#initialize(options = {}) ⇒ PubSubRedis

Returns a new instance of PubSubRedis.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/htttee/server/pubsub_redis.rb', line 61

# Builds the connection handler from an options hash.
#
# Reads :host, :port, :db, :password and :logger from +options+ and stores
# each in the instance variable of the same name. A missing :db becomes 0;
# any given :db is coerced with +to_i+.
#
# Also installs @error_callback, a lambda that raises a RedisError carrying
# the given code.
#
# @param options [Hash] connection settings (see keys above)
def initialize(options = {})
  requested_db = options[:db]

  @host     = options[:host]
  @port     = options[:port]
  @password = options[:password]
  @logger   = options[:logger]
  @db       = requested_db ? requested_db.to_i : 0

  @error_callback = lambda do |code|
    raise RedisError.new.tap { |failure| failure.code = code },
          "Redis server returned error code: #{code}"
  end

  # Must run before any other command is issued.
  # NOTE(review): auth_and_select_db is not defined in this view; presumably
  # it comes from the included Redis module - confirm.
  auth_and_select_db
end

Class Method Details

.connect(*args) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/htttee/server/pubsub_redis.rb', line 40

# Opens an EventMachine connection that uses this class as its handler.
#
# Accepted call forms:
#   connect                 # defaults: 127.0.0.1, port 6379
#   connect(options_hash)   # e.g. :host, :port plus any keys #initialize reads
#   connect(host)           # host given as a String
#   connect(host, port)
#
# A Hash argument is copied before defaults are filled in, so the caller's
# hash is never modified (and may be frozen).
#
# @param args [Array] zero to two positional arguments, see forms above
# @return [Object] whatever EM.connect returns
# @raise [ArgumentError] if more than two arguments are given, or a single
#   argument is neither a Hash nor a String
def self.connect(*args)
  options =
    case args.length
    when 0
      {}
    when 1
      arg = args.first
      case arg
      when Hash   then arg.dup # copy: defaults below must not leak into the caller's hash
      when String then {:host => arg}
      else raise ArgumentError, 'first argument must be Hash or String'
      end
    when 2
      {:host => args[0], :port => args[1]}
    else
      raise ArgumentError, "wrong number of arguments (#{args.length} for 0..2)"
    end

  options[:host] ||= '127.0.0.1'
  options[:port]   = (options[:port] || 6379).to_i

  # The options hash is forwarded as an extra argument after the handler class.
  EM.connect options[:host], options[:port], self, options
end

Instance Method Details

#dispatch_response(value) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/htttee/server/pubsub_redis.rb', line 20

# Routes one parsed reply value to the pub/sub callback.
#
# While @multibulk_n is set, values are collected into @multibulk_values and
# the counter is decremented; nothing is delivered until it reaches zero.
# When a collection completes, the enclosing collection state (if any) is
# restored from @previous_multibulks and the finished array is fed back in
# as a single value of that enclosing collection.
#
# NOTE(review): @multibulk_n, @multibulk_values and @previous_multibulks are
# not assigned in this view; presumably the included Redis protocol parser
# maintains them - confirm.
#
# @param value [Object] a single reply element
# @return [Object, nil] the callback's result when it is invoked directly by
#   this call; nil when the value was only buffered or handed to the
#   enclosing collection
def dispatch_response(value)
  # Not inside a multi-bulk reply: deliver straight away.
  return @pubsub_callback.call(value) unless @multibulk_n

  @multibulk_values << value
  @multibulk_n -= 1
  return unless @multibulk_n.zero? # more elements still expected

  finished_reply = @multibulk_values
  @multibulk_n, @multibulk_values = @previous_multibulks.pop

  # No enclosing collection left: the finished array is the full reply.
  return @pubsub_callback.call(finished_reply) unless @multibulk_n

  # The finished array is itself one element of the enclosing collection.
  dispatch_response(finished_reply)
  nil
end

#subscribe(channel, &block) ⇒ Object



6
7
8
9
10
# File 'lib/htttee/server/pubsub_redis.rb', line 6

# Registers +block+ as the pub/sub callback and issues a SUBSCRIBE command
# for +channel+. The callback is the one dispatch_response invokes with each
# reply value.
#
# @param channel [Object] channel name sent with the command
# @return [Object] whatever call_command returns
def subscribe(channel, &block)
  command = ['subscribe', channel]
  @pubsub_callback = block
  call_command(command)
end

#unbind ⇒ Object



77
78
79
# File 'lib/htttee/server/pubsub_redis.rb', line 77

# Logs a debug-level "Disconnected" message when a logger was configured;
# does nothing otherwise.
#
# NOTE(review): presumably invoked by EventMachine when the connection
# closes - confirm against the Connection base class.
#
# @return [Object, nil] the logger's return value, or nil without a logger
def unbind
  return unless @logger

  @logger.debug { "Disconnected" }
end

#unsubscribe(channel) ⇒ Object



12
13
14
15
16
17
18
# File 'lib/htttee/server/pubsub_redis.rb', line 12

# Issues an UNSUBSCRIBE command for +channel+ and replaces the pub/sub
# callback with one that closes the connection, ignoring its arguments.
# The next reply value delivered to the callback therefore closes the
# connection.
#
# @param channel [Object] channel name sent with the command
# @return [Object] whatever call_command returns
def unsubscribe(channel)
  command = ['unsubscribe', channel]
  @pubsub_callback = lambda { |*_reply| close_connection }
  call_command(command)
end