Class: Baykit::BayServer::Agent::GrandAgent

Inherits:
Object
  • Object
show all
Includes:
Baykit::BayServer, Letters, Multiplexer, Signal, Docker, Taxi, Train, Util
Defined in:
lib/baykit/bayserver/agent/grand_agent.rb

Constant Summary collapse

SELECT_TIMEOUT_SEC =
10
CMD_OK =
0
CMD_CLOSE =
1
CMD_RELOAD_CERT =
2
CMD_MEM_USAGE =
3
CMD_SHUTDOWN =
4
CMD_ABORT =
5
CMD_CATCHUP =
6

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(agent_id, max_ships, anchorable) ⇒ GrandAgent

Returns a new instance of GrandAgent.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 87

# Builds an agent and wires up its multiplexers.
#
# @param agent_id   identifier stored in @agent_id (used by #to_s)
# @param max_ships  stored as @max_inbound_ships
# @param anchorable stored in @anchorable and passed to the spider and job
#                   multiplexers
# @raise [NotImplementedError] when the harbor's recipient type is
#   Harbor::RECIPIENT_TYPE_PIPE
# @raise [Sink] when the harbor's net multiplexer type is PIGEON, SPIN, TAXI
#   or TRAIN
def initialize(agent_id, max_ships, anchorable)
  # Plain state first: the multiplexers below receive `self`.
  @agent_id = agent_id
  @max_inbound_ships = max_ships
  @anchorable = anchorable
  @aborted = false
  @select_timeout_sec = SELECT_TIMEOUT_SEC
  @timer_handlers = []
  @letter_queue = []
  @letter_queue_lock = Mutex.new
  @postpone_queue = []
  @postpone_queue_lock = Mutex.new

  # All four multiplexers are always created; the harbor settings only
  # decide which of them act as recipient / net multiplexer.
  @spider_multiplexer = SpiderMultiplexer.new(self, anchorable)
  @spin_multiplexer = SpinMultiplexer.new(self)
  @job_multiplexer = JobMultiplexer.new(self, anchorable)
  @taxi_multiplexer = TaxiMultiplexer.new(self)

  # Any other recipient type leaves @recipient unset (nil).
  case BayServer.harbor.recipient
  when Harbor::RECIPIENT_TYPE_SPIDER then @recipient = @spider_multiplexer
  when Harbor::RECIPIENT_TYPE_PIPE then raise NotImplementedError.new
  end

  # Any type not listed here leaves @net_multiplexer unset (nil).
  case BayServer.harbor.net_multiplexer
  when Harbor::MULTIPLEXER_TYPE_SPIDER then @net_multiplexer = @spider_multiplexer
  when Harbor::MULTIPLEXER_TYPE_JOB then @net_multiplexer = @job_multiplexer
  when Harbor::MULTIPLEXER_TYPE_PIGEON, Harbor::MULTIPLEXER_TYPE_SPIN,
       Harbor::MULTIPLEXER_TYPE_TAXI, Harbor::MULTIPLEXER_TYPE_TRAIN
    raise Sink.new("Multiplexer not supported: %s", Harbor.get_multiplexer_type_name(BayServer.harbor.net_multiplexer))
  end

  @last_timeout_check = 0
end

Class Attribute Details

.agent_count ⇒ Object (readonly)

Returns the value of attribute agent_count.



72
73
74
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 72

# Class-level reader (listed under the class attributes): returns whatever
# is stored in @agent_count.
def agent_count
  return @agent_count
end

.agents ⇒ Object (readonly)

Returns the value of attribute agents.



70
71
72
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 70

# Class-level reader for @agents. #shutdown calls
# GrandAgent.agents.delete(@agent_id) on the returned collection.
def agents
  return @agents
end

.listeners ⇒ Object (readonly)

Returns the value of attribute listeners.



71
72
73
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 71

# Class-level reader for @listeners. #shutdown iterates the returned
# collection and calls remove(@agent_id) on each element.
def listeners
  return @listeners
end

.max_agent_id ⇒ Object (readonly)

