Class: Zashoku::Net::Client
- Inherits:
-
Object
- Object
- Zashoku::Net::Client
- Defined in:
- lib/core/net/client.rb
Constant Summary collapse
- BSIZE =
1000000
Instance Attribute Summary collapse
-
#callbacks ⇒ Object
Returns the value of attribute callbacks.
Class Method Summary collapse
Instance Method Summary collapse
- #alive? ⇒ Boolean
- #command(msg, args = {}) ⇒ Object
- #command_to_file(file, msg, args = {}) ⇒ Object
- #connect(host, port, loud_fail: true) ⇒ Object
- #disconnect ⇒ Object
- #dispatch_events! ⇒ Object
- #distribute_results! ⇒ Object
- #get_items(_mod) ⇒ Object
-
#initialize(host, port) ⇒ Client
constructor
A new instance of Client.
- #lost_connection ⇒ Object
- #pump_commands! ⇒ Object
- #pump_results! ⇒ Object
- #say_helo ⇒ Object
- #start_threads ⇒ Object
- #stop_threads ⇒ Object
Constructor Details
#initialize(host, port) ⇒ Client
Returns a new instance of Client.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/core/net/client.rb', line 14
#
# Builds a client: opens the connection, sets up the queues and lock used by
# the worker threads, starts those threads and performs the initial handshake.
#
# @param host [Object] forwarded unchanged to #connect (presumably a hostname
#   or IP string — confirm with callers)
# @param port [Object] forwarded unchanged to #connect (presumably an Integer)
def initialize(host, port)
  @host = host
  @port = port
  @socket = connect(@host, @port)

  @alive = true
  @blocked = false
  @timeout = 2 # how long #command waits for a reply, in seconds
  @callbacks = []

  @command_queue = Queue.new
  @result_queue = Queue.new
  @event_queue = Queue.new
  @semaphore = Mutex.new

  start_threads
  say_helo
end
Instance Attribute Details
#callbacks ⇒ Object
Returns the value of attribute callbacks.
10 11 12 |
# File 'lib/core/net/client.rb', line 10
#
# Reader for the list of event callbacks.
#
# @return [Object] whatever is stored in @callbacks (the constructor sets it
#   to an empty Array)
def callbacks
  @callbacks
end
Class Method Details
.command(host, port, msg, args = {}) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/core/net/client.rb', line 55
#
# One-shot command: opens a fresh connection, sends a single JSON message,
# reads one reply line and renders it as "key: value" lines.
#
# @param host [String] server host, passed to TCPSocket.new
# @param port [Integer] server port, passed to TCPSocket.new
# @param msg [Object] stored under the 'msg' key of the request
# @param args [Hash] extra request fields merged over { 'msg' => msg }
# @return [Hash] { payload: String, status: true } on success;
#   { payload: String, status: false } when the connection is refused;
#   { payload: nil, status: false } when the server closes without replying
# @raise [JSON::ParserError] if the reply line is not valid JSON
def self.command(host, port, msg, args = {})
  sock = TCPSocket.new(host, port)
  begin
    sock.puts(JSON.generate({ 'msg' => msg }.merge(args)))
    reply = sock.readline
  ensure
    # Close on every path; previously an EOFError from readline leaked the socket.
    sock.close
  end
  {
    payload: JSON.parse(reply).map { |key, value| "#{key}: #{value}\n" }.join,
    status: true
  }
rescue Errno::ECONNREFUSED
  { payload: "error: could not connect to #{host}:#{port}\n", status: false }
rescue EOFError
  { payload: nil, status: false }
end
Instance Method Details
#alive? ⇒ Boolean
87 88 89 |
# File 'lib/core/net/client.rb', line 87
#
# Reports the connection flag.
#
# @return [Object] the current value of @alive (set to true by the
#   constructor, false by #disconnect, #lost_connection and a quiet #connect
#   failure)
def alive?
  @alive
end
#command(msg, args = {}) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/core/net/client.rb', line 107
#
# Queues a command for the server and waits up to @timeout seconds for the
# next entry of the result queue.
#
# The wait uses Thread#join with a limit instead of polling every 0.1s, so a
# reply is returned as soon as it arrives. Reading the outcome through
# Thread#value also keeps a reply that was popped just before the waiter was
# killed, which the polling version could drop.
#
# @param msg [Object] stored under the 'msg' key of the request
# @param args [Hash] extra request fields merged over { 'msg' => msg }
# @return [nil] when the client is not alive
# @return [Object] the 'response' value when the reply has exactly one key
# @return [Hash] the whole reply otherwise; an empty Hash on timeout
def command(msg, args = {})
  return unless alive?

  @semaphore.synchronize do
    @command_queue << JSON.generate({ 'msg' => msg }.merge(args))

    waiter = Thread.new { @result_queue.pop }
    waiter.kill unless waiter.join(@timeout)
    # A killed thread yields nil from #value; a finished one yields the reply.
    result = waiter.value || {}

    result.keys.length == 1 ? result['response'] : result
  end
