Class: Baykit::BayServer::Agent::Multiplexer::JobMultiplexer

Inherits:
JobMultiplexerBase show all
Includes:
Baykit::BayServer::Agent::Multiplexer, TimerHandler, Common::Recipient, Rudders, Util
Defined in:
lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb

Instance Attribute Summary

Attributes inherited from JobMultiplexerBase

#anchorable, #pipe

Attributes inherited from MultiplexerBase

#agent, #channel_count, #lock, #rudders, #rudders_lock

Instance Method Summary collapse

Methods included from Common::Recipient

#receive, #wakeup

Methods included from TimerHandler

#on_timer

Methods inherited from JobMultiplexerBase

#on_free, #on_timer, #shutdown

Methods inherited from MultiplexerBase

#add_rudder_state, #close_all, #close_rudder, #close_timeout_sockets, #consume_oldest_unit, #find_rudder_state_by_key, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state

Methods included from Common::Multiplexer

#add_rudder_state, #close_rudder, #consume_oldest_unit, #get_rudder_state, #get_transporter, #is_busy, #on_busy, #on_free, #remove_rudder_state, #req_end, #shutdown

Constructor Details

#initialize(agt, anchorable) ⇒ JobMultiplexer

Returns a new instance of JobMultiplexer.



19
20
21
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 19

def initialize(agt, anchorable)
  super
end

Instance Method Details

#cancel_read(st) ⇒ Object



167
168
169
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 167

def cancel_read(st)

end

#cancel_write(st) ⇒ Object



171
172
173
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 171

def cancel_write(st)

end

#is_non_blockingObject



248
249
250
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 248

def is_non_blocking()
  return false
end

#next_accept(st) ⇒ Object



175
176
177
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 175

def next_accept(st)
  req_accept(st.rudder)
end

#next_read(st) ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 179

def next_read(st)
  Thread.new do
    if st.closed
      #channel is already closed
      BayLog.debug("%s Rudder is already closed: rd=%s", @agent, st.rudder);
      next
    end

    begin
      if st.handshaking
        # Calls accept API for client socket
        st.rudder.io.accept
        st.handshaking = false

        BayLog.debug("%s Handshake done (rd=%s)", self, st.rudder)
        app_protocols = st.rudder.io.context.alpn_protocols

        # HELP ME
        #   This code does not work!
        #   We cannot get application protocol name
        proto = nil
        if app_protocols != nil && app_protocols.length > 0
          proto = app_protocols[0]
        end
      end

      BayLog.debug("%s Try to Read (rd=%s)", @agent, st.rudder)
      begin
        n = st.rudder.read(st.read_buf, st.buf_size)
      rescue EOFError => e
        n = 0
        st.read_buf.clear
      end

      @agent.send_read_letter(st, n, nil, true)

    rescue Exception => e
      @agent.send_error_letter(st, e, true)
    end
  end
end

#next_write(st) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 221

def next_write(st)
  Thread.new do
    BayLog.debug("%s next write st=%s", @agent, st)

    if st == nil || st.closed
      BayLog.warn("%s Channel is closed: %s", @agent, st.rudder)
      next
    end

    u = st.write_queue[0]
    BayLog.debug("%s Try to write: pkt=%s buflen=%d closed=%s", self, u.tag, u.buf.length, st.closed);

    n = 0
    begin
      if !st.closed && u.buf.length > 0
        n = st.rudder.write(u.buf)
        u.buf.slice!(0, n)
      end
    rescue Exception => e
      @agent.send_error_letter(st, e, true)
      next
    end

    @agent.send_wrote_letter(st, n, true)
  end
end

#req_accept(rd) ⇒ Object

Implements Multiplexer



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 31

def req_accept(rd)
  BayLog.debug("%s reqAccept isShutdown=%s", @agent, @agent.aborted)
  if @agent.aborted
    return
  end

  st = get_rudder_state(rd)

  Thread.new do
    begin
      if @agent.aborted
        next
      end

      begin
        client_skt, adr = rd.io.accept
      rescue Exception => e
        @agent.send_error_letter(st, e, true)
        next
      end

      BayLog.debug("%s Accepted skt=%s", @agent, client_skt)
      if agent.aborted
        BayLog.error("%s Agent is not alive (close)", @agent);
        client_skt.close
      else
        @agent.send_accepted_letter(st, IORudder.new(client_skt), true)
      end

    rescue Exception => e
      BayLog.fatal_e(e)
      @agent.shutdown
    end
  end

end

#req_close(rd) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 139

def req_close(rd)
  st = get_rudder_state(rd)
  BayLog.debug("%s reqClose st=%s", @agent, st);

  if st == nil
    BayLog.warn("%s channel state not found: %s", @agent, rd)
    return
  end

  Thread.new do
    begin
      st = get_rudder_state(rd)
      if st == nil
        BayLog.debug("%s Rudder is already closed: rd=%s", @agent, rd)
        next
      end

      close_rudder(st)
      @agent.send_closed_letter(st, true)
    rescue Exception => e
      BayLog.fatal_e(e)
      @agent.shutdown
    end
  end

  st.access
end

#req_connect(rd, adr) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 69

def req_connect(rd, adr)
  st = get_rudder_state(rd)
  BayLog.debug("%s reqConnect adr=%s rd=%s chState=%s", @agent, adr.canonname, rd, st)

  Thread.new do
    begin
      rd.io.connect(adr)
      BayLog.debug("%s Connected rd=%s", @agent, rd)
      @agent.send_connected_letter(st, false)
    rescue Exception => e
      @agent.send_error_letter(st, e, false)
      return
    end
  end

  st.connecting = true
end

#req_read(rd) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 87

def req_read(rd)
  st = get_rudder_state(rd)
  if st == nil
    return
  end

  BayLog.debug("%s reqRead rd=%s state=%s", @agent, st.rudder, st);
  need_read = false
  st.reading_lock.synchronize do
    if !st.reading
      need_read = true
      st.reading = true
    end
  end

  if need_read
    next_read(st)
  end

  st.access
end

#req_write(rd, buf, adr, tag, &lis) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 109

def req_write(rd, buf, adr, tag, &lis)
  st = get_rudder_state(rd)
  BayLog.debug("%s reqWrite st=%s", @agent, st)

  if st == nil || st.closed
    BayLog.warn("%s Channel is closed(callback immediately): %s", @agent, rd)
    lis.call()
    return
  end

  unt = WriteUnit.new(buf, adr, tag, &lis)
  st.write_queue_lock.synchronize do
    st.write_queue << unt
  end

  need_write = false
  st.writing_lock.synchronize do
    if !st.writing
      need_write = true
      st.writing = true
    end
  end

  if need_write
    next_write(st)
  end

  st.access
end

#to_sObject



22
23
24
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 22

def to_s
  return "JobMpx[#{@agent}]"
end

#use_async_apiObject



252
253
254
# File 'lib/baykit/bayserver/agent/multiplexer/job_multiplexer.rb', line 252

def use_async_api()
  return false
end