Returns the value of attribute max_agent_id.



74
75
76
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 74

# Class-level reader: returns whatever is stored in @max_agent_id.
def max_agent_id
  return @max_agent_id
end

.max_ships ⇒ Object (readonly)

Returns the value of attribute max_ships.



73
74
75
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 73

# Class-level reader: returns whatever is stored in @max_ships.
def max_ships
  return @max_ships
end

Instance Attribute Details

#aborted ⇒ Object (readonly)

Returns the value of attribute aborted.



60
61
62
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 60

# Reader for @aborted. The flag starts as false in #initialize and is set
# to true by #req_shutdown and #shutdown; #run leaves its loop once it is set.
def aborted
  return @aborted
end

#agent_id ⇒ Object (readonly)

Returns the value of attribute agent_id.



46
47
48
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 46

# Reader for @agent_id: the id given to #initialize; #shutdown overwrites it
# with -1 as its last step.
def agent_id
  return @agent_id
end

#anchorable ⇒ Object (readonly)

Returns the value of attribute anchorable.



47
48
49
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 47

# Reader for @anchorable, the flag passed to #initialize. When it is truthy,
# #run registers the rudders of BayServer.anchorable_port_map.
def anchorable
  return @anchorable
end

#command_receiver ⇒ Object (readonly)

Returns the value of attribute command_receiver.



61
62
63
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 61

# Reader for @command_receiver. It is assigned by #add_command_receiver and
# is nil until that method has been called.
def command_receiver
  return @command_receiver
end

#job_multiplexer ⇒ Object (readonly)

Returns the value of attribute job_multiplexer.



49
50
51
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 49

# Reader for @job_multiplexer, the JobMultiplexer created in #initialize.
def job_multiplexer
  return @job_multiplexer
end

#last_timeout_check ⇒ Object (readonly)

Returns the value of attribute last_timeout_check.



63
64
65
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 63

# Reader for @last_timeout_check. It starts at 0 in #initialize and is
# compared against Time.now.tv_sec in #run.
def last_timeout_check
  return @last_timeout_check
end

#letter_queue ⇒ Object (readonly)

Returns the value of attribute letter_queue.



64
65
66
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 64

# Reader for @letter_queue, the array of pending letters that #run drains.
# The array itself is returned, not a copy.
def letter_queue
  return @letter_queue
end

#letter_queue_lock ⇒ Object (readonly)

Returns the value of attribute letter_queue_lock.



65
66
67
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 65

# Reader for @letter_queue_lock, the Mutex #run holds while shifting a
# letter off @letter_queue.
def letter_queue_lock
  return @letter_queue_lock
end

#max_inbound_ships ⇒ Object (readonly)

Returns the value of attribute max_inbound_ships.



58
59
60
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 58

# Reader for @max_inbound_ships, which #initialize fills from its
# max_ships argument.
def max_inbound_ships
  return @max_inbound_ships
end

#net_multiplexer ⇒ Object (readonly)

Returns the value of attribute net_multiplexer.



48
49
50
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 48

# Reader for @net_multiplexer. #initialize points it at the spider or the
# job multiplexer depending on BayServer.harbor.net_multiplexer.
def net_multiplexer
  return @net_multiplexer
end

#postpone_queue ⇒ Object (readonly)

Returns the value of attribute postpone_queue.



66
67
68
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 66

# Reader for @postpone_queue, the array #add_postpone appends to and
# #catch_up shifts from. The array itself is returned, not a copy.
def postpone_queue
  return @postpone_queue
end

#postpone_queue_lock ⇒ Object (readonly)

Returns the value of attribute postpone_queue_lock.



67
68
69
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 67

# Reader for @postpone_queue_lock, the Mutex taken by #add_postpone and
# #catch_up around @postpone_queue.
def postpone_queue_lock
  return @postpone_queue_lock
end

#recipient ⇒ Object (readonly)

Returns the value of attribute recipient.



55
56
57
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 55

