Class: Baykit::BayServer::Agent::Multiplexer::MultiplexerBase
- Inherits:
-
Object
- Object
- Baykit::BayServer::Agent::Multiplexer::MultiplexerBase
- Includes:
- Baykit::BayServer, Common::Multiplexer
- Defined in:
- lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb
Direct Known Subclasses
JobMultiplexerBase, SpiderMultiplexer, SpinMultiplexer, TaxiMultiplexer
Instance Attribute Summary collapse
-
#agent ⇒ Object
readonly
Returns the value of attribute agent.
-
#channel_count ⇒ Object
readonly
Returns the value of attribute channel_count.
-
#lock ⇒ Object
readonly
Returns the value of attribute lock.
-
#rudders ⇒ Object
readonly
Returns the value of attribute rudders.
-
#rudders_lock ⇒ Object
readonly
Returns the value of attribute rudders_lock.
Instance Method Summary collapse
-
#add_rudder_state(rd, st) ⇒ Object
Implements Multiplexer.
- #close_all ⇒ Object
- #close_rudder(st) ⇒ Object
- #close_timeout_sockets ⇒ Object
- #consume_oldest_unit(st) ⇒ Object
-
#find_rudder_state_by_key(key) ⇒ Object
Custom methods.
- #get_rudder_state(rd) ⇒ Object
- #get_transporter(rd) ⇒ Object
-
#initialize(agt) ⇒ MultiplexerBase
constructor
A new instance of MultiplexerBase.
- #is_busy ⇒ Object
- #remove_rudder_state(rd) ⇒ Object
Methods included from Common::Multiplexer
#cancel_read, #cancel_write, #is_non_blocking, #next_accept, #next_read, #next_write, #on_busy, #on_free, #req_accept, #req_close, #req_connect, #req_end, #req_read, #req_write, #shutdown, #use_async_api
Constructor Details
#initialize(agt) ⇒ MultiplexerBase
Returns a new instance of MultiplexerBase.
18 19 20 21 22 23 24 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 18
#
# Builds a multiplexer bound to the given agent.
#
# The new instance starts with a channel count of zero, an empty rudder
# table and two separate mutexes: one stored as +@rudders_lock+ and one
# stored as +@lock+.
#
# @param agt [Object] the agent; stored as-is in +@agent+
def initialize(agt)
  @agent = agt
  @channel_count = 0
  @rudders = {}
  @rudders_lock = Mutex.new
  @lock = Mutex.new
end
Instance Attribute Details
#agent ⇒ Object (readonly)
Returns the value of attribute agent.
13 14 15 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 13
#
# Read-only accessor.
#
# @return [Object] the current value of +@agent+
def agent
  @agent
end
#channel_count ⇒ Object (readonly)
Returns the value of attribute channel_count.
12 13 14 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 12
#
# Read-only accessor.
#
# @return [Object] the current value of +@channel_count+
def channel_count
  @channel_count
end
#lock ⇒ Object (readonly)
Returns the value of attribute lock.
16 17 18 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 16
#
# Read-only accessor.
#
# @return [Object] the current value of +@lock+
def lock
  @lock
end
#rudders ⇒ Object (readonly)
Returns the value of attribute rudders.
14 15 16 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 14
#
# Read-only accessor. The returned object is the live table itself, not a
# copy.
#
# @return [Object] the current value of +@rudders+
def rudders
  @rudders
end
#rudders_lock ⇒ Object (readonly)
Returns the value of attribute rudders_lock.
15 16 17 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 15
#
# Read-only accessor.
#
# @return [Object] the current value of +@rudders_lock+
def rudders_lock
  @rudders_lock
end
Instance Method Details
#add_rudder_state(rd, st) ⇒ Object
Implements Multiplexer
30 31 32 33 34 35 36 37 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 30
#
# Implements Multiplexer.
#
# Registers +st+ in the rudder table under +rd.key+ and points the state
# back at this multiplexer.
#
# The channel count is updated while +@rudders_lock+ is held and only when
# the key was not already present, so the counter keeps matching the number
# of table entries even if a key is registered twice.
#
# @param rd [Object] rudder; must respond to +key+
# @param st [Object] rudder state; must respond to +multiplexer=+ and +access+
# @return [Object] whatever +st.access+ returns
def add_rudder_state(rd, st)
  st.multiplexer = self
  key = rd.key
  @rudders_lock.synchronize do
    # Overwriting an existing entry must not count as a new channel.
    @channel_count += 1 unless @rudders.key?(key)
    @rudders[key] = st
  end
  st.access
end
#close_all ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 119
#
# Calls +close_rudder+ on every registered state except the one whose
# rudder equals the agent's command receiver rudder.
#
# The table values are copied while +@rudders_lock+ is held; the closing
# itself runs on that copy after the lock is released.
#
# @return [Array] the snapshot of states that was iterated
def close_all
  snapshot = @rudders_lock.synchronize { @rudders.values }
  snapshot.each do |st|
    close_rudder(st) if st.rudder != @agent.command_receiver.rudder
  end
