Class: Baykit::BayServer::Agent::NonBlockingHandler

Inherits:
Object
  • Object
show all
Includes:
Baykit::BayServer::Agent, Util
Defined in:
lib/baykit/bayserver/agent/non_blocking_handler.rb

Overview

Channel handler

Sockets or file descriptors are kinds of channel

Defined Under Namespace

Classes: ChannelOperation, ChannelState

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ship_agent) ⇒ NonBlockingHandler

Returns a new instance of NonBlockingHandler.



75
76
77
78
79
80
81
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 75

def initialize(ship_agent)
  @agent = ship_agent
  @ch_map = {}
  @ch_count = 0
  @operations = []
  @operations_lock = Monitor.new()
end

Instance Attribute Details

#agentObject (readonly)

Returns the value of attribute agent.



69
70
71
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 69

def agent
  @agent
end

#ch_countObject (readonly)

Returns the value of attribute ch_count.



71
72
73
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 71

def ch_count
  @ch_count
end

#ch_mapObject (readonly)

Returns the value of attribute ch_map.



70
71
72
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 70

def ch_map
  @ch_map
end

#operationsObject (readonly)

Returns the value of attribute operations.



72
73
74
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 72

def operations
  @operations
end

#operations_lockObject (readonly)

Returns the value of attribute operations_lock.



73
74
75
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 73

def operations_lock
  @operations_lock
end

Instance Method Details

#add_channel_listener(ch, lis) ⇒ Object



269
270
271
272
273
274
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 269

def add_channel_listener(ch, lis)
  ch_state = ChannelState.new(ch, lis)
  add_channel_state(ch, ch_state)
  ch_state.access()
  return ch_state
end

#ask_to_close(ch) ⇒ Object



332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 332

def ask_to_close(ch)
  ch_state = find_channel_state(ch)
  BayLog.debug("%s askToClose chState=%s", @agent, ch_state);

  if ch_state == nil
    BayLog.warn("%s channel state not found: %s", @agent, ch)
    return
  end

  ch_state.closing = true
  add_operation(ch, Selector::OP_WRITE, false, true)

  ch_state.access
end

#ask_to_connect(ch, addr) ⇒ Object



284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 284

def ask_to_connect(ch, addr)
  ch_state = find_channel_state(ch)
  BayLog.debug("%s askToConnect addr=%s skt=%s chState=%s", @agent, addr, ch, ch_state)

  begin
    ch.connect_nonblock(addr)
  rescue IO::WaitWritable => e
    #BayLog.error_e(e)
  end

  ch_state.connecting = true
  add_operation(ch, Selector::OP_READ, true)
end

#ask_to_read(ch) ⇒ Object



298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 298

def ask_to_read(ch)
  ch_state = find_channel_state(ch)
  BayLog.debug("%s askToRead chState=%s", @agent, ch_state);

  if ch.closed?
    raise IOError.new("Channel is closed")
  end

  add_operation(ch, Selector::OP_READ)

  if ch_state != nil
    ch_state.access()
  end
end

#ask_to_start(ch) ⇒ Object



276
277
278
279
280
281
282
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 276

def ask_to_start(ch)
  BayLog.debug("%s askToStart: ch=%s", @agent, ch)

  ch_state = find_channel_state(ch)
  ch_state.accepted = true

end

#ask_to_write(ch) ⇒ Object



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 313

def ask_to_write(ch)
  ch_state = find_channel_state(ch)
  BayLog.debug("%s askToWrite chState=%s", @agent, ch_state);

  if ch.closed?
    BayLog.warn("%s Channel is closed: %s", @agent, ch)
    return
  end

  add_operation(ch, Selector::OP_WRITE)

  if ch_state == nil
    BayLog.error("Unknown socket (or closed)")
    return
  end

  ch_state.access()
end

#close_allObject



347
348
349
350
351
352
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 347

def close_all()
  @ch_map.keys().each do |ch|
    st = find_channel_state(ch)
    close_channel(ch, st)
  end
end

#close_timeout_socketsObject



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 247

def close_timeout_sockets()
  if @ch_map.empty?
    return
  end

  close_list = []
  now = DateTime.now
  @ch_map.values.each do |ch_state|
    if ch_state.listener != nil
      duration =  ((now - ch_state.last_access_time) * 86400).to_i
      if ch_state.listener.check_timeout(ch_state.channel, duration)
        BayLog.debug("%s timeout: skt=%s", @agent, ch_state.channel)
        close_list << ch_state
      end
    end
  end

  close_list.each do |ch_state|
    close_channel ch_state.channel, ch_state
  end
end

#handle_channel(ch, op) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 88