# Reader for @recipient. #initialize sets it to the spider multiplexer when
# the harbor's recipient type is RECIPIENT_TYPE_SPIDER; #run blocks on its
# receive and #req_shutdown calls its wakeup.
def recipient
  return @recipient
end

#send_wakeup_pipe ⇒ Object (readonly)

Returns the value of attribute send_wakeup_pipe.



57
58
59
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 57

# Reader for @send_wakeup_pipe.
# NOTE(review): no method shown on this page assigns @send_wakeup_pipe, so
# this may always be nil — confirm against the full source.
def send_wakeup_pipe
  return @send_wakeup_pipe
end

#spider_multiplexer ⇒ Object (readonly)

Returns the value of attribute spider_multiplexer.



52
53
54
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 52

# Reader for @spider_multiplexer, the SpiderMultiplexer created in
# #initialize.
def spider_multiplexer
  return @spider_multiplexer
end

#spin_multiplexer ⇒ Object (readonly)

Returns the value of attribute spin_multiplexer.



51
52
53
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 51

# Reader for @spin_multiplexer, the SpinMultiplexer created in #initialize.
# #run polls it (is_empty / process_data) on every loop iteration.
def spin_multiplexer
  return @spin_multiplexer
end

#taxi_multiplexer ⇒ Object (readonly)

Returns the value of attribute taxi_multiplexer.



50
51
52
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 50

# Reader for @taxi_multiplexer, the TaxiMultiplexer created in #initialize.
def taxi_multiplexer
  return @taxi_multiplexer
end

#timer_handlers ⇒ Object (readonly)

Returns the value of attribute timer_handlers.



62
63
64
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 62

# Reader for @timer_handlers, the array maintained by #add_timer_handler and
# #remove_timer_handler. The array itself is returned, not a copy.
def timer_handlers
  return @timer_handlers
end

#unanchorable_transporters ⇒ Object (readonly)

Returns the value of attribute unanchorable_transporters.



59
60
61
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 59

# Reader for @unanchorable_transporters.
# NOTE(review): the only use of this variable shown on this page is inside
# the commented-out section of #run, so this may always be nil — confirm.
def unanchorable_transporters
  return @unanchorable_transporters
end

Instance Method Details

#abort ⇒ Object



339
340
341
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 339

# Logs a fatal "<agent> abort" message and returns whatever BayLog.fatal
# returns. As written it only logs. Because it is named `abort`, bare
# `abort` calls inside this class (see #req_catch_up) reach this method
# rather than Kernel#abort.
def abort
  message_format = "%s abort"
  BayLog.fatal(message_format, self)
end

#abort_agent ⇒ Object



249
250
251
252
253
254
255
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 249

# Logs "<agent> abort" at info level and, when the harbor is configured as
# multi_core, terminates the current process with exit status 1.
# Returns nil when the process is not terminated.
def abort_agent
  BayLog.info("%s abort", self)

  exit(1) if BayServer.harbor.multi_core
end

#add_command_receiver(rd) ⇒ Object



285
286
287
288
289
290
291
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 285

# Creates the agent's CommandReceiver, stores it in @command_receiver, and
# registers its rudder with the net multiplexer.
#
# @param rd passed through to CommandReceiver#init together with the agent
#           id and a new PlainTransporter
def add_command_receiver(rd)
  receiver = CommandReceiver.new
  @command_receiver = receiver
  transporter = PlainTransporter.new(@net_multiplexer, receiver, true, 8, false)
  receiver.init(@agent_id, rd, transporter)
  # The rudder is read after init.
  state = RudderState.new(receiver.rudder, transporter)
  @net_multiplexer.add_rudder_state(receiver.rudder, state)
  BayLog.info("ComRec=%s", receiver)
end

#add_postpone(p) ⇒ Object



356
357
358
359
360
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 356

# Appends a postponed task to @postpone_queue while holding
# @postpone_queue_lock. #catch_up later calls `run` on queued items.
#
# @param p the task to queue
# @return the queue array
def add_postpone(p)
  @postpone_queue_lock.synchronize { @postpone_queue.push(p) }
end

#add_timer_handler(handler) ⇒ Object



