Module: Dalli::Server::EventMachineConnection

Includes:
EM::Deferrable
Defined in:
lib/dalli/async_socket.rb

Overview

Adapted from thrift_client-0.8.2/lib/thrift_client/event_machine.rb

Constant Summary collapse

GARBAGE_BUFFER_SIZE = 4096 (4kB)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#read_timeout ⇒ Object

Returns the value of attribute read_timeout.



41
42
43
# File 'lib/dalli/async_socket.rb', line 41

# Reader for the read timeout stored on this connection.
#
# .connect assigns it from options[:read_timeout] (falling back to 5) and
# blocking_read hands it to `timed` when it has to wait for more data.
# NOTE(review): the unit is presumably seconds -- confirm against `timed`.
#
# @return [Object, nil] the value last assigned to @read_timeout, or nil if unset
def read_timeout
  instance_variable_get(:@read_timeout)
end

#write_timeout ⇒ Object

Returns the value of attribute write_timeout.



41
42
43
# File 'lib/dalli/async_socket.rb', line 41

# Reader for the write timeout stored on this connection.
#
# .connect assigns it from options[:write_timeout] (falling back to 5) and
# #write hands it to `timed` around the send.
# NOTE(review): the unit is presumably seconds -- confirm against `timed`.
#
# @return [Object, nil] the value last assigned to @write_timeout, or nil if unset
def write_timeout
  instance_variable_get(:@write_timeout)
end

Class Method Details

.connect(host = 'localhost', port = 11211, options = {}, &block) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/dalli/async_socket.rb', line 11

# Opens a connection through EM.connect and suspends the calling fiber until
# the connection either completes or fails.
#
# Must be called from inside a fiber: the current fiber is parked with
# Fiber.yield and resumed from the connection's callback or errback.
#
# @param host [String] host to connect to; also forwarded to #initialize
# @param port [Integer] port to connect to; also forwarded to #initialize
# @param options [Hash] optional :timeout (pending connect timeout),
#   :read_timeout and :write_timeout; each falls back to 5 when missing
# @param block [Proc] accepted for interface compatibility; not used here
# @return [Object] the connection returned by EM.connect
# @raise [Exception] when the connection does not report connected? after
#   the fiber is resumed
#   NOTE(review): a bare Exception is not caught by a plain `rescue`; kept
#   as-is so existing callers keep seeing the same class.
def self.connect(host='localhost', port=11211, options={}, &block)
  caller_fiber = Fiber.current

  connection = EM.connect(host, port, self, host, port) do |conn|
    conn.pending_connect_timeout = options[:timeout] || 5
    conn.read_timeout = options[:read_timeout] || 5
    conn.write_timeout = options[:write_timeout] || 5
  end

  # Success and failure both just wake the caller; the outcome is decided
  # below by asking the connection itself.
  wake_caller = proc { caller_fiber.resume }
  connection.callback(&wake_caller)
  connection.errback(&wake_caller)

  Fiber.yield

  unless connection.connected?
    raise Exception, "Unable to connect to #{host}:#{port}"
  end

  connection
end

Instance Method Details

#blocking_read(size) ⇒ Object Also known as: readfull

Raises:

  • (IOError)


57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/dalli/async_socket.rb', line 57

# Reads `size` units from the connection, suspending the calling fiber when
# the buffer does not yet hold enough data.
#
# If can_read?(size) is already true the data is taken straight from the
# buffer via `yank`. Otherwise the read is parked with read_with_callback,
# wrapped in `timed` with the read timeout.
# NOTE(review): `yank` and `timed` are defined outside this view; `timed`
# presumably enforces the timeout -- confirm.
#
# @param size [Integer] amount of data requested
# @return [Object] whatever `yank` or the wrapped read_with_callback returns
# @raise [IOError] when the connection is marked disconnected
# @raise [ArgumentError] when another read is already pending
def blocking_read(size)
  raise IOError, "lost connection to #{@host}:#{@port}: #{@disconnected}" if @disconnected
  return yank(size) if can_read?(size)

  # Only one outstanding read is supported at a time.
  raise ArgumentError, "Unexpected state" if @size || @callback

  timed(read_timeout) { read_with_callback(size) }
end

#can_read?(size) ⇒ Boolean

Returns:

  • (Boolean)


119
120
121
# File 'lib/dalli/async_socket.rb', line 119

# Tells whether the buffer already holds `size` more units past the current
# read offset (@index).
#
# @param size [Integer] amount of data the caller wants to read
# @return [Boolean] true when @index + size fits inside @buf
def can_read?(size)
  needed = @index + size
  needed <= @buf.size
end

#close ⇒ Object



50
51
52
53
54
55
# File 'lib/dalli/async_socket.rb', line 50

# Marks the connection as closed and asks for the underlying connection to
# be shut down.
#
# Runs inside `trap`, so an error raised while closing is printed instead of
# propagating to the caller.
# NOTE(review): the `true` passed to close_connection presumably means
# "close after pending writes are flushed" -- confirm against EventMachine.
def close
  trap do
    @disconnected = 'closed'
    after_writing = true
    close_connection(after_writing)
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