def handle_channel(ch, op)

  ch_state = find_channel_state(ch)
  if ch_state == nil
    BayLog.error("Cannot find fd state (Maybe file is closed)")
    @agent.selector.unregister(ch)
    return
  end

  next_action = nil
  begin

    if ch_state.closing
      next_action = NextSocketAction::CLOSE

    elsif ch_state.connecting
      ch_state.connecting = false
      # connectable
      next_action = ch_state.listener.on_connectable(ch)
      if next_action == nil
        raise Sink.new("unknown next action")
      elsif next_action == NextSocketAction::CONTINUE
        ask_to_read(ch)
      end

    else
      if op & Selector::OP_READ != 0
        # readable
        next_action = ch_state.listener.on_readable(ch)
        if next_action == nil
          raise Sink.new("unknown next action")
        elsif next_action == NextSocketAction::WRITE
          op = @agent.selector.get_op(ch)
          op = op | Selector::OP_WRITE
          @agent.selector.modify(ch, op)
        end
      end

      if (next_action != NextSocketAction::CLOSE) && (op & Selector::OP_WRITE != 0)
        # writable
        next_action = ch_state.listener.on_writable(ch)
        if next_action == nil
          raise Sink.new("unknown next action")
        elsif next_action == NextSocketAction::READ
          # Handle as "Write Off"
          op = @agent.selector.get_op(ch)
          op = op & ~Selector::OP_WRITE
          if op == 0
            @agent.selector.unregister(ch)
          else
            @agent.selector.modify(ch, op)
          end
        end
      end
    end


    if next_action == nil
      raise Sink.new("unknown next action")
    end

  rescue Sink => e
    raise e

  rescue => e
    if e.kind_of? EOFError
      BayLog.debug("%s Socket closed by peer: skt=%s", @agent, ch.inspect)
    elsif e.kind_of? SystemCallError
      BayLog.debug("%s O/S error: %s (skt=%s)", @agent, e.message, ch.inspect)
    elsif e.kind_of? IOError
      BayLog.debug("%s IO error: %s (skt=%s)", @agent, e.message, ch.inspect)
    elsif e.kind_of? OpenSSL::SSL::SSLError
      BayLog.debug("%s SSL error: %s (skt=%s)", @agent, e.message, ch.inspect)
    else
      BayLog.error("%s Unhandled error error: %s (skt=%s)", @agent, e, ch.inspect)
      throw e
    end
    # Cannot handle Exception any more
    ch_state.listener.on_error(ch, e)
    next_action = NextSocketAction::CLOSE
  end

  cancel = false
  ch_state.access()
  BayLog.trace("%s next=%d", ch_state, next_action)
  case next_action
  when NextSocketAction::CLOSE
    close_channel(ch, ch_state)
    cancel = false   # already canceled in close_channel method

  when NextSocketAction::SUSPEND
    cancel = true

  when NextSocketAction::CONTINUE, NextSocketAction::READ, NextSocketAction::WRITE
    # do nothing

  else
    raise RuntimeError.new("IllegalState:: #{next_action}")
  end

  if cancel
    BayLog.trace("%s cancel key chState=%s", @agent, ch_state)
    @agent.selector.unregister(ch)
  end
end

#register_channel_opsObject



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 194

def register_channel_ops()
  if @operations.empty?
    return 0
  end

  @operations_lock.synchronize do
    nch = @operations.length
    @operations.each do |ch_op|
      st = self.find_channel_state(ch_op.ch)
      if ch_op.ch.closed?
        # Channel is closed before register operation
        BayLog.debug("%s Try to register closed socket (Ignore)", @agent)
        next
      end

      begin
        BayLog.trace("%s register op=%s chState=%s", @agent, self.class.op_mode(ch_op.op), st)
        op = @agent.selector.get_op(ch_op.ch)
        if op == nil
          @agent.selector.register(ch_op.ch, ch_op.op)
        else
          new_op = op | ch_op.op
          BayLog.debug("%s Already registered ch=%s op=%s update to %s", @agent, ch_op.ch, self.class.op_mode(op), self.class.op_mode(new_op))
          @agent.selector.modify(ch_op.ch, new_op)
        end

        if ch_op.to_connect
          if st == nil
            BayLog.warn("%s register connect but ChannelState is null", @agent);
          else
            st.connecting = true
          end

        elsif ch_op.to_close
          if st == nil
            BayLog.warn("%s chState=%s register close but ChannelState", self.agent);
          else
            st.closing = true
          end
        end

      rescue => e
        cst = find_channel_state(ch_op.ch)
        BayLog.error_e(e, "%s Cannot register operation: %s", self.agent, cst != nil ? cst.listener : nil)
      end
    end

    @operations.clear()
    return nch

  end
end

#to_sObject



84
85
86
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 84

def to_s()
  return @agent.to_s()
end