277
278
279
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 277

# Appends a handler to @timer_handlers.
#
# @param handler the handler to add
# @return the handlers array
def add_timer_handler(handler)
  @timer_handlers.push(handler)
end

#catch_up ⇒ Object



380
381
382
383
384
385
386
387
388
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 380

# Runs at most one postponed task, oldest first.
#
# The task is taken off @postpone_queue while holding @postpone_queue_lock,
# but it is executed after the lock has been released. Ruby's Mutex is not
# re-entrant, so running the task inside the lock would raise ThreadError
# as soon as the task postponed more work through #add_postpone, which
# takes the same lock.
#
# @return the value of the task's `run`, or nil when the queue was empty
def catch_up
  BayLog.debug("%s catchUp", self)
  task = @postpone_queue_lock.synchronize { @postpone_queue.shift }
  task&.run
end

#count_postpone ⇒ Object



362
363
364
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 362

# Number of tasks currently in @postpone_queue. The length is read without
# taking @postpone_queue_lock.
def count_postpone
  @postpone_queue.size
end

#inspect ⇒ Object



134
135
136
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 134

# Uses the same text as #to_s for inspection output.
def inspect
  to_s
end


263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 263

# Logs a memory usage report at info level: a header line, the Ruby
# version, the total size reported by ObjectSpace.memsize_of_all (with
# thousands separators), and then the agent's MemUsage report via
# MemUsage.get(@agent_id).print_usage(1).
def print_usage
  BayLog.info("%s MemUsage", self)
  BayLog.info("  Ruby version: %s", RUBY_VERSION)

  total_bytes = ObjectSpace.memsize_of_all
  # Put a comma after every digit that is followed by a whole number of
  # three-digit groups, e.g. 1234567 -> "1,234,567".
  grouped = total_bytes.to_s.gsub(/(\d)(?=(\d{3})+\z)/, '\1,')
  BayLog.info("  Total object size: %s bytes", grouped)

  MemUsage.get(@agent_id).print_usage(1)
end

#reload_cert ⇒ Object



344
345
346
347
348
349
350
351
352
353
354
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 344

# Asks the secure docker of every secure anchorable port to reload its
# certificate. A failure on one port is logged through BayLog.error_e and
# does not stop the remaining ports from being processed.
#
# The port map is read from BayServer.anchorable_port_map, the accessor
# #run uses for the same ports.
# NOTE(review): this previously read GrandAgent.anchorable_port_map, which
# is not among GrandAgent's documented class attributes — confirm that the
# BayServer map is the intended source.
def reload_cert
  BayServer.anchorable_port_map.values.each do |port|
    next unless port.secure

    begin
      port.secure_docker.reload_cert
    rescue => e
      BayLog.error_e(e)
    end
  end
end

#remove_timer_handler(handler) ⇒ Object



281
282
283
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 281

# Removes a handler from @timer_handlers (every element equal to it).
#
# @param handler the handler to remove
# @return the removed handler, or nil when it was not registered
def remove_timer_handler(handler)
  registered = @timer_handlers
  registered.delete(handler)
end

#req_catch_up ⇒ Object



366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 366

# Requests a catch-up.
#
# When postponed tasks are queued locally, one is run right away through
# #catch_up. Otherwise CMD_CATCHUP is sent to the monitor via the command
# receiver; an IOError from that send is logged and followed by #abort.
def req_catch_up
  BayLog.debug("%s Req catchUp", self)
  return catch_up if count_postpone > 0

  # Only the send is guarded; errors raised by catch_up above propagate.
  begin
    @command_receiver.send_command_to_monitor(self, CMD_CATCHUP, false)
  rescue IOError => e
    BayLog.error_e(e)
    abort
  end
end

#req_shutdown ⇒ Object



257
258
259
260
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 257

# Asks the agent's run loop to stop.
#
# Sets @aborted and then wakes the recipient. #run checks @aborted right
# after @recipient.receive returns and leaves its loop when the flag is set,
# so the flag is written before the wakeup is issued.
def req_shutdown
  @aborted = true
  @recipient.wakeup
