Class: Einhorn::Event::AbstractTextDescriptor

Inherits:
Object
  • Object
show all
Defined in:
lib/einhorn/event/abstract_text_descriptor.rb

Direct Known Subclasses

Connection, LoopBreaker

Constant Summary collapse

@@instance_counter =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock) ⇒ AbstractTextDescriptor

Returns a new instance of AbstractTextDescriptor.



12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 12

def initialize(sock)
  @@instance_counter += 1

  @socket = sock
  @client_id = "#{@@instance_counter}:#{sock.fileno}"

  @read_buffer = ""
  @write_buffer = ""

  @closed = false

  register!
end

Instance Attribute Details

#client_idObject (readonly)

Returns the value of attribute client_id.



4
5
6
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 4

def client_id
  @client_id
end

#read_bufferObject

Returns the value of attribute read_buffer.



3
4
5
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 3

def read_buffer
  @read_buffer
end

#write_bufferObject

Returns the value of attribute write_buffer.



3
4
5
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 3

def write_buffer
  @write_buffer
end

Class Method Details

.open(sock) ⇒ Object



8
9
10
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 8

def self.open(sock)
  self.new(sock)
end

Instance Method Details

#closeObject



26
27
28
29
30
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 26

def close
  @closed = true
  deregister!
  @socket.close
end

#consume_record(record) ⇒ Object

Raises:

  • (NotImplementedError)


123
124
125
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 123

def consume_record(record)
  raise NotImplementedError.new
end

#deregister!Object



99
100
101
102
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 99

def deregister!
  Einhorn::Event.deregister_readable(self)
  Einhorn::Event.deregister_writeable(self)
end

#log_debug(msg) ⇒ Object



127
128
129
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 127

def log_debug(msg)
  Einhorn.log_debug("[client #{client_id}] #{msg}")
end

#log_error(msg) ⇒ Object



135
136
137
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 135

def log_error(msg)
  Einhorn.log_error("[client #{client_id}] #{msg}")
end

#log_info(msg) ⇒ Object



131
132
133
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 131

def log_info(msg)
  Einhorn.log_info("[client #{client_id}] #{msg}")
end

#notify_readableObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 42

def notify_readable
  while true
    begin
      return if @closed
      chunk = @socket.read_nonblock(1024)
    rescue Errno::EAGAIN
      break
    rescue EOFError, Errno::EPIPE, Errno::ECONNRESET
      close
      break
    rescue StandardError => e
      log_error("Caught unrecognized error while reading from socket: #{e} (#{e.class})")
      close
      break
    else
      log_debug("read #{chunk.length} bytes (#{chunk.inspect[0..20]})")
      @read_buffer << chunk
      process_read_buffer
    end
  end
end

#notify_writeableObject



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 74

def notify_writeable
  begin
    return if @closed
    written = @socket.write_nonblock(@write_buffer)
  rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  rescue Errno::EPIPE, Errno::ECONNRESET
    close
  rescue StandardError => e
    log_error("Caught unrecognized error while writing to socket: #{e} (#{e.class})")
    close
  else
    log_debug("wrote #{written} bytes")
    @write_buffer = @write_buffer[written..-1]
  end
end

#parse_recordObject

Override in subclass. This lets you do streaming reads.



119
120
121
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 119

def parse_record
  [@read_buffer, '']
end

#process_read_bufferObject



104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 104

def process_read_buffer
  while true
    if @read_buffer.length > 0
      break unless split = parse_record
      record, remainder = split
      log_debug("Read a record of #{record.length} bytes.")
      @read_buffer = remainder
      consume_record(record)
    else
      break
    end
  end
end

#read(&blk) ⇒ Object

API method



33
34
35
36
37
38
39
40
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 33

def read(&blk)
  raise "Already registered a read block" if @read_blk
  raise "No block provided" unless blk
  raise "Must provide a block that accepts two arguments" unless blk.arity == 2

  @read_blk = blk
  notify_readable # Read what you can
end

#register!Object



94
95
96
97
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 94

def register!
  Einhorn::Event.register_readable(self)
  Einhorn::Event.register_writeable(self)
end

#to_ioObject



90
91
92
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 90

def to_io
  @socket
end

#write(data) ⇒ Object

API method



65
66
67
68
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 65

def write(data)
  @write_buffer << data
  notify_writeable # Write what you can
end

#write_pending?Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/einhorn/event/abstract_text_descriptor.rb', line 70

def write_pending?
  @write_buffer.length > 0
end