102
103
104
# File 'lib/dalli/async_socket.rb', line 102

# Reports whether the connection is currently considered usable.
#
# @disconnected holds a reason string ('not connected', 'closed', 'unbound')
# while the connection is down and nil once it has been established.
#
# @return [Boolean] true when no disconnect reason is recorded
def connected?
  @disconnected ? false : true
end

#connection_completed ⇒ Object



106
107
108
109
# File 'lib/dalli/async_socket.rb', line 106

# Marks the connection as established and completes the deferrable.
#
# NOTE(review): presumably invoked by EventMachine once the connect
# finishes -- confirm.
def connection_completed
  # Clear the disconnect reason BEFORE calling succeed: the callback that
  # .connect registers resumes the waiting fiber, which then checks
  # connected? (i.e. !@disconnected).
  @disconnected = nil
  succeed
end

#initialize(host, port = 9090) ⇒ Object



43
44
45
46
47
48
# File 'lib/dalli/async_socket.rb', line 43

# Sets up the per-connection state before any data is exchanged.
#
# The connection starts out disconnected; connection_completed clears the
# reason once the connect succeeds.
#
# @param host [String] remote host, used in error messages
# @param port [Integer] remote port, used in error messages
def initialize(host, port=9090)
  @host, @port = host, port
  # Read offset into @buf (see can_read?).
  @index = 0
  @disconnected = 'not connected'
  # String.new instead of a '' literal: the buffer is appended to in place,
  # so it must stay mutable even under `# frozen_string_literal: true`, and
  # it starts out binary-encoded as befits raw socket bytes.
  @buf = String.new
  # Pending-read state inspected by blocking_read and receive_data;
  # initialised explicitly so it is never read before being assigned.
  @size = @callback = nil
end

#read_with_callback(size) ⇒ Object

when enough data has been received the callback will return the data to the requesting fiber



72
73
74
75
76
77
78
79
80
81
# File 'lib/dalli/async_socket.rb', line 72

# Parks the current fiber until `size` units of data have arrived.
#
# Records the wanted size and a callback; receive_data invokes the callback
# once the buffer can satisfy the request, which resumes this fiber with the
# data.
#
# @param size [Integer] amount of data being waited for
# @return [Object] the value the fiber is resumed with (the data passed to
#   the callback)
def read_with_callback(size)
  requester = Fiber.current
  @callback = proc { |payload| requester.resume(payload) }
  @size = size
  Fiber.yield
end

#receive_data(data) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/dalli/async_socket.rb', line 89

# Appends incoming data to the buffer and, if a reader is waiting and its
# request can now be satisfied, hands it the requested chunk.
#
# Runs inside `trap`, so errors raised here (including ones raised by the
# callback) are printed rather than propagated.
# NOTE(review): presumably called by EventMachine whenever bytes arrive --
# confirm.
#
# @param data [String] newly received data
# @return [Object, nil] the callback's return value when a read was
#   completed, otherwise nil
def receive_data(data)
  trap do
    @buf << data
    next unless @callback && can_read?(@size)

    # Clear the pending-read state before invoking the callback, so the
    # resumed reader can immediately start another blocking_read.
    waiting = @callback
    chunk = yank(@size)
    @size = @callback = nil
    waiting.call(chunk)
  end
end

#trap ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/dalli/async_socket.rb', line 32

# Runs the given block and prints any error it raises instead of letting it
# propagate. Used by close and receive_data.
#
# Only StandardError and its subclasses are swallowed. Interrupt, SystemExit,
# NoMemoryError and other non-standard exceptions now propagate, so the
# process can still be stopped while a block is running.
#
# Note: within the including class this name shadows Kernel#trap (signal
# handling).
#
# @yield the code to guard
# @return [Object, nil] the block's value, or nil when an error was printed
def trap
  yield
rescue StandardError => ex
  puts ex.message
  puts ex.backtrace.join("\n")
end

#unbind ⇒ Object



111
112
113
114
115
116
117
# File 'lib/dalli/async_socket.rb', line 111

# Handles the connection going away.
#
# If a disconnect reason is already recorded (never connected, or closed
# locally) the deferrable is failed, which triggers the errback registered
# by .connect. Otherwise the connection was up, and the reason 'unbound' is
# recorded so later reads raise.
# NOTE(review): presumably called by EventMachine when the socket closes or
# the connect attempt fails -- confirm.
#
# @return [Object] 'unbound' when the state was updated, otherwise whatever
#   `fail` returns
def unbind
  if @disconnected
    fail
  else
    @disconnected = 'unbound'
  end
end

#write(buf) ⇒ Object



83
84
85
86
87
# File 'lib/dalli/async_socket.rb', line 83

# Sends `buf` over the connection, wrapped in `timed` with the write timeout.
#
# NOTE(review): `timed` is defined outside this view and presumably enforces
# the timeout; send_data presumably comes from EventMachine -- confirm both.
#
# @param buf [String] data to send
# @return [Object] whatever `timed` returns for the wrapped send
def write(buf)
  timed(write_timeout) { send_data(buf) }
end