Class: Baykit::BayServer::Agent::Multiplexer::MultiplexerBase

Inherits:
Object
  • Object
show all
Includes:
Baykit::BayServer, Common::Multiplexer
Defined in:
lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Common::Multiplexer

#cancel_read, #cancel_write, #is_non_blocking, #next_accept, #next_read, #next_write, #on_busy, #on_free, #req_accept, #req_close, #req_connect, #req_end, #req_read, #req_write, #shutdown, #use_async_api

Constructor Details

#initialize(agt) ⇒ MultiplexerBase

Returns a new instance of MultiplexerBase.



18
19
20
21
22
23
24
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 18

# Builds a multiplexer bound to the given agent.
#
# Starts with an empty rudder-state table, a channel count of zero and
# two separate mutexes: one guarding the rudder table, one general lock.
#
# @param agt the owning agent; stored as-is and exposed through +agent+
def initialize(agt)
  @lock = Mutex.new
  @rudders_lock = Mutex.new
  @rudders = {}
  @channel_count = 0
  @agent = agt
end

Instance Attribute Details

#agent ⇒ Object (readonly)

Returns the value of attribute agent.



13
14
15
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 13

# Reader for the agent passed to the constructor.
def agent
  return @agent
end

#channel_count ⇒ Object (readonly)

Returns the value of attribute channel_count.



12
13
14
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 12

# Reader for the counter that add_rudder_state increments and
# remove_rudder_state decrements.
def channel_count
  return @channel_count
end

#lock ⇒ Object (readonly)

Returns the value of attribute lock.



16
17
18
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 16

# Reader for the general-purpose mutex created in the constructor.
def lock
  return @lock
end

#rudders ⇒ Object (readonly)

Returns the value of attribute rudders.



14
15
16
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 14

# Reader for the table mapping rudder keys to their rudder states.
# Returns the live hash, not a copy.
def rudders
  return @rudders
end

#rudders_lock ⇒ Object (readonly)

Returns the value of attribute rudders_lock.



15
16
17
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 15

# Reader for the mutex that guards access to the rudder table.
def rudders_lock
  return @rudders_lock
end

Instance Method Details

#add_rudder_state(rd, st) ⇒ Object

Implements Multiplexer



30
31
32
33
34
35
36
37
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 30

# Registers a rudder state with this multiplexer.
#
# The state is pointed back at this multiplexer, stored in the rudder table
# under +rd.key+ (while holding the rudder lock), counted as one more
# channel, and finally marked as accessed via +st.access+.
#
# @param rd rudder whose +key+ identifies the entry
# @param st rudder state to register
# @return whatever +st.access+ returns
def add_rudder_state(rd, st)
  st.multiplexer = self
  @rudders_lock.synchronize { @rudders[rd.key] = st }
  @channel_count = @channel_count + 1
  st.access
end

#close_all ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 119

# Closes every registered rudder except the agent's command-receiver rudder.
#
# The states are snapshotted under the rudder lock and then closed outside
# of it through close_rudder.
def close_all
  states = @rudders_lock.synchronize { @rudders.values }
  states.each do |state|
    close_rudder(state) if state.rudder != @agent.command_receiver.rudder
  end
end

#close_rudder(st) ⇒ Object



66
67
68
69
70
71
72
73
74
75
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 66

# Closes the rudder held by the given rudder state.
#
# A debug line is logged first. An IOError raised by the rudder's +close+
# is reported through BayLog.error_e and not propagated; any other
# exception passes through to the caller.
#
# @param st rudder state; its +rudder+ and +closed+ are read
def close_rudder(st)
  BayLog.debug("%s closeRd %s state=%s closed=%s", agent, st.rudder, st, st.closed)

  begin
    st.rudder.close()
  rescue IOError => e
    # Must be BayLog (capital L), the same logger used above; a misspelled
    # constant here would raise NameError instead of logging the I/O error.
    BayLog.error_e(e)
  end

end

#close_timeout_sockets ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 92

# Requests a close for every rudder whose transporter reports a timeout.
#
# Does nothing when the rudder table is empty. Otherwise the states are
# snapshotted under the rudder lock; for each state that has a transporter,
# the seconds elapsed since +last_access_time+ are handed to the
# transporter's +check_timeout+. All checks run before the first
# +req_close+ call is made.
def close_timeout_sockets
  return if @rudders.empty?

  states = @rudders_lock.synchronize { @rudders.values }
  now = Time.now.tv_sec

  expired = states.select do |st|
    next false if st.transporter.nil?

    timed_out = st.transporter.check_timeout(st.rudder, now - st.last_access_time)
    BayLog.debug("%s timeout: ch=%s", @agent, st.rudder) if timed_out
    timed_out
  end

  expired.each { |st| req_close(st.rudder) }
end

#consume_oldest_unit(st) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 54

# Removes the unit at the head of the state's write queue and completes it.
#
# The queue is inspected and shifted while holding +st.write_queue_lock+;
# the unit's +done+ is invoked after the lock has been released.
#
# @param st rudder state providing +write_queue+ and +write_queue_lock+
# @return [Boolean] false when the queue was empty, true otherwise
def consume_oldest_unit(st)
  taken = false
  oldest = nil

  st.write_queue_lock.synchronize do
    unless st.write_queue.empty?
      oldest = st.write_queue.shift
      taken = true
    end
  end

  return false unless taken

  oldest.done
  true
end

#find_rudder_state_by_key(key) ⇒ Object

Custom methods



86
87
88
89
90
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 86

# Looks up a rudder state by its key while holding the rudder lock.
#
# @param key key under which the state was stored
# @return the stored state, or nil when the key is not present
def find_rudder_state_by_key(key)
  @rudders_lock.synchronize { @rudders[key] }
end

#get_rudder_state(rd) ⇒ Object



46
47
48
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 46

# Returns the state registered for the given rudder, looked up by +rd.key+
# through find_rudder_state_by_key.
#
# @param rd rudder whose +key+ is used for the lookup
def get_rudder_state(rd)
  find_rudder_state_by_key(rd.key)
end

#get_transporter(rd) ⇒ Object



50
51
52
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 50

# Returns the transporter attached to the state registered for the rudder.
#
# The state is fetched with get_rudder_state and its +transporter+ is read
# without a nil check, so an unregistered rudder raises NoMethodError.
#
# @param rd rudder to look up
def get_transporter(rd)
  state = get_rudder_state(rd)
  state.transporter
end

#is_busy ⇒ Object



77
78
79
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 77

# Tells whether the channel count has reached the agent's
# +max_inbound_ships+ limit.
#
# @return [Boolean] true when channel_count >= max_inbound_ships
def is_busy
  limit = @agent.max_inbound_ships
  @channel_count >= limit
end

#remove_rudder_state(rd) ⇒ Object



39
40
41
42
43
44
# File 'lib/baykit/bayserver/agent/multiplexer/multiplexer_base.rb', line 39

# Unregisters the state stored under +rd.key+.
#
# The entry is deleted while holding the rudder lock, then the channel
# count is decremented by one regardless of whether an entry was found.
#
# @param rd rudder whose +key+ identifies the entry
# @return the channel count after the decrement
def remove_rudder_state(rd)
  @rudders_lock.synchronize { @rudders.delete(rd.key) }
  @channel_count = @channel_count - 1
end