end
#close_rudder(st) ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 66
#
# Closes the rudder held by +st+, logging the attempt first.
#
# An IOError raised by the close is logged through +BayLog.error_e+ and not
# re-raised; any other exception propagates to the caller.
#
# @param st [Object] rudder state; must respond to +rudder+ and +closed+
def close_rudder(st)
  BayLog.debug("%s closeRd %s state=%s closed=%s", agent, st.rudder, st, st.closed)
  begin
    st.rudder.close
  rescue IOError => e
    # The logger constant is BayLog (as used for the debug call above); the
    # misspelled "Baylog" turned every IOError into a NameError.
    BayLog.error_e(e)
  end
end
#close_timeout_sockets ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 92
#
# Requests a close for every registered rudder whose transporter reports a
# timeout.
#
# The table is copied while +@rudders_lock+ is held (the emptiness check is
# made on that copy, not on the live table). For each state that has a
# transporter, the seconds elapsed since +last_access_time+ are passed to
# +transporter.check_timeout+; a truthy answer marks the state as expired.
# All expired states are collected first and only then handed to
# +req_close+.
#
# @return [Array, nil] the expired states, or nil when nothing is registered
def close_timeout_sockets
  snapshot = @rudders_lock.synchronize { @rudders.values }
  return if snapshot.empty?

  now = Time.now.tv_sec
  expired = snapshot.select do |st|
    # States without a transporter are never subject to the timeout check.
    next false if st.transporter.nil?

    timed_out = st.transporter.check_timeout(st.rudder, now - st.last_access_time)
    BayLog.debug("%s timeout: ch=%s", @agent, st.rudder) if timed_out
    timed_out
  end

  expired.each { |st| req_close(st.rudder) }
end
#consume_oldest_unit(st) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 54
#
# Removes the first element of +st.write_queue+ and calls +done+ on it.
#
# The queue is inspected and shifted while +st.write_queue_lock+ is held;
# +done+ is invoked after the lock has been released.
#
# @param st [Object] rudder state; must respond to +write_queue+ and
#   +write_queue_lock+
# @return [Boolean] false when the queue was empty, true after a unit was
#   consumed
def consume_oldest_unit(st)
  unit = st.write_queue_lock.synchronize do
    # Returning from inside synchronize still releases the mutex.
    return false if st.write_queue.empty?

    st.write_queue.shift
  end
  unit.done
  true
end
#find_rudder_state_by_key(key) ⇒ Object
Custom methods
86 87 88 89 90 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 86
#
# Custom methods
#
# Looks up the state stored in the rudder table under +key+ while holding
# +@rudders_lock+.
#
# @param key [Object] table key
# @return [Object, nil] the stored state, or nil when the key is absent
def find_rudder_state_by_key(key)
  @rudders_lock.synchronize { @rudders[key] }
end
#get_rudder_state(rd) ⇒ Object
46 47 48 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 46
#
# Returns the state registered for a rudder by delegating to
# +find_rudder_state_by_key+ with +rd.key+.
#
# @param rd [Object] rudder; must respond to +key+
# @return [Object, nil] whatever +find_rudder_state_by_key+ returns
def get_rudder_state(rd)
  find_rudder_state_by_key(rd.key)
end
#get_transporter(rd) ⇒ Object
50 51 52 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 50
#
# Returns the transporter of the state registered for +rd+.
#
# When no state is found for the rudder, nil is returned rather than
# raising NoMethodError on nil.
#
# @param rd [Object] rudder passed on to +get_rudder_state+
# @return [Object, nil] the state's transporter, or nil when there is no
#   state for +rd+
def get_transporter(rd)
  st = get_rudder_state(rd)
  return nil if st.nil?

  st.transporter
end
#is_busy ⇒ Object
77 78 79 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 77
#
# Tells whether the channel count has reached the agent's
# +max_inbound_ships+ value.
#
# @return [Boolean] true when +@channel_count+ is greater than or equal to
#   +@agent.max_inbound_ships+
def is_busy
  @channel_count >= @agent.max_inbound_ships
end
#remove_rudder_state(rd) ⇒ Object
39 40 41 42 43 44 |
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 39
#
# Removes the state registered under +rd.key+ from the rudder table.
#
# The channel count is decremented while +@rudders_lock+ is held and only
# when an entry was actually present, so removing the same rudder twice (or
# a rudder that was never added) cannot push the counter out of step with
# the table.
#
# @param rd [Object] rudder; must respond to +key+
# @return [Integer] the channel count after the removal
def remove_rudder_state(rd)
  key = rd.key
  @rudders_lock.synchronize do
    # key? rather than the delete result, so an entry whose stored value is
    # nil is still counted as removed.
    present = @rudders.key?(key)
    @rudders.delete(key)
    @channel_count -= 1 if present
    @channel_count
  end
end