Class: Roby::DRoby::Logfile::Client
- Includes:
- Hooks, Hooks::InstanceHooks
- Defined in:
- lib/roby/droby/logfile/client.rb
Overview
The client part of the event log distribution service
Instance Attribute Summary
- #buffer ⇒ Object (readonly)
  Received data that does not yet form a full cycle (i.e. the buffer needed for packet reassembly).
- #host ⇒ Object (readonly)
  The host we are contacting.
- #init_size ⇒ Object (readonly)
  The number of bytes that have to be transferred to finish initializing the connection.
- #port ⇒ Object (readonly)
  The port on which a connection is created.
- #rx ⇒ Object (readonly)
  The number of bytes received so far.
- #socket ⇒ Object (readonly)
  The socket through which we are connected to the remote host.
Hooks
- #on_data {|data| ... } ⇒ void
  Hooks called with one cycle's worth of data.
- #on_init_done ⇒ void
  Hooks called once the initial set of data has been processed.
- #on_init_progress {|rx, init_size| ... } ⇒ void
Instance Method Summary
- #add_listener(&block) ⇒ Object
- #alive? ⇒ Boolean
- #close ⇒ Object
- #closed? ⇒ Boolean
- #disconnect ⇒ Object
- #init_done? ⇒ Boolean
- #initialize(host, port = Server::DEFAULT_PORT) ⇒ Client (constructor)
  A new instance of Client.
- #read_and_process_one_pending_cycle ⇒ Boolean (private)
  Reads the socket and processes at most one cycle of data.
- #read_and_process_pending(max: 0) ⇒ Boolean
  Reads and processes pending cycles until no complete cycle is available or more than max seconds have elapsed.
- #read_from_socket(size = Server::DATA_CHUNK_SIZE) ⇒ Boolean (private)
  Reads data from the underlying socket.
Methods included from Hooks
Constructor Details
#initialize(host, port = Server::DEFAULT_PORT) ⇒ Client
Returns a new instance of Client.
# File 'lib/roby/droby/logfile/client.rb', line 52
def initialize(host, port = Server::DEFAULT_PORT)
    @host = host
    @port = port
    @buffer = String.new
    @rx = 0
    @socket =
        begin TCPSocket.new(host, port)
        rescue Errno::ECONNREFUSED, Errno::EADDRNOTAVAIL => e
            raise Interface::ConnectionError,
                  "cannot contact Roby log server at '#{host}:#{port}': #{e.message}"
        end
    socket.fcntl(Fcntl::FD_CLOEXEC, 1)
    socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
rescue Exception
    socket&.close
    raise
end
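A minimal usage sketch, built only from the signatures listed on this page. The `follow_log` helper name, the `require` lines, the 0.1 s processing budget and the 1 s `IO.select` timeout are assumptions made for illustration; they are not taken from the Roby sources.

```ruby
# Hypothetical helper, not part of Roby: connects to a log server and prints
# one line per received cycle until the connection fails or is closed.
def follow_log(host, port)
    # Assumption: these requires are enough to load the client class.
    require "roby"
    require "roby/droby/logfile/client"

    client = Roby::DRoby::Logfile::Client.new(host, port)
    client.on_init_progress { |rx, init_size| puts "init: #{rx}/#{init_size} bytes" }
    client.on_init_done { puts "initial data processed" }
    client.on_data { |data| puts "received one cycle (#{data.class})" }

    until client.closed?
        # Process complete cycles for roughly 0.1 s at most (assumed budget)
        processed = client.read_and_process_pending(max: 0.1)
        # Nothing was pending: wait until the socket becomes readable again
        IO.select([client.socket], nil, nil, 1) unless processed
    end
ensure
    client.close if client && !client.closed?
end
```

The method is only defined here, not called. Per the source above, a refused connection surfaces as `Interface::ConnectionError` from the constructor, so a caller would likely want to rescue it around `follow_log`.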
Instance Attribute Details
#buffer ⇒ Object (readonly)
Received data that does not yet form a full cycle (i.e. the buffer needed for packet reassembly)
# File 'lib/roby/droby/logfile/client.rb', line 48
def buffer
    @buffer
end
#host ⇒ Object (readonly)
The host we are contacting
# File 'lib/roby/droby/logfile/client.rb', line 43
def host
    @host
end
#init_size ⇒ Object (readonly)
The number of bytes that have to be transferred to finish initializing the connection
# File 'lib/roby/droby/logfile/client.rb', line 108
def init_size
    @init_size
end
#port ⇒ Object (readonly)
The port on which a connection is created
# File 'lib/roby/droby/logfile/client.rb', line 45
def port
    @port
end
#rx ⇒ Object (readonly)
The number of bytes received so far
# File 'lib/roby/droby/logfile/client.rb', line 50
def rx
    @rx
end
#socket ⇒ Object (readonly)
The socket through which we are connected to the remote host
# File 'lib/roby/droby/logfile/client.rb', line 41
def socket
    @socket
end
Instance Method Details
#add_listener(&block) ⇒ Object
# File 'lib/roby/droby/logfile/client.rb', line 82
def add_listener(&block)
    on_data(&block)
end
#alive? ⇒ Boolean
# File 'lib/roby/droby/logfile/client.rb', line 86
def alive?
    @alive
end
#close ⇒ Object
# File 'lib/roby/droby/logfile/client.rb', line 74
def close
    @socket.close
end
#closed? ⇒ Boolean
# File 'lib/roby/droby/logfile/client.rb', line 78
def closed?
    @socket.closed?
end
#disconnect ⇒ Object
# File 'lib/roby/droby/logfile/client.rb', line 70
def disconnect
    @socket.close
end
#init_done? ⇒ Boolean
# File 'lib/roby/droby/logfile/client.rb', line 110
def init_done?
    @init_done
end
#on_data {|data| ... } ⇒ void
This method returns an undefined value.
Hooks called with one cycle's worth of data
# File 'lib/roby/droby/logfile/client.rb', line 36
define_hooks :on_data
#on_init_done ⇒ void
This method returns an undefined value.
Hooks called once the initial set of data has been processed
# File 'lib/roby/droby/logfile/client.rb', line 25
define_hooks :on_init_done
#on_init_progress {|rx, init_size| ... } ⇒ void
This method returns an undefined value.
# File 'lib/roby/droby/logfile/client.rb', line 20
define_hooks :on_init_progress
#read_and_process_one_pending_cycle ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Reads the socket and processes at most one cycle of data
# File 'lib/roby/droby/logfile/client.rb', line 133
def read_and_process_one_pending_cycle
    Logfile.debug "#{buffer.size} bytes of data in buffer"

    data_size = nil
    loop do
        if buffer.size > 4
            data_size = buffer.unpack1("L<")
            Logfile.debug "expecting data block of #{data_size} bytes"
            break if buffer.size >= data_size + 4

            read_success = read_from_socket(
                [Server::DATA_CHUNK_SIZE, buffer.size - data_size].max
            )
            return unless read_success
        else
            return unless read_from_socket
        end
    end

    if data_size && (buffer.size >= data_size + 4)
        cycle_data = buffer[4, data_size]
        @buffer = buffer[(data_size + 4)..-1]
        data = ::Marshal.load_with_missing_constants(cycle_data)
        if data.kind_of?(Hash)
            Reader.(data)
        elsif data == Server::CONNECTION_INIT_DONE
            @init_done = true
            run_hook :on_init_done
        elsif data[0] == Server::CONNECTION_INIT
            @init_size = data[1]
        else
            @rx += (data_size + 4)
            unless init_done?
                run_hook :on_init_progress, rx, init_size
            end
            run_hook :on_data, data
        end
        Logfile.debug "processed #{data_size} bytes of data, "\
                      "#{@buffer.size} remaining in buffer"
        true
    end
rescue Errno::EAGAIN
end
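The reassembly step in the method above relies on a simple framing: a 4-byte little-endian unsigned length (the `"L<"` directive) followed by that many payload bytes. The following is a standalone sketch of that framing only. The helper names `frame` and `extract_frame` are hypothetical, and plain `Marshal` stands in for Roby's `load_with_missing_constants`.

```ruby
# Hypothetical helper: prefix a payload with its size as a 4-byte
# little-endian unsigned integer, the layout read back with unpack1("L<").
def frame(payload)
    [payload.bytesize].pack("L<") + payload
end

# Hypothetical helper: extract one complete frame from the front of buffer.
# Returns [payload, remaining_buffer], or nil when the frame is incomplete.
def extract_frame(buffer)
    return if buffer.bytesize <= 4

    size = buffer.unpack1("L<")
    return if buffer.bytesize < size + 4

    [buffer.byteslice(4, size), buffer.byteslice(size + 4, buffer.bytesize)]
end

# Two frames back to back, as they could arrive in a single read
stream = frame(Marshal.dump([1, 2])) + frame(Marshal.dump("cycle"))

# A truncated read yields nothing until more bytes arrive
incomplete = extract_frame(stream.byteslice(0, 6))

payload, rest = extract_frame(stream)
first = Marshal.load(payload)
second = Marshal.load(extract_frame(rest).first)
```

Returning the remaining bytes rather than mutating the input keeps the sketch free of side effects; the method above achieves the same thing by reassigning `@buffer`.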
#read_and_process_pending(max: 0) ⇒ Boolean
Reads and processes pending cycles until no complete cycle is available or more than max seconds have elapsed
# File 'lib/roby/droby/logfile/client.rb', line 98
def read_and_process_pending(max: 0)
    current_time = start = Time.now
    while (processed_one_cycle = read_and_process_one_pending_cycle) &&
          (current_time - start) <= max
        current_time = Time.now
    end
    processed_one_cycle
end
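The loop above can be tried in isolation. This sketch reproduces the same control flow with a hypothetical `process_pending` helper, where `step` is any callable standing in for `read_and_process_one_pending_cycle`; neither name exists in Roby.

```ruby
# Hypothetical stand-in for the loop above. `step` must return a truthy
# value when it processed one unit of work, and a falsy value otherwise.
def process_pending(step, max: 0)
    current_time = start = Time.now
    while (processed = step.call) && (current_time - start) <= max
        current_time = Time.now
    end
    processed
end

# With a generous budget, the queue is drained and the last call to
# `step` returns nil, which becomes the return value.
queue = %i[a b c]
drained = process_pending(-> { queue.shift }, max: 10)

# With an endless source of work, the budget ends the loop and the
# return value stays truthy.
busy = process_pending(-> { true }, max: 0.05)
```

The elapsed time is compared only after a step has already run, so `max` acts as a soft limit: at least one step is always attempted, and the loop can overrun the budget by the duration of one step.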
#read_from_socket(size = Server::DATA_CHUNK_SIZE) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Reads data from the underlying socket
# File 'lib/roby/droby/logfile/client.rb', line 119
def read_from_socket(size = Server::DATA_CHUNK_SIZE)
    @buffer.concat(socket.read_nonblock(size))
    true
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => e
    raise Interface::ComError, e.message, e.backtrace
rescue Errno::EAGAIN
    false
end
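The non-blocking read pattern above can be exercised without a server. This sketch uses `IO.pipe` as a stand-in for the TCP socket and a hypothetical `try_read` helper. It rescues `IO::WaitReadable`, the module Ruby mixes into the `EAGAIN` error raised by `read_nonblock`, and lets `EOFError` propagate instead of wrapping it as the method above does.

```ruby
# Hypothetical helper mirroring the pattern above: append whatever is
# currently readable to buffer. Returns true if bytes were read, false
# if nothing is available yet. EOFError propagates when the peer closed.
def try_read(io, buffer, size = 4096)
    buffer.concat(io.read_nonblock(size))
    true
rescue IO::WaitReadable
    false
end

# A pipe stands in for the socket here (assumption for illustration)
reader, writer = IO.pipe
buffer = String.new

nothing_yet = try_read(reader, buffer) # no data written so far
writer.write("hello")
got_data = try_read(reader, buffer)    # the five bytes are now buffered

writer.close
reader_hit_eof =
    begin
        try_read(reader, buffer)
        false
    rescue EOFError
        true
    end
reader.close
```

Keeping "no data yet" as a `false` return while treating a closed connection as an exception lets a polling loop distinguish an idle connection from a dead one.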