Class: Zashoku::Net::Client
- Inherits:
-
Object
- Object
- Zashoku::Net::Client
- Defined in:
- lib/core/net/client.rb
Instance Attribute Summary collapse
-
#callbacks ⇒ Object
Returns the value of attribute callbacks.
Class Method Summary collapse
- .command(host, port, msg, args = {}) ⇒ Object
Instance Method Summary collapse
- #alive? ⇒ Boolean
- #command(msg, args = {}) ⇒ Object
- #connect(host, port, loud_fail: true) ⇒ Object
- #disconnect ⇒ Object
- #dispatch_events! ⇒ Object
- #distribute_results! ⇒ Object
- #get_items(_mod) ⇒ Object
-
#initialize(port, host = Zashoku::CConf[:app][:net][:host]) ⇒ Client
constructor
A new instance of Client.
- #lost_connection ⇒ Object
- #pump_commands! ⇒ Object
- #pump_results! ⇒ Object
- #start_threads ⇒ Object
- #stop_threads ⇒ Object
Constructor Details
#initialize(port, host = Zashoku::CConf[:app][:net][:host]) ⇒ Client
Returns a new instance of Client.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/core/net/client.rb', line 12
# Builds a client, opens the TCP connection and starts the worker threads.
#
# Every piece of instance state (queues, mutex, flags) is created before the
# socket is opened and before any thread is spawned, so nothing started from
# here can observe a partially initialised client.
#
# @param port [Object] port handed to #connect
# @param host [Object] host handed to #connect; defaults to
#   Zashoku::CConf[:app][:net][:host]
def initialize(port, host = Zashoku::CConf[:app][:net][:host])
  @host = host
  @port = port
  @timeout = 2 # seconds #command waits for a reply
  @callbacks = []
  @command_queue = Queue.new
  @result_queue = Queue.new
  @event_queue = Queue.new
  @semaphore = Mutex.new
  # Assigned before connecting: #connect sets @alive to false when the
  # connection is refused, and that must not be overwritten afterwards.
  @alive = true
  @socket = connect(@host, @port)
  start_threads
end
Instance Attribute Details
#callbacks ⇒ Object
Returns the value of attribute callbacks.
10 11 12 |
# File 'lib/core/net/client.rb', line 10
# Reader for the callbacks attribute.
#
# @return [Object] the current value of @callbacks
def callbacks
  @callbacks
end
Class Method Details
.command(host, port, msg, args = {}) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/core/net/client.rb', line 43
# One-shot command: opens a socket, sends a single JSON message, reads one
# line back and formats it.
#
# @param host [Object] host passed to TCPSocket.new
# @param port [Object] port passed to TCPSocket.new
# @param msg [Object] value stored under the 'msg' key of the request
# @param args [Hash] extra keys merged into the request
# @return [Hash] { payload:, status: } where payload is the reply rendered as
#   "key: value" lines on success, an error string when the connection is
#   refused, or nil when the peer closes without replying
def self.command(host, port, msg, args = {})
  sock = TCPSocket.new(host, port)
  sock.puts(JSON.generate({ 'msg' => msg }.merge(args)))
  reply = JSON.parse(sock.readline)
  {
    payload: reply.map { |key, value| "#{key}: #{value}\n" }.join,
    status: true
  }
rescue Errno::ECONNREFUSED
  {
    payload: "error: could not connect to #{host}:#{port}\n",
    status: false
  }
rescue EOFError
  { payload: nil, status: false }
ensure
  # Close on every path; sock is nil when TCPSocket.new itself failed.
  sock&.close
end
Instance Method Details
#alive? ⇒ Boolean
75 76 77 |
# File 'lib/core/net/client.rb', line 75
# Reports the connection flag maintained by the other methods of this class.
#
# @return [Boolean] the current value of @alive
def alive?
  @alive
end
#command(msg, args = {}) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/core/net/client.rb', line 95
# Queues a command for sending and waits for its reply.
#
# Calls are serialised through @semaphore, so at most one command is waiting
# for a reply at any time.
#
# @param msg [Object] value stored under the 'msg' key of the JSON payload
# @param args [Hash] extra keys merged into the payload
# @return [nil] when the client is not alive
# @return [Object] the 'response' value when the reply has exactly one key;
#   otherwise the whole reply ({} when nothing arrived within @timeout seconds)
def command(msg, args = {})
  return unless alive?

  @semaphore.synchronize do
    # Whatever is still queued was meant for an earlier command that timed
    # out; drop it so it is not returned as the reply to this one.
    @result_queue.clear
    @command_queue << JSON.generate({ 'msg' => msg }.merge(args))

    # Poll without blocking rather than killing a helper thread stuck in
    # Queue#pop, which could discard a reply that was popped but not yet
    # stored. A monotonic clock keeps the deadline immune to clock changes.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    reply = nil
    loop do
      reply = begin
        @result_queue.pop(true)
      rescue ThreadError # raised by a non-blocking pop on an empty queue
        nil
      end
      break if reply || Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline

      sleep 0.1
    end

    reply ||= {}
    reply.keys.length == 1 ? reply['response'] : reply
  end
