Module: EM::SocketConnection

Includes:
Deferrable
Defined in:
lib/redis/event_machine.rb

Constant Summary collapse

SEP =
"\r\n"

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.connect(host, port, timeout) ⇒ Object



70
71
72
73
74
# File 'lib/redis/event_machine.rb', line 70

def self.connect(host, port, timeout)
  EM.connect(host, port, self) do |conn|
    conn.pending_connect_timeout = timeout
  end
end

Instance Method Details

#can_read?(size) ⇒ Boolean

Returns:

  • (Boolean)


135
136
137
# File 'lib/redis/event_machine.rb', line 135

def can_read?(size)
  @buf.size >= @index + size
end

#closeObject



86
87
88
89
# File 'lib/redis/event_machine.rb', line 86

def close
  @connected = false
  close_connection(true)
end

#closed?Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/redis/event_machine.rb', line 82

def closed?
  !@connected
end

#getsObject



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/redis/event_machine.rb', line 115

def gets
  #puts "Gets #{@buf.inspect} #{@index}"
  while true
    # Read to ensure we have some data in the buffer
    line = read(1)
    # Reset the buffer index to zero
    @buf = @buf.slice(@index..-1)
    #puts "#{@buf.inspect}"
    @index = 0
    if eol = @buf.index(SEP)
      line << yank(eol + SEP.size)
      break
    else
      # EOL not in the current buffer
      line << yank(@buf.size)
    end
  end
  line
end

#initializeObject



76
77
78
79
80
# File 'lib/redis/event_machine.rb', line 76

def initialize
  @connected = false
  @index = 0
  @buf = ''
end

#post_initObject



152
153
154
155
# File 'lib/redis/event_machine.rb', line 152

def post_init
  @connected = true
  succeed
end

#read(size) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/redis/event_machine.rb', line 95

def read(size)
  if can_read?(size)
    #puts("Redis >> can read")
    yank(size)
  else
    #puts("Redis >> cant read")
    fiber = Fiber.current
    @size = size
    @callback = proc { |data|
      fiber.resume(data)
    }
    #puts @callback
    # TODO Can leak fiber if the connection dies while
    # this fiber is yielded, waiting for data
    Fiber.yield
  end
end

#receive_data(data) ⇒ Object

EM callbacks



141
142
143
144
145
146
147
148
149
150
# File 'lib/redis/event_machine.rb', line 141

def receive_data(data)
  @buf << data
  if @callback and can_read?(@size)
    callback = @callback
    #puts "Now can read #{@size} for\n#{@callback}"
    data = yank(@size)
    @callback = @size = nil
    callback.call(data)
  end
end

#unbindObject



157
158
159
160
161
162
163
# File 'lib/redis/event_machine.rb', line 157

def unbind
  if @connected
    @connected = false
  else
    fail
  end
end

#write(buf) ⇒ Object



91
92
93
# File 'lib/redis/event_machine.rb', line 91

def write(buf)
  send_data(buf)
end