Class: Roby::DRoby::Logfile::Client
- Includes:
- Hooks::InstanceHooks, Hooks
- Defined in:
- lib/roby/droby/logfile/client.rb
Overview
The client part of the event log distribution service
Instance Attribute Summary
- #buffer ⇒ Object (readonly)
  Data that is not a full cycle's worth of data (i.e. the buffer needed for packet reassembly).
- #host ⇒ Object (readonly)
  The host we are contacting.
- #init_size ⇒ Object (readonly)
  The number of bytes that have to be transferred to finish initializing the connection.
- #port ⇒ Object (readonly)
  The port on which a connection is created.
- #rx ⇒ Object (readonly)
  The number of bytes received so far.
- #socket ⇒ Object (readonly)
  The socket through which we are connected to the remote host.
Hooks
- #on_data {|data| ... } ⇒ void
  Hooks called with one cycle's worth of data.
- #on_init_done ⇒ void
  Hooks called when we have finished processing the initial set of data.
- #on_init_progress {|rx, init_size| ... } ⇒ void
Instance Method Summary
- #add_listener(&block) ⇒ Object
- #alive? ⇒ Boolean
- #close ⇒ Object
- #closed? ⇒ Boolean
- #disconnect ⇒ Object
- #init_done? ⇒ Boolean
- #initialize(host, port = Server::DEFAULT_PORT) ⇒ Client (constructor)
  A new instance of Client.
- #read_and_process_one_pending_cycle ⇒ Boolean (private)
  Reads the socket and processes at most one cycle of data.
- #read_and_process_pending(max: 0) ⇒ Boolean
  Read and process data.
- #read_from_socket(size = Server::DATA_CHUNK_SIZE) ⇒ Boolean (private)
  Read data from the underlying socket.
Constructor Details
#initialize(host, port = Server::DEFAULT_PORT) ⇒ Client
Returns a new instance of Client.
    # File 'lib/roby/droby/logfile/client.rb', line 50
    def initialize(host, port = Server::DEFAULT_PORT)
      @host = host
      @port = port
      @buffer = ""
      @rx = 0
      @socket = begin
        TCPSocket.new(host, port)
      rescue Errno::ECONNREFUSED => e
        raise Interface::ConnectionError,
              "cannot contact Roby log server at '#{host}:#{port}': #{e.message}"
      end
      socket.fcntl(Fcntl::FD_CLOEXEC, 1)
      socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    rescue Exception
      socket.close if socket
      raise
    end
Instance Attribute Details
#buffer ⇒ Object (readonly)
Data that is not a full cycle worth of data (i.e. buffer needed for packet reassembly)
    # File 'lib/roby/droby/logfile/client.rb', line 46
    def buffer
      @buffer
    end
#host ⇒ Object (readonly)
The host we are contacting
    # File 'lib/roby/droby/logfile/client.rb', line 41
    def host
      @host
    end
#init_size ⇒ Object (readonly)
The number of bytes that have to be transferred to finish initializing the connection
    # File 'lib/roby/droby/logfile/client.rb', line 106
    def init_size
      @init_size
    end
#port ⇒ Object (readonly)
The port on which a connection is created
    # File 'lib/roby/droby/logfile/client.rb', line 43
    def port
      @port
    end
#rx ⇒ Object (readonly)
The number of bytes received so far

    # File 'lib/roby/droby/logfile/client.rb', line 48
    def rx
      @rx
    end
#socket ⇒ Object (readonly)
The socket through which we are connected to the remote host
    # File 'lib/roby/droby/logfile/client.rb', line 39
    def socket
      @socket
    end
Instance Method Details
#add_listener(&block) ⇒ Object
    # File 'lib/roby/droby/logfile/client.rb', line 80
    def add_listener(&block)
      on_data(&block)
    end
#alive? ⇒ Boolean
    # File 'lib/roby/droby/logfile/client.rb', line 84
    def alive?
      @alive
    end
#close ⇒ Object
    # File 'lib/roby/droby/logfile/client.rb', line 72
    def close
      @socket.close
    end
#closed? ⇒ Boolean
    # File 'lib/roby/droby/logfile/client.rb', line 76
    def closed?
      @socket.closed?
    end
#disconnect ⇒ Object
    # File 'lib/roby/droby/logfile/client.rb', line 68
    def disconnect
      @socket.close
    end
#init_done? ⇒ Boolean
    # File 'lib/roby/droby/logfile/client.rb', line 108
    def init_done?
      @init_done
    end
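The initialization handshake that #init_size, #rx and #init_done? expose can be sketched as a small state tracker. This is only an illustrative sketch: `InitTracker`, `handle`, `progress` and the two marker values are hypothetical stand-ins (the real values of `Server::CONNECTION_INIT` and `Server::CONNECTION_INIT_DONE` are not shown on this page), and the progress fraction is an assumption about how a caller might use `rx` and `init_size` together.

```ruby
# Hypothetical markers standing in for Server::CONNECTION_INIT and
# Server::CONNECTION_INIT_DONE, whose real values are not shown here.
CONNECTION_INIT = :init
CONNECTION_INIT_DONE = :init_done

# Tracks the state of the initial data transfer (illustrative only).
class InitTracker
  attr_reader :rx, :init_size

  def initialize
    @rx = 0
    @init_size = nil
    @init_done = false
  end

  def init_done?
    @init_done
  end

  # `message` is a decoded packet, `wire_size` its size on the wire.
  # Returns the init progress as a fraction while initializing, else nil.
  def handle(message, wire_size)
    if message == CONNECTION_INIT_DONE
      @init_done = true
      nil
    elsif message.is_a?(Array) && message[0] == CONNECTION_INIT
      @init_size = message[1]
      nil
    else
      # Only regular data packets count towards the received byte total.
      @rx += wire_size
      progress unless init_done?
    end
  end

  # Fraction of the initial data received so far, capped at 1.0.
  def progress
    return nil unless init_size && init_size > 0

    [rx.to_f / init_size, 1.0].min
  end
end

tracker = InitTracker.new
tracker.handle([CONNECTION_INIT, 200], 20)
tracker.handle([:cycle_data], 50)
tracker.handle(CONNECTION_INIT_DONE, 10)
```

As in the method listing for #read_and_process_one_pending_cycle, the byte counter in this sketch only advances for data packets, not for the two handshake markers.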
#on_data {|data| ... } ⇒ void
This method returns an undefined value.
Hooks called with one cycle worth of data
    # File 'lib/roby/droby/logfile/client.rb', line 34
    define_hooks :on_data
#on_init_done ⇒ void
This method returns an undefined value.
Hooks called when we finished processing the initial set of data
    # File 'lib/roby/droby/logfile/client.rb', line 23
    define_hooks :on_init_done
#on_init_progress {|rx, init_size| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/roby/droby/logfile/client.rb', line 18
    define_hooks :on_init_progress
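To illustrate how hooks declared with `define_hooks` are registered with a block and later fired with `run_hook`, here is a minimal self-contained sketch. `MiniHooks`, `FakeClient` and `deliver` are hypothetical names written for this example; they mimic the calling convention seen on this page and are not the implementation behind the `Hooks` module the class includes.

```ruby
# Minimal stand-in for a hook mechanism (hypothetical, for illustration).
module MiniHooks
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Defines one instance-level registration method per hook name.
    def define_hooks(*names)
      names.each do |name|
        define_method(name) do |&block|
          hook_callbacks[name] << block
          nil
        end
      end
    end
  end

  # Calls every block registered for `name`, in registration order.
  def run_hook(name, *args)
    hook_callbacks[name].each { |callback| callback.call(*args) }
    nil
  end

  private

  def hook_callbacks
    @hook_callbacks ||= Hash.new { |hash, key| hash[key] = [] }
  end
end

# Hypothetical client exposing the same three hook names as this class.
class FakeClient
  include MiniHooks
  define_hooks :on_init_progress, :on_init_done, :on_data

  # Fires on_data the way a client might after decoding one cycle.
  def deliver(data)
    run_hook :on_data, data
  end
end

client = FakeClient.new
received = []
client.on_data { |data| received << data }
client.deliver([:cycle, 1])
```

Several blocks can be registered on the same hook; in this sketch they run in the order they were added.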
#read_and_process_one_pending_cycle ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Reads the socket and processes at most one cycle of data
    # File 'lib/roby/droby/logfile/client.rb', line 131
    def read_and_process_one_pending_cycle
      Logfile.debug "#{buffer.size} bytes of data in buffer"
      while true
        if buffer.size > 4
          data_size = buffer.unpack('L<').first
          Logfile.debug "expecting data block of #{data_size} bytes"
          if buffer.size >= data_size + 4
            break
          end
          return if !read_from_socket([Server::DATA_CHUNK_SIZE, buffer.size - data_size].max)
        else
          return if !read_from_socket
        end
      end

      if data_size && (buffer.size >= data_size + 4)
        cycle_data = buffer[4, data_size]
        @buffer = buffer[(data_size + 4)..-1]
        data = ::Marshal.load_with_missing_constants(cycle_data)
        if data.kind_of?(Hash)
          Reader.(data)
        elsif data == Server::CONNECTION_INIT_DONE
          @init_done = true
          run_hook :on_init_done
        elsif data[0] == Server::CONNECTION_INIT
          @init_size = data[1]
        else
          @rx += (data_size + 4)
          if !init_done?
            run_hook :on_init_progress, rx, init_size
          end
          run_hook :on_data, data
        end
        Logfile.debug "processed #{data_size} bytes of data, #{@buffer.size} remaining in buffer"
        true
      end
    rescue Errno::EAGAIN
    end
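The listing above reads a 4-byte little-endian size (`'L<'`) followed by a marshalled payload of that size. The framing can be sketched in isolation as follows. `extract_frame` and `build_frame` are hypothetical helpers written for this example, and plain `Marshal.load` is used in place of `Marshal.load_with_missing_constants`, which is specific to the library.

```ruby
HEADER_SIZE = 4 # bytes used by the little-endian size prefix

# Returns [payload, remaining_buffer] when a complete frame is buffered,
# or nil when more bytes are needed.
def extract_frame(buffer)
  return nil if buffer.bytesize < HEADER_SIZE

  payload_size = buffer.unpack1("L<")
  return nil if buffer.bytesize < HEADER_SIZE + payload_size

  payload = buffer.byteslice(HEADER_SIZE, payload_size)
  rest = buffer.byteslice((HEADER_SIZE + payload_size)..-1)
  [payload, rest]
end

# Hypothetical helper: builds a frame the way a sender might.
def build_frame(object)
  payload = Marshal.dump(object)
  [payload.bytesize].pack("L<") + payload
end

# One complete frame followed by the first 3 bytes of another one.
buffer = build_frame([:cycle, 1]) + build_frame([:cycle, 2])[0, 3]

payload, rest = extract_frame(buffer)
decoded = Marshal.load(payload)
```

Whatever is left in `rest` stays buffered until enough bytes arrive to complete the next frame, which is the role #buffer plays in this class.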
#read_and_process_pending(max: 0) ⇒ Boolean
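Reads and processes pending cycles, one at a time, until no complete cycle is available or more than max seconds have elapsed since the call started. Returns a truthy value if the last attempt processed a cycle (more data may still be pending), and a falsy value otherwise.

    # File 'lib/roby/droby/logfile/client.rb', line 96
    def read_and_process_pending(max: 0)
      current_time = start = Time.now
      while (processed_one_cycle = read_and_process_one_pending_cycle) &&
            (current_time - start) <= max
        current_time = Time.now
      end
      processed_one_cycle
    end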
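The time-budgeted loop in this method can be sketched on its own. `drain` and its injectable `clock` are hypothetical names introduced here so the behaviour can be exercised deterministically; the loop condition mirrors the listing above.

```ruby
# Keeps calling the block until it reports no work or the budget is spent.
# The block must return a truthy value when it processed something.
# `clock` is injectable so the sketch can run against a fake time source.
def drain(max:, clock: -> { Time.now }, &process_one)
  now = start = clock.call
  while (did_work = process_one.call) && (now - start) <= max
    now = clock.call
  end
  did_work
end

queue = [:a, :b, :c]
tick = -1.0
fake_clock = -> { tick += 1.0 } # advances by one "second" per call

# Zero budget: the elapsed time is only refreshed after the check,
# so a second item is processed before the loop stops.
more_pending = drain(max: 0, clock: fake_clock) { !queue.shift.nil? }
```

With this structure, a truthy return value means the loop stopped because the budget ran out, so calling again may process more data.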
#read_from_socket(size = Server::DATA_CHUNK_SIZE) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Read data from the underlying socket
    # File 'lib/roby/droby/logfile/client.rb', line 117
    def read_from_socket(size = Server::DATA_CHUNK_SIZE)
      @buffer.concat(socket.read_nonblock(size))
      true
    rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => e
      raise Interface::ComError, e., e.backtrace
    rescue Errno::EAGAIN
      false
    end
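The non-blocking read pattern used here can be tried without a server by reading from a pipe. In this sketch `read_available` and `ComError` are hypothetical stand-ins for `read_from_socket` and `Interface::ComError`, and `IO::WaitReadable` is rescued instead of `Errno::EAGAIN` as a portability choice of the example, not something taken from the library.

```ruby
# Hypothetical stand-in for the library's communication error class.
class ComError < StandardError; end

# Appends whatever is currently readable from `io` to `buffer`.
# Returns true if data was read, false if the read would block.
# Raises ComError when the other end has gone away.
def read_available(io, buffer, size = 4096)
  buffer << io.read_nonblock(size)
  true
rescue IO::WaitReadable
  false
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => e
  raise ComError, e.message, e.backtrace
end

reader, writer = IO.pipe
buffer = String.new

read_available(reader, buffer) # nothing written yet, the read would block
writer.write("abc")
read_available(reader, buffer) # the three bytes are appended to buffer
```

Returning false instead of raising lets the caller treat "no data yet" as a normal outcome and retry later, while a closed connection is reported as an error.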