Class: Roby::DRoby::Logfile::Server
- Extended by:
- Logger::Hierarchy
- Defined in:
- lib/roby/droby/logfile/server.rb
Overview
This is the server part of the log distribution mechanism.
It is basically a file distribution mechanism: it “listens” to the event log file and sends new data to the clients that are connected to it.
When a client connects, the server first sends it the complete file.
Constant Summary collapse
- DEFAULT_PORT =
20_200
- DEFAULT_SAMPLING_PERIOD =
0.05
- DATA_CHUNK_SIZE =
512 * 1024
- CONNECTION_INIT =
:log_server_connection_init
- CONNECTION_INIT_DONE =
:log_server_connection_init_done
Instance Attribute Summary collapse
-
#event_file ⇒ Object
readonly
The IO object that we use to read the event file.
-
#event_file_path ⇒ Object
readonly
The path to the event file this server is listening to.
-
#pending_data ⇒ Object
readonly
A mapping from socket to data chunks representing the data that should be sent to a particular client.
-
#sampling_period ⇒ Object
readonly
The sampling period (in seconds).
-
#server ⇒ Object
readonly
The server socket.
-
#server_io ⇒ Object
readonly
The IO we are listening on.
Instance Method Summary collapse
-
#close ⇒ Object
Close all IOs managed by this instance.
-
#close_client_connections ⇒ Object
Close all currently opened client connections.
- #exec ⇒ Object
- #found_header? ⇒ Boolean
-
#initialize(event_file_path, sampling_period, io) ⇒ Server
constructor
A new instance of Server.
-
#read_new_data ⇒ Object
Reads new data from the underlying file and queues it to dispatch for our clients.
-
#send_pending_data ⇒ Object
Tries to send all pending data to the connected clients.
-
#split_in_chunks(data) ⇒ Object
Splits the data block in `data` into blocks of size DATA_CHUNK_SIZE.
Constructor Details
#initialize(event_file_path, sampling_period, io) ⇒ Server
Returns a new instance of Server.
41 42 43 44 45 46 47 |
# File 'lib/roby/droby/logfile/server.rb', line 41
#
# Creates a server that distributes the content of an event log file.
#
# @param event_file_path [String] path to the event file to listen to; it
#   is opened read-only in binary mode
# @param sampling_period [Numeric] the sampling period (in seconds); #exec
#   uses it as the timeout of its select call
# @param io [#accept] the server IO on which #exec accepts connections
# @raise [SystemCallError] if the event file cannot be opened
def initialize(event_file_path, sampling_period, io)
  @server = io
  # #server_io reads @server_io; assign it so that reader does not
  # silently return nil.
  @server_io = io
  @pending_data = {}
  # No prologue has been read yet; #read_new_data flips this to true.
  @found_header = false
  @sampling_period = sampling_period
  @event_file_path = event_file_path
  @event_file = File.open(event_file_path, "r:BINARY")
end
Instance Attribute Details
#event_file ⇒ Object (readonly)
The IO object that we use to read the event file
34 35 36 |
# File 'lib/roby/droby/logfile/server.rb', line 34
#
# The IO object that we use to read the event file.
#
# @return [Object] the value of @event_file
def event_file
  @event_file
end
#event_file_path ⇒ Object (readonly)
The path to the event file this server is listening to
32 33 34 |
# File 'lib/roby/droby/logfile/server.rb', line 32
#
# The path to the event file this server is listening to.
#
# @return [Object] the value of @event_file_path
def event_file_path
  @event_file_path
end
#pending_data ⇒ Object (readonly)
A mapping from socket to data chunks representing the data that should be sent to a particular client
37 38 39 |
# File 'lib/roby/droby/logfile/server.rb', line 37
#
# A mapping from socket to data chunks representing the data that should
# be sent to a particular client.
#
# @return [Object] the value of @pending_data
def pending_data
  @pending_data
end
#sampling_period ⇒ Object (readonly)
The sampling period (in seconds)
30 31 32 |
# File 'lib/roby/droby/logfile/server.rb', line 30
#
# The sampling period (in seconds).
#
# @return [Object] the value of @sampling_period
def sampling_period
  @sampling_period
end
#server ⇒ Object (readonly)
The server socket
39 40 41 |
# File 'lib/roby/droby/logfile/server.rb', line 39
#
# The server socket.
#
# @return [Object] the value of @server
def server
  @server