end

#run ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 147

# Main loop of the agent.
#
# Registers the command receiver's rudder (and, for anchorable agents, the
# rudders of BayServer.anchorable_port_map) with the net multiplexer, then
# loops: report busy/free transitions, wait on the recipient, run the
# periodic timeout check, and dispatch queued letters to the on_* handlers.
# The loop ends when @aborted is set or an exception is raised; #shutdown
# is always called on the way out.
def run
  BayLog.info(BayMessage.get(:MSG_RUNNING_GRAND_AGENT, self))

  # Requires @command_receiver to be set (see #add_command_receiver).
  if @net_multiplexer.is_non_blocking
    BayLog.info("rec=%s", @command_receiver)
    @command_receiver.rudder.set_non_blocking
  end

  @net_multiplexer.req_read(@command_receiver.rudder)

  if @anchorable
    # Adds server socket channel of anchorable ports
    BayServer.anchorable_port_map.keys.each do |rd|
      if @net_multiplexer.is_non_blocking
        rd.set_non_blocking
      end
      @net_multiplexer.add_rudder_state(rd, RudderState.new(rd))
    end
  end

  # Set up unanchorable channel
=begin
  for ch in GrandAgent.unanchorable_port_map.keys() do
    port_dkr = GrandAgent.unanchorable_port_map[ch]
    tp = port_dkr.new_transporter(self, ch)
    @unanchorable_transporters[ch] = tp
    @non_blocking_handler.add_channel_listener(ch, tp)
    @non_blocking_handler.ask_to_start(ch)
    if !@anchorable
      @non_blocking_handler.ask_to_read(ch)
    end
  end
=end


  # Starts as true, so the first iteration calls on_free when the
  # multiplexer reports that it is not busy.
  busy = true
  begin
    while true

      # Notify the net multiplexer only when the busy state changes.
      test_busy = @net_multiplexer.is_busy
      if test_busy != busy
        busy = test_busy
        if busy
          @net_multiplexer.on_busy
        else
          @net_multiplexer.on_free
        end
      end

      # `received` is assigned in both branches but not read anywhere in
      # this method.
      if not @spin_multiplexer.is_empty
        # If "SpinHandler" is running, the select function does not block.
        received = @recipient.receive(false)
        @spin_multiplexer.process_data
      else
        received = @recipient.receive(true)
      end

      # Set by #req_shutdown (possibly from another thread) or #shutdown.
      if @aborted
        BayLog.info("%s aborted by another thread", self)
        break;
      end

      if @spin_multiplexer.is_empty && @letter_queue.empty?
        # timed out
        # check per 10 seconds
        # The interval is the literal 10 here, not @select_timeout_sec.
        # NOTE(review): `ring` is defined outside this section; presumably it
        # runs the timer handlers and refreshes @last_timeout_check — confirm.
        if Time.now.tv_sec - @last_timeout_check >= 10
          ring
        end
      end

      # Drain the letter queue. Emptiness is tested without the lock; only
      # the shift itself is done under @letter_queue_lock.
      while !@letter_queue.empty?
        let = nil
        @letter_queue_lock.synchronize do
          let = @letter_queue.shift
        end

        # Dispatch on the letter's class; a value matching none of these
        # (including nil) is dropped without any action.
        case let
        when AcceptedLetter
          on_accepted(let)
        when ConnectedLetter
          on_connected(let)
        when ReadLetter
          on_read(let)
        when WroteLetter
          on_wrote(let)
        when ClosedLetter
          on_closed(let)
        when ErrorLetter
          on_error(let)
        end
      end
    end # while

  # Rescuing Exception also covers non-StandardError exceptions such as
  # SystemExit and Interrupt; the error is logged and not re-raised.
  rescue Exception => e
    BayLog.fatal_e(e, "Uncaught Error: %s", e)
  ensure
    BayLog.info("%s end", self)
    # #shutdown returns early when @aborted is already true.
    shutdown
  end
end

#send_accepted_letter(st, client_rd, wakeup) ⇒ Object