end
#connect(host, port, loud_fail: true) ⇒ Object
64 65 66 67 68 69 70 71 72 73 |
# File 'lib/core/net/client.rb', line 64
# Opens a TCP connection to the given endpoint.
#
# When the connection is refused and loud_fail is set, an error line is
# printed and the calling thread is terminated via Thread.exit. Otherwise the
# client is flagged as not alive and nil is returned.
#
# @param host [Object] host passed to TCPSocket.new
# @param port [Object] port passed to TCPSocket.new
# @param loud_fail [Boolean] print and terminate the thread on refusal
# @return [TCPSocket, nil] the socket, or nil when the connection was refused
def connect(host, port, loud_fail: true)
  TCPSocket.new(host, port)
rescue Errno::ECONNREFUSED
  if loud_fail
    $stdout.puts("error: could not connect to #{host}:#{port}")
    Thread.exit
  end

  @alive = false
  nil
end
#disconnect ⇒ Object
87 88 89 90 91 92 93 |
# File 'lib/core/net/client.rb', line 87
# Tells the peer we are leaving, then shuts the client down.
#
# Worker threads are stopped through #stop_threads (which tolerates threads
# that were never started) and the socket is closed so the descriptor is not
# leaked.
#
# @return [false] the new value of @alive
def disconnect
  command('disconnect')
  stop_threads
  @socket&.close
  @alive = false
end
#dispatch_events! ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/core/net/client.rb', line 150
# Endless loop: takes the next event off @event_queue and invokes every
# registered callback with it, each in a thread of its own.
#
# @return [void] does not return on its own
def dispatch_events!
  loop do
    payload = @event_queue.pop
    callbacks.each do |handler|
      Thread.new { handler.call(payload) }
    end
  end
end
#distribute_results! ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/core/net/client.rb', line 131
# Reads one line from the socket, parses it as JSON and routes it: messages
# with a truthy 'event' key go to @event_queue, everything else to
# @result_queue.
#
# Lines that are not valid JSON are logged and dropped. Any failure of the
# socket itself (end of file, closed stream, OS-level error) is treated as a
# lost connection, so the caller's read loop does not keep retrying a socket
# that cannot deliver data any more.
#
# @return [Object] result of the last expression evaluated; not meaningful
def distribute_results!
  raw = @socket.readline
  response = JSON.parse(raw)
  destination = response['event'] ? @event_queue : @result_queue
  destination << response
rescue JSON::ParserError
  Zashoku.logger.error("discarding #{raw}, JSON::ParserError")
rescue EOFError
  Zashoku.logger.error('eof error')
  lost_connection
rescue IOError, SystemCallError
  # EOFError is handled above; what reaches here is e.g. a closed stream or
  # a connection reset. Neither can recover, so shut down instead of looping.
  Util.alert("error: socket#{@socket}")
  lost_connection
end
#get_items(_mod) ⇒ Object
162 163 164 165 |
# File 'lib/core/net/client.rb', line 162
# Placeholder: ignores its argument and returns an empty hash.
#
# @param _mod [Object] unused
# @return [Hash] always a new, empty hash
def get_items(_mod)
  # Disabled implementation kept from the original:
  # Util.decode_object(send("items #{mod}"))
  {}
end
#lost_connection ⇒ Object
79 80 81 82 83 84 85 |
# File 'lib/core/net/client.rb', line 79
# Handles a dropped connection: flags the client as dead, closes the socket,
# reports the loss through the logger and Util.alert, and finally stops the
# worker threads.
#
# @return [Object] whatever #stop_threads returns
def lost_connection
  @alive = false
  @socket.close

  Zashoku.logger.warn('lost connection')
  Util.alert('no connection')

  stop_threads
end
#pump_commands! ⇒ Object
115 116 117 118 119 120 121 122 123 |
# File 'lib/core/net/client.rb', line 115
# Endless loop: takes the next serialised command off @command_queue and
# writes it to the socket. Any StandardError raised while doing so is treated
# as a lost connection.
#
# @return [void] does not return on its own
def pump_commands!
  loop do
    begin
      outgoing = @command_queue.pop
      @socket.puts(outgoing)
    rescue StandardError
      lost_connection
    end
  end
end
#pump_results! ⇒ Object
125 126 127 128 129 |
# File 'lib/core/net/client.rb', line 125
# Endless loop that repeatedly calls #distribute_results!.
#
# @return [void] does not return on its own
def pump_results!
  loop { distribute_results! }
end
#start_threads ⇒ Object
30 31 32 33 34 |
# File 'lib/core/net/client.rb', line 30
# Spawns the three worker threads, one per pump/dispatch loop, and keeps a
# handle to each so they can be stopped later.
#
# @return [Thread] the events thread (the last one created)
def start_threads
  @command_thread = Thread.new do
    pump_commands!
  end
  @results_thread = Thread.new do
    pump_results!
  end
  @events_thread = Thread.new do
    dispatch_events!
  end
end
#stop_threads ⇒ Object
36 37 38 39 40 |
# File 'lib/core/net/client.rb', line 36
# Terminates the worker threads that have been started.
#
# This may be called from inside one of the workers (via #lost_connection).
# Killing the current thread ends this method on the spot, so the current
# thread is always terminated last; otherwise the workers listed after it
# would be left running.
#
# @return [Array<Thread>] the terminated current-thread entry, if the method
#   returns at all (it does not when called from a worker)
def stop_threads
  workers = [@command_thread, @results_thread, @events_thread].compact
  own, others = workers.partition { |worker| worker.equal?(Thread.current) }
  others.each(&:exit)
  own.each(&:exit)
end