end
#server_io ⇒ Object (readonly)
The IO we are listening on
28 29 30 |
# File 'lib/roby/droby/logfile/server.rb', line 28
#
# The IO we are listening on.
#
# The constructor stores the listening IO in @server, so fall back to it
# when @server_io has not been assigned instead of returning nil.
#
# @return [Object] @server_io when set, @server otherwise
def server_io
  @server_io || @server
end
Instance Method Details
#close ⇒ Object
Close all IOs managed by this instance
This does NOT close the server IO, which is owned by the caller
57 58 59 60 |
# File 'lib/roby/droby/logfile/server.rb', line 57
#
# Close all IOs managed by this instance: the client connections and the
# event file.
#
# This does NOT close the server IO, which is owned by the caller.
#
# @return [nil]
def close
  begin
    close_client_connections
  ensure
    # Release the event file even when closing a client raised, so the
    # file descriptor is not leaked.
    @event_file.close
  end
  nil
end
#close_client_connections ⇒ Object
Close all currently opened client connections
This does NOT close the server IO, which is owned by the caller
66 67 68 69 |
# File 'lib/roby/droby/logfile/server.rb', line 66
#
# Close all currently opened client connections and forget about their
# pending data.
#
# This does NOT close the server IO, which is owned by the caller.
#
# @return [Hash] the (now empty) pending data mapping
def close_client_connections
  @pending_data.each_key do |socket|
    # A socket may already have been closed (e.g. by the error path of
    # #exec); do not close it a second time.
    socket.close unless socket.closed?
  end
  @pending_data.clear
end
#exec ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/roby/droby/logfile/server.rb', line 71
#
# Runs the server loop. It never returns normally.
#
# Each iteration:
# 1. waits at most #sampling_period seconds for an incoming connection on
#    #server, or for a client with pending data to become writable;
# 2. accepts one new client, if any, and queues for it a length-prefixed
#    marshalled [CONNECTION_INIT, total_size] message, the data already
#    read from the event file (prologue excluded), and a length-prefixed
#    marshalled CONNECTION_INIT_DONE message;
# 3. reads new data from the event file (#read_new_data);
# 4. sends what it can to the clients (#send_pending_data).
#
# On any exception (including Interrupt) the client connections are
# closed and the exception is re-raised.
def exec
  loop do
    sockets_with_pending_data =
      pending_data.reject { |_socket, chunks| chunks.empty? }.keys
    unless sockets_with_pending_data.empty?
      Server.debug "#{sockets_with_pending_data.size} sockets have pending data"
    end

    readable_sockets, =
      select([server], sockets_with_pending_data, nil, sampling_period)

    # Incoming connections
    if readable_sockets && !readable_sockets.empty?
      # Register the socket while Interrupt is masked, so that an accepted
      # connection is always known to the cleanup code below.
      socket = Thread.handle_interrupt(Interrupt => :never) do
        accepted = server.accept
        @pending_data[accepted] = []
        accepted
      end
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
      # fcntl(Fcntl::FD_CLOEXEC, 1) is fcntl(F_GETFD, 1) and sets nothing;
      # set the close-on-exec flag for real.
      socket.close_on_exec = true

      Server.debug "new connection: #{socket}"
      if found_header?
        # Everything read so far, minus the prologue
        all_data = File.binread(
          event_file_path,
          event_file.tell - Logfile::PROLOGUE_SIZE,
          Logfile::PROLOGUE_SIZE
        )
        Server.debug " queueing #{all_data.size} bytes of data"
        chunks = split_in_chunks(all_data)
      else
        Server.debug " log file is empty, not queueing any data"
        chunks = []
      end

      initial_size = chunks.inject(0) { |total, chunk| total + chunk.size }
      connection_init = ::Marshal.dump([CONNECTION_INIT, initial_size])
      connection_init_done = ::Marshal.dump(CONNECTION_INIT_DONE)
      # Control messages are prefixed by their size, packed as "L<"
      chunks.unshift([connection_init.size].pack("L<") + connection_init)
      chunks << ([connection_init_done.size].pack("L<") + connection_init_done)
      @pending_data[socket] = chunks
    end

    # Read new data
    read_new_data
    # Send data to our peers
    send_pending_data
  end
rescue Exception # rubocop:disable Lint/RescueException
  # Cleanup only: the exception is always re-raised. Going through
  # #close_client_connections also unregisters the sockets, so no closed
  # socket is left in #pending_data.
  close_client_connections
  raise