end
#command_to_file(file, msg, args = {}) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/core/net/client.rb', line 127
#
# Sends a command flagged 'raw' and streams the reply from the socket into
# +file+. The worker threads are stopped for the duration so that this method
# is the only reader of the socket, and are started again afterwards.
#
# Wire format as read by this code: a length terminated by Zashoku::EOF,
# followed by data read in chunks of at most BSIZE bytes; a chunk shorter than
# BSIZE ends the transfer.
#
# @param file [String] path of the file to write (created or truncated)
# @param msg [Object] stored under the 'msg' key of the request
# @param args [Hash] extra request fields merged over the base payload
# @return [nil] when the client is not alive
# @return [Object] otherwise the return value of #start_threads
# @raise [EOFError] if the socket reaches end of stream before the transfer
#   is complete (the worker threads are restarted before re-raising)
def command_to_file(file, msg, args = {})
  return unless alive?

  Zashoku.logger.info('stopping client threads from command_to_file')
  stop_threads
  begin
    @semaphore.synchronize do
      payload = { 'msg' => msg, 'raw' => true }.merge(args)
      @socket.puts(JSON.generate(payload))

      # Block form closes the file on every path; binary mode writes the
      # received bytes verbatim on every platform.
      File.open(file, 'wb') do |out|
        Zashoku::Util.alert("opened file #{file}")
        out.sync = true
        Zashoku::Util.alert('waiting for server...')

        header = @socket.gets(Zashoku::EOF)
        raise EOFError, 'connection closed before the file length arrived' if header.nil?

        total = header.split(Zashoku::EOF).first.to_i
        Zashoku::Util.alert("got length #{total}")

        received = 0
        loop do
          chunk = @socket.gets(Zashoku::EOF, BSIZE)
          raise EOFError, 'connection closed during file transfer' if chunk.nil?

          size = chunk.bytesize # the gets limit is in bytes, so compare bytes
          out.print(chunk)
          received += size
          # Guard the division: a zero length would make #round raise.
          percent = total.zero? ? 100 : (received / Float(total) * 100).round
          Zashoku::Util.alert("file #{percent}% buffered")
          break if size < BSIZE
        end
      end
    end
  rescue StandardError
    # Never leave the client without its worker threads after a failed transfer.
    Zashoku.logger.info('re-starting client threads from command_to_file')
    start_threads
    raise
  end

  Zashoku::Util.alert('file buffered')
  Zashoku.logger.info('re-starting client threads from command_to_file')
  start_threads
end
#connect(host, port, loud_fail: true) ⇒ Object
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/core/net/client.rb', line 76
#
# Opens a TCP connection to the server.
#
# @param host [String] passed to TCPSocket.new
# @param port [Integer] passed to TCPSocket.new
# @param loud_fail [Boolean] when true, a refused connection prints an error
#   and calls Thread.exit; when false, @alive is set to false and nil is
#   returned
# @return [TCPSocket, nil] the open socket, or nil after a quiet failure
def connect(host, port, loud_fail: true)
  begin
    socket = TCPSocket.new(host, port)
  rescue Errno::ECONNREFUSED
    if loud_fail
      puts "error: could not connect to #{host}:#{port}"
      Thread.exit
    end
    @alive = false
    socket = nil
  end
  socket
end
#disconnect ⇒ Object
99 100 101 102 103 104 105 |
# File 'lib/core/net/client.rb', line 99
#
# Tells the server we are leaving, stops the worker threads, closes the
# socket and marks the client as not alive.
#
# Thread shutdown goes through #stop_threads so it matches the other shutdown
# paths and tolerates threads that were never started. The socket is closed
# here because nothing else releases it once the client is marked dead.
#
# @return [false] the new value of @alive
def disconnect
  command('disconnect')
  stop_threads
  @socket&.close
  @alive = false
end
#dispatch_events! ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/core/net/client.rb', line 188
#
# Event loop: blocks on the event queue and, for every event popped, invokes
# each registered callback in a thread of its own. Never returns normally.
def dispatch_events!
  loop do
    incoming = @event_queue.pop
    callbacks.each do |handler|
      Thread.new(incoming) { |event| handler.call(event) }
    end
  end
