Class: Roby::DRoby::Logfile::Server

Inherits:
Object
  • Object
show all
Extended by:
Logger::Hierarchy
Defined in:
lib/roby/droby/logfile/server.rb

Overview

This is the server part of the log distribution mechanism

It is basically a file distribution mechanism: it “listens” to the event log file and sends new data to the clients that are connected to it.

When a client connects, it will send the complete file

Constant Summary collapse

DEFAULT_PORT =
20_200
DEFAULT_SAMPLING_PERIOD =
0.05
DATA_CHUNK_SIZE =
512 * 1024
CONNECTION_INIT =
:log_server_connection_init
CONNECTION_INIT_DONE =
:log_server_connection_init_done

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(event_file_path, sampling_period, io) ⇒ Server

Returns a new instance of Server.



41
42
43
44
45
46
47
# File 'lib/roby/droby/logfile/server.rb', line 41

def initialize(event_file_path, sampling_period, io)
    @server = io
    @pending_data = {}
    @sampling_period = sampling_period
    @event_file_path = event_file_path
    @event_file = File.open(event_file_path, "r:BINARY")
end

Instance Attribute Details

#event_fileObject (readonly)

The IO object that we use to read the event file



34
35
36
# File 'lib/roby/droby/logfile/server.rb', line 34

def event_file
  @event_file
end

#event_file_pathObject (readonly)

The path to the event file this server is listening to



32
33
34
# File 'lib/roby/droby/logfile/server.rb', line 32

def event_file_path
  @event_file_path
end

#pending_dataObject (readonly)

A mapping from socket to data chunks representing the data that should be sent to a particular client



37
38
39
# File 'lib/roby/droby/logfile/server.rb', line 37

def pending_data
  @pending_data
end

#sampling_periodObject (readonly)

The sampling period (in seconds)



30
31
32
# File 'lib/roby/droby/logfile/server.rb', line 30

def sampling_period
  @sampling_period
end

#serverObject (readonly)

The server socket



39
40
41
# File 'lib/roby/droby/logfile/server.rb', line 39

def server
  @server
end

#server_ioObject (readonly)

The IO we are listening on



28
29
30
# File 'lib/roby/droby/logfile/server.rb', line 28

def server_io
  @server_io
end

Instance Method Details

#closeObject

Close all IOs managed by this instance

This does NOT close the server IO, which is owned by the caller



57
58
59
60
# File 'lib/roby/droby/logfile/server.rb', line 57

def close
    close_client_connections
    @event_file.close
end

#close_client_connectionsObject

Close all currently opened client connections

This does NOT close the server IO, which is owned by the caller



66
67
68
69
# File 'lib/roby/droby/logfile/server.rb', line 66

def close_client_connections
    @pending_data.each_key(&:close)
    @pending_data.clear
end

#execObject



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/roby/droby/logfile/server.rb', line 71

def exec
    loop do
        sockets_with_pending_data = pending_data.find_all do |socket, chunks|
            !chunks.empty?
        end.map(&:first)

        unless sockets_with_pending_data.empty?
            Server.debug "#{sockets_with_pending_data.size} sockets have pending data"
        end

        readable_sockets, =
            select([server], sockets_with_pending_data, nil, sampling_period)

        # Incoming connections
        if readable_sockets && !readable_sockets.empty?
            socket = Thread.handle_interrupt(Interrupt => :never) do
                s = server.accept
                @pending_data[s] = []
                s
            end

            socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
            socket.fcntl(Fcntl::FD_CLOEXEC, 1)

            Server.debug "new connection: #{socket}"
            if found_header?
                all_data = File.binread(event_file_path,
                                        event_file.tell - Logfile::PROLOGUE_SIZE,
                                        Logfile::PROLOGUE_SIZE)

                Server.debug "  queueing #{all_data.size} bytes of data"
                chunks = split_in_chunks(all_data)
            else
                Server.debug "  log file is empty, not queueing any data"
                chunks = []
            end
            connection_init      = ::Marshal.dump([CONNECTION_INIT, chunks.inject(0) { |s, c| s + c.size }])
            connection_init_done = ::Marshal.dump(CONNECTION_INIT_DONE)
            chunks.unshift([connection_init.size].pack("L<") + connection_init)
            chunks << [connection_init_done.size].pack("L<") + connection_init_done
            @pending_data[socket] = chunks
        end

        # Read new data
        read_new_data
        # Send data to our peers
        send_pending_data
    end
rescue Exception
    pending_data.each_key(&:close)
    raise
end

#found_header?Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/roby/droby/logfile/server.rb', line 49

def found_header?
    @found_header
end

#read_new_dataObject

Reads new data from the underlying file and queues it to dispatch for our clients



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/roby/droby/logfile/server.rb', line 144

def read_new_data
    new_data = event_file.read
    return if new_data.empty?

    unless found_header?
        if new_data.size >= Logfile::PROLOGUE_SIZE
            # This will read and validate the prologue
            Logfile.read_prologue(StringIO.new(new_data))
            new_data = new_data[Logfile::PROLOGUE_SIZE..-1]
            @found_header = true
        else
            # Go back to the beginning of the file so that, next
            # time, we read the complete prologue again
            event_file.rewind
            return
        end
    end

    # Split the data in chunks of DATA_CHUNK_SIZE, and add the
    # chunks in the pending_data hash
    new_chunks = split_in_chunks(new_data)
    pending_data.each_value do |chunks|
        chunks.concat(new_chunks)
    end
end

#send_pending_dataObject

Tries to send all pending data to the connected clients



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/roby/droby/logfile/server.rb', line 174

def send_pending_data
    needs_looping = true
    while needs_looping
        needs_looping = false
        pending_data.delete_if do |socket, chunks|
            if chunks.empty?
                # nothing left to send for this socket
                next
            end

            buffer = chunks.shift
            while !chunks.empty? && (buffer.size + chunks[0].size < DATA_CHUNK_SIZE)
                buffer.concat(chunks.shift)
            end
            Server.debug "sending #{buffer.size} bytes to #{socket}"

            begin
                written = socket.write_nonblock(buffer)
            rescue Interrupt
                raise
            rescue Errno::EAGAIN
                Server.debug "cannot send: send buffer full"
                chunks.unshift(buffer)
                next
            rescue Exception => e
                Server.warn "disconnecting from #{socket}: #{e.message}"
                e.backtrace.each do |line|
                    Server.warn "  #{line}"
                end
                socket.close
                next(true)
            end

            remaining = buffer.size - written
            if remaining == 0
                Server.debug "wrote complete chunk of #{written} bytes to #{socket}"
                # Loop if we wrote the complete chunk and there
                # is still stuff to write for this socket
                needs_looping = !chunks.empty?
            else
                Server.debug "wrote partial chunk #{written} bytes instead of #{buffer.size} bytes to #{socket}"
                chunks.unshift(buffer[written, remaining])
            end
            false
        end
    end
end

#split_in_chunks(data) ⇒ Object

Splits the data block in data in blocks of size DATA_CHUNK_SIZE



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/roby/droby/logfile/server.rb', line 125

def split_in_chunks(data)
    result = []

    index = 0
    while index != data.size
        remaining = (data.size - index)
        if remaining > DATA_CHUNK_SIZE
            result << data[index, DATA_CHUNK_SIZE]
            index += DATA_CHUNK_SIZE
        else
            result << data[index, remaining]
            index = data.size
        end
    end
    result
end