end
#found_header? ⇒ Boolean
49 50 51 |
# File 'lib/roby/droby/logfile/server.rb', line 49
#
# Whether the prologue of the event file has already been read.
#
# @return [Boolean] true once #read_new_data has set @found_header,
#   false before that (never nil)
def found_header?
  @found_header ? true : false
end
#read_new_data ⇒ Object
Reads new data from the underlying file and queues it to dispatch for our clients
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/roby/droby/logfile/server.rb', line 144
#
# Reads new data from the underlying file and queues it to dispatch for
# our clients.
#
# Until the prologue has been found, nothing is queued: if fewer than
# Logfile::PROLOGUE_SIZE bytes are available the file is rewound and the
# method returns; otherwise the prologue is validated with
# Logfile.read_prologue and stripped from the data.
def read_new_data
  new_data = event_file.read
  return if new_data.empty?

  unless found_header?
    if new_data.size < Logfile::PROLOGUE_SIZE
      # Go back to the beginning of the file so that, next
      # time, we read the complete prologue again
      event_file.rewind
      return
    end

    # This will read and validate the prologue
    Logfile.read_prologue(StringIO.new(new_data))
    new_data = new_data[Logfile::PROLOGUE_SIZE..-1]
    @found_header = true
  end

  # Split the data in chunks of DATA_CHUNK_SIZE, and add the
  # chunks in the pending_data hash
  new_chunks = split_in_chunks(new_data)
  pending_data.each_value { |chunks| chunks.concat(new_chunks) }
end
#send_pending_data ⇒ Object
Tries to send all pending data to the connected clients
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/roby/droby/logfile/server.rb', line 174
#
# Tries to send all pending data to the connected clients.
#
# For each client, consecutive small chunks are merged into one buffer
# smaller than DATA_CHUNK_SIZE and written with write_nonblock. Data that
# could not be written is put back at the head of the client's queue. A
# client whose write fails with any other error is closed and removed
# from #pending_data. The pass is repeated for as long as some client had
# a complete buffer written and still has queued chunks.
def send_pending_data
  needs_looping = true
  while needs_looping
    needs_looping = false
    pending_data.delete_if do |socket, chunks|
      # nothing left to send for this socket, keep it registered
      next false if chunks.empty?

      buffer = chunks.shift
      merged = false
      while !chunks.empty? && (buffer.size + chunks.first.size < DATA_CHUNK_SIZE)
        # Chunk strings are shared between the queues of all clients
        # (see #read_new_data): merge into a private copy, never in place.
        unless merged
          buffer = buffer.dup
          merged = true
        end
        buffer << chunks.shift
      end

      Server.debug "sending #{buffer.size} bytes to #{socket}"
      begin
        written = socket.write_nonblock(buffer)
      rescue IO::WaitWritable
        # Covers both EAGAIN and EWOULDBLOCK
        Server.debug "cannot send: send buffer full"
        chunks.unshift(buffer)
        next false
      rescue StandardError => e
        # Interrupt, SystemExit and signals are not StandardError and
        # propagate to the caller.
        Server.warn "disconnecting from #{socket}: #{e.message}"
        (e.backtrace || []).each do |line|
          Server.warn " #{line}"
        end
        socket.close
        next true
      end

      remaining = buffer.size - written
      if remaining == 0
        Server.debug "wrote complete chunk of #{written} bytes to #{socket}"
        # Loop if we wrote the complete chunk and there
        # is still stuff to write for this socket. Accumulate, so that a
        # later socket cannot reset the flag.
        needs_looping ||= !chunks.empty?
      else
        Server.debug "wrote partial chunk #{written} bytes instead of #{buffer.size} bytes to #{socket}"
        chunks.unshift(buffer[written, remaining])
      end
      false
    end
  end
end
#split_in_chunks(data) ⇒ Object
Splits the data block in `data` into blocks of size DATA_CHUNK_SIZE
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/roby/droby/logfile/server.rb', line 125
#
# Splits the data block in +data+ into blocks of size DATA_CHUNK_SIZE.
#
# @param data [String] the data to split
# @return [Array<String>] the consecutive slices of +data+, in order; all
#   have DATA_CHUNK_SIZE elements except possibly the last one. Empty when
#   +data+ is empty.
def split_in_chunks(data)
  (0...data.size).step(DATA_CHUNK_SIZE).map do |index|
    # String#[] truncates the last slice to what is left
    data[index, DATA_CHUNK_SIZE]
  end
end