end
#distribute_results! ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/core/net/client.rb', line 171
#
# Reads one line from the socket, parses it as JSON and routes it: replies
# carrying a truthy 'event' key go to the event queue, everything else goes
# to the result queue.
#
# Failure handling:
# - unparsable JSON, or JSON that is not an object, is logged and discarded;
# - end of stream is treated as a lost connection;
# - any other I/O or system-level socket error is alerted and also treated as
#   a lost connection. The caller loops forever, so merely alerting would
#   spin on a dead socket.
def distribute_results!
  raw = @socket.readline
  response = JSON.parse(raw)
  unless response.is_a?(Hash)
    # Arrays, numbers, null etc. cannot be routed by key.
    Zashoku.logger.error("discarding #{raw}, not a JSON object")
    return
  end

  target = response['event'] ? @event_queue : @result_queue
  target << response
rescue JSON::ParserError
  Zashoku.logger.error("discarding #{raw}, JSON::ParserError")
rescue EOFError
  Zashoku.logger.error('eof error')
  lost_connection
rescue IOError, SystemCallError
  Util.alert("error: socket#{@socket}")
  lost_connection
end
#get_items(_mod) ⇒ Object
200 201 202 203 |
# File 'lib/core/net/client.rb', line 200
#
# Placeholder: ignores its argument and returns an empty Hash. The disabled
# line below is carried over from the original implementation.
#
# @param _mod [Object] unused
# @return [Hash] always a new empty Hash
def get_items(_mod)
  # Util.decode_object(send("items #{mod}"))
  Hash.new
end
#lost_connection ⇒ Object
91 92 93 94 95 96 97 |
# File 'lib/core/net/client.rb', line 91
#
# Handles a dropped connection: marks the client as not alive, closes the
# socket, reports the loss through the logger and the alert helper, then
# stops the worker threads.
def lost_connection
  @alive = false
  @socket.close

  Zashoku.logger.warn('lost connection')
  Util.alert('no connection')

  stop_threads
end
#pump_commands! ⇒ Object
155 156 157 158 159 160 161 162 163 |
# File 'lib/core/net/client.rb', line 155
#
# Sender loop: blocks on the command queue and writes each queued entry to
# the socket as one line. Any StandardError raised while popping or writing
# is handled by calling #lost_connection. Never returns normally.
def pump_commands!
  loop do
    begin
      outgoing = @command_queue.pop
      @socket.puts(outgoing)
    rescue StandardError
      lost_connection
    end
  end
end
#pump_results! ⇒ Object
165 166 167 168 169 |
# File 'lib/core/net/client.rb', line 165
#
# Receiver loop: calls #distribute_results! over and over. Never returns
# normally.
def pump_results!
  loop { distribute_results! }
end
#say_helo ⇒ Object
34 35 36 37 38 39 40 |
# File 'lib/core/net/client.rb', line 34
#
# Performs the 'helo' handshake, sending the Zashoku version and the
# configured app name, and logs a warning unless the server answers 'ok'.
#
# #command unwraps a single-key reply to its bare 'response' value, returns
# the whole Hash for larger replies and returns nil when the client is not
# alive. All three shapes are handled here. Indexing the bare String 'ok'
# with 'response' used to give nil and trigger a false warning, and a nil
# reply used to raise NoMethodError.
def say_helo
  reply = command(
    'helo',
    { zv: Zashoku::Version, app: Zashoku::CConf[:app][:name] }
  )
  status = reply.is_a?(Hash) ? reply['response'] : reply
  Zashoku.logger.warn('Everything is not okay') unless status == 'ok'
end
#start_threads ⇒ Object
42 43 44 45 46 |
# File 'lib/core/net/client.rb', line 42
#
# Launches the three worker threads: one sending queued commands, one reading
# replies from the socket, one delivering events to callbacks.
#
# @return [Thread] the events thread (the last one started)
def start_threads
  @command_thread = Thread.new do
    pump_commands!
  end
  @results_thread = Thread.new do
    pump_results!
  end
  @events_thread = Thread.new do
    dispatch_events!
  end
end
#stop_threads ⇒ Object
48 49 50 51 52 |
# File 'lib/core/net/client.rb', line 48
#
# Stops the command, results and events threads. Threads that were never
# started (nil) are skipped.
#
# This can run on one of the worker threads itself (via #lost_connection).
# Thread#exit on the current thread ends it immediately, so the current
# thread is always stopped last; otherwise the threads listed after it would
# be left running.
def stop_threads
  workers = [@command_thread, @results_thread, @events_thread].compact
  own, others = workers.partition { |thread| thread.equal?(Thread.current) }
  others.each(&:exit)
  own.each(&:exit)
end