23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/em-synchrony/em-hiredis.rb', line 23
# Opens a connection to +@host+:+@port+ via +EM.connect+ and registers the
# +:closed+, +:connected+ and +:message+ handlers on it.
#
# Handlers:
#
# * +:closed+    - If the client was connected: fails every pending
#                  deferrable in +@defs+ with "Redis disconnected", clears
#                  the queue and deferred status, and (unless
#                  +@closing_connection+ is set) flags +@reconnecting+ and
#                  calls +reconnect+ right away. If it was not connected,
#                  schedules +reconnect+ on a 1 second timer instead.
# * +:connected+ - Inside a new Fiber: marks the client connected, sends
#                  +auth+ / +select+ when a password / db is configured,
#                  re-issues every subscription in +@subs+ and +@psubs+,
#                  calls +succeed+, and emits +:reconnected+ if this was a
#                  reconnect.
# * +:message+   - Routes each reply: error replies fail the oldest pending
#                  deferrable, pub/sub replies are re-emitted as events,
#                  anything else succeeds the oldest pending deferrable (or
#                  is emitted as +:monitor+ when +@monitoring+ is set).
#
# @return [self]
# @raise [RuntimeError] raised from the +:message+ handler (not from this
#   call itself) when a reply arrives while +@defs+ is empty and it is
#   neither a pub/sub message nor covered by +@monitoring+.
def connect
  @connection = EM.connect(@host, @port, Connection, @host, @port)

  @connection.on(:closed) do
    if @connected
      # Nothing queued can complete any more; fail it all before resetting.
      @defs.each { |d| d.fail("Redis disconnected") }
      @defs = []
      @deferred_status = nil
      @connected = false
      unless @closing_connection
        @reconnecting = true
        reconnect
      end
    else
      # Never got connected: back off for a second before retrying.
      unless @closing_connection
        EM.add_timer(1) { reconnect }
      end
    end
  end

  @connection.on(:connected) do
    # NOTE(review): the Fiber is presumably needed because auth/select
    # suspend the calling fiber while waiting for a reply - confirm.
    Fiber.new do
      @connected = true

      auth(@password) if @password
      select(@db) if @db

      # Restore subscriptions that were active before the (re)connect.
      @subs.each { |s| method_missing(:subscribe, s) }
      @psubs.each { |s| method_missing(:psubscribe, s) }
      succeed

      if @reconnecting
        @reconnecting = false
        emit(:reconnected)
      end
    end.resume
  end

  @connection.on(:message) do |reply|
    if reply.is_a?(RuntimeError)
      raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
      deferred = @defs.shift
      deferred.fail(reply) if deferred
    elsif reply && PUBSUB_MESSAGES.include?(reply[0])
      # Pub/sub pushes are not answers to a queued command, so they bypass
      # @defs and are surfaced as events instead.
      kind, subscription, d1, d2 = *reply

      case kind.to_sym
      when :message
        emit(:message, subscription, d1)
      when :pmessage
        emit(:pmessage, subscription, d1, d2)
      end
    elsif @defs.empty?
      if @monitoring
        emit(:monitor, reply)
      else
        raise "Replies out of sync: #{reply.inspect}"
      end
    else
      deferred = @defs.shift
      deferred.succeed(reply) if deferred
    end
  end

  @connected = false
  @reconnecting = false
  self
end
|