Module: EM::SocketConnection
- Includes:
- Deferrable
- Defined in:
- lib/redis/event_machine.rb
Constant Summary
- SEP = "\r\n"
Class Method Summary
- .connect(host, port, timeout) ⇒ Object
Instance Method Summary
- #can_read?(size) ⇒ Boolean
- #close ⇒ Object
- #closed? ⇒ Boolean
- #gets ⇒ Object
- #initialize ⇒ Object
- #post_init ⇒ Object
- #read(size) ⇒ Object
- #receive_data(data) ⇒ Object — EM callbacks.
- #unbind ⇒ Object
- #write(buf) ⇒ Object
Class Method Details
.connect(host, port, timeout) ⇒ Object
70 71 72 73 74 |
# File 'lib/redis/event_machine.rb', line 70
# Opens a connection through EM.connect, using this module as the handler.
#
# @param host the host handed to EM.connect
# @param port the port handed to EM.connect
# @param timeout value assigned to the new connection's
#   pending_connect_timeout
# @return [Object] whatever EM.connect returns
def self.connect(host, port, timeout)
  EM.connect(host, port, self) do |conn|
    conn.pending_connect_timeout = timeout
  end
end
Instance Method Details
#can_read?(size) ⇒ Boolean
135 136 137 |
# File 'lib/redis/event_machine.rb', line 135
# Tells whether the buffer holds at least +size+ bytes past the read cursor.
#
# @param size [Integer] number of bytes wanted
# @return [Boolean] true when @buf.size is at least @index + size
def can_read?(size)
  @buf.size >= @index + size
end
#close ⇒ Object
86 87 88 89 |
# File 'lib/redis/event_machine.rb', line 86
# Marks the connection as no longer connected and asks for it to be closed.
#
# @return [Object] whatever close_connection returns
def close
  @connected = false
  # The `true` argument is presumably EventMachine's "after writing" flag,
  # i.e. flush pending output before closing -- confirm against the EM docs.
  close_connection(true)
end
#closed? ⇒ Boolean
82 83 84 |
# File 'lib/redis/event_machine.rb', line 82
# Reports whether the connection is currently not connected.
#
# @return [Boolean] the negation of @connected
def closed?
  !@connected
end
#gets ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/redis/event_machine.rb', line 115
# Reads one SEP-terminated line and returns it with the terminator included.
#
# The line is accumulated across loop iterations, so a line that arrives in
# several chunks is returned whole. A terminator split across two chunks is
# also recognised.
#
# Relies on #read(1) returning exactly one byte, and on #yank(n) returning
# the next n bytes of @buf while advancing @index (yank is defined outside
# this listing -- confirm).
#
# @return [String] the next line, ending in SEP
def gets
  line = ''.dup
  loop do
    # Waits until at least one byte is available, then consumes it.
    line << read(1)

    # Drop the bytes already consumed so @buf does not grow without bound.
    @buf = @buf.slice(@index..-1)
    @index = 0

    # A terminator may start in the bytes already moved into `line` and
    # finish in @buf, so search across that boundary.
    tail = line[-SEP.size, SEP.size] || line
    eol = (tail + @buf).index(SEP)
    if eol
      needed = eol + SEP.size - tail.size
      line << yank(needed) if needed > 0
      break
    end

    # No terminator yet: take everything buffered and wait for more data.
    line << yank(@buf.size) unless @buf.empty?
  end
  line
end
#initialize ⇒ Object
76 77 78 79 80 |
# File 'lib/redis/event_machine.rb', line 76
# Sets up the initial state: not connected, an empty receive buffer with the
# read cursor at zero, and no reader waiting for data.
def initialize
  @connected = false
  @index = 0
  # String.new gives a mutable buffer even if string literals are frozen.
  @buf = String.new
  # Set by #read while a fiber waits; cleared by #receive_data.
  @callback = nil
  @size = nil
end
#post_init ⇒ Object
152 153 154 155 |
# File 'lib/redis/event_machine.rb', line 152
# Marks the connection as connected and calls succeed.
#
# Presumably invoked by EventMachine once the connection object is set up,
# with succeed coming from the included Deferrable -- confirm against the
# EM docs.
#
# @return [Object] whatever succeed returns
def post_init
  @connected = true
  succeed
end
#read(size) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/redis/event_machine.rb', line 95
# Returns +size+ bytes from the buffer, suspending the current fiber until
# that many bytes are available.
#
# When the data is not buffered yet, the requested size and a callback that
# resumes this fiber are stored in @size and @callback, and the fiber
# yields. The value passed to the callback becomes the return value.
#
# @param size [Integer] number of bytes wanted
# @return [Object] the result of yank(size), or the value the fiber is
#   resumed with
def read(size)
  return yank(size) if can_read?(size)

  fiber = Fiber.current
  @size = size
  @callback = proc { |data| fiber.resume(data) }
  # TODO: Can leak fiber if the connection dies while
  # this fiber is yielded, waiting for data
  Fiber.yield
end
#receive_data(data) ⇒ Object
EM callbacks
141 142 143 144 145 146 147 148 149 150 |
# File 'lib/redis/event_machine.rb', line 141
# Appends incoming data to the buffer and, if a reader is waiting and its
# request can now be satisfied, hands it the requested bytes.
#
# @param data [String] bytes to append to @buf
# @return [Object, nil] the callback's result, or nil when nothing was
#   delivered
def receive_data(data)
  @buf << data
  return unless @callback && can_read?(@size)

  callback = @callback
  chunk = yank(@size)
  # Clear the pending request before calling back, because the callback may
  # register a new one.
  @callback = @size = nil
  callback.call(chunk)
end
#unbind ⇒ Object
157 158 159 160 161 162 163 |
# File 'lib/redis/event_machine.rb', line 157
# Handles the connection going away.
#
# If the connection was marked connected it is marked disconnected.
# Otherwise fail is called.
def unbind
  if @connected
    @connected = false
  else
    # NOTE(review): this is meant to be the `fail` from the included
    # Deferrable, not Kernel#fail (the alias of raise) -- confirm.
    fail
  end
end
#write(buf) ⇒ Object
91 92 93 |
# File 'lib/redis/event_machine.rb', line 91
# Sends +buf+ over the connection by delegating to send_data.
#
# @param buf [String] data to send
# @return [Object] whatever send_data returns
def write(buf)
  send_data(buf)
end