293
294
295
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 293

# Wraps an accepted connection in an AcceptedLetter and hands it to
# send_letter.
#
# @param st        passed to AcceptedLetter.new as first argument
# @param client_rd passed to AcceptedLetter.new as second argument
# @param wakeup    forwarded unchanged to send_letter
def send_accepted_letter(st, client_rd, wakeup)
  letter = AcceptedLetter.new(st, client_rd)
  send_letter(letter, wakeup)
end

#send_closed_letter(st, wakeup) ⇒ Object



308
309
310
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 308

# Wraps a close notification in a ClosedLetter and hands it to send_letter.
#
# @param st     passed to ClosedLetter.new
# @param wakeup forwarded unchanged to send_letter
def send_closed_letter(st, wakeup)
  letter = ClosedLetter.new(st)
  send_letter(letter, wakeup)
end

#send_connected_letter(st, wakeup) ⇒ Object



297
298
299
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 297

# Wraps a connect notification in a ConnectedLetter and hands it to
# send_letter.
#
# @param st     passed to ConnectedLetter.new
# @param wakeup forwarded unchanged to send_letter
def send_connected_letter(st, wakeup)
  letter = ConnectedLetter.new(st)
  send_letter(letter, wakeup)
end

#send_error_letter(st, err, wakeup) ⇒ Object



312
313
314
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 312

# Wraps an error in an ErrorLetter and hands it to send_letter.
#
# @param st     passed to ErrorLetter.new as first argument
# @param err    passed to ErrorLetter.new as second argument
# @param wakeup forwarded unchanged to send_letter
def send_error_letter(st, err, wakeup)
  letter = ErrorLetter.new(st, err)
  send_letter(letter, wakeup)
end

#send_read_letter(st, n, adr, wakeup) ⇒ Object



300
301
302
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 300

# Wraps a read notification in a ReadLetter and hands it to send_letter.
#
# @param st     passed to ReadLetter.new as first argument
# @param n      passed to ReadLetter.new as second argument
# @param adr    passed to ReadLetter.new as third argument
# @param wakeup forwarded unchanged to send_letter
def send_read_letter(st, n, adr, wakeup)
  letter = ReadLetter.new(st, n, adr)
  send_letter(letter, wakeup)
end

#send_wrote_letter(st, n, wakeup) ⇒ Object



304
305
306
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 304

# Wraps a write notification in a WroteLetter and hands it to send_letter.
#
# @param st     passed to WroteLetter.new as first argument
# @param n      passed to WroteLetter.new as second argument
# @param wakeup forwarded unchanged to send_letter
def send_wrote_letter(st, n, wakeup)
  letter = WroteLetter.new(st, n)
  send_letter(letter, wakeup)
end

#shutdown ⇒ Object



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 316

# Tears the agent down.
#
# Does nothing beyond logging when @aborted is already true. Note that
# #req_shutdown also sets that flag, so a shutdown reached after
# #req_shutdown returns early. Otherwise it marks the agent aborted, shuts
# down the net multiplexer, removes the agent id from every listener, ends
# the command receiver and deletes the agent from GrandAgent.agents.
# When the harbor is multi_core the process then exits with status 1;
# otherwise @agent_id is reset to -1.
def shutdown
  BayLog.info("%s shutdown", self)
  return if @aborted

  @aborted = true
  BayLog.debug("%s shutdown netMultiplexer", self)
  @net_multiplexer.shutdown

  GrandAgent.listeners.each { |lis| lis.remove(@agent_id) }
  @command_receiver.end
  GrandAgent.agents.delete(@agent_id)

  if BayServer.harbor.multi_core
    BayLog.debug("%s exit", self)
    exit(1)
  end
  @agent_id = -1
end

#start ⇒ Object

Custom methods



141
142
143
144
145
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 141

# Runs #run on a new thread.
#
# @return [Thread] the thread executing #run
def start
  Thread.new { run }
end

#to_s ⇒ Object



129
130
131
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 129

def to_s()
  return "agt#" + @agent_id.to_s
end