Class: Baykit::BayServer::Common::WarpShip

Inherits:
Ships::Ship
  • Object
show all
Includes:
Agent, Ships, Tours, Util
Defined in:
lib/baykit/bayserver/common/warp_ship.rb

Constant Summary

Constants inherited from Ships::Ship

Ships::Ship::INVALID_SHIP_ID, Ships::Ship::SHIP_ID_NOCHECK

Instance Attribute Summary collapse

Attributes inherited from Ships::Ship

#agent_id, #initialized, #keeping, #object_id, #rudder, #ship_id, #transporter

Instance Method Summary collapse

Methods inherited from Ships::Ship

#check_ship_id, #id, #init, #post_close, #resume_read

Constructor Details

#initialize ⇒ WarpShip

Returns a new instance of WarpShip.



23
24
25
26
27
28
29
30
31
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 23

# Creates a warp ship in its initial, unconnected state.
#
# After the superclass initializer runs, the ship has no docker and no
# socket timeout, an empty tour map, an empty command buffer, a fresh
# mutex, and is flagged as not connected.
def initialize
  super
  @lock = Mutex.new
  @docker = nil
  @socket_timeout_sec = nil
  @connected = false
  @tour_map = {}
  @cmd_buf = []
end

Instance Attribute Details

#cmd_buf ⇒ Object (readonly)

Returns the value of attribute cmd_buf.



21
22
23
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 21

# Reader for the command buffer.
#
# @return [Array] the array stored in @cmd_buf; #post appends
#   [cmd, listener] pairs to it while the ship is not connected
def cmd_buf; @cmd_buf; end

#connected ⇒ Object

Returns the value of attribute connected.



18
19
20
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 18

# Reader for the connection flag.
#
# @return [Boolean] the value of @connected (false after #initialize and
#   #reset, true once #notify_connect has run)
def connected; @connected; end

#docker ⇒ Object (readonly)

Returns the value of attribute docker.



15
16
17
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 15

# Reader for the docker this ship belongs to.
#
# @return [Object, nil] the value of @docker (nil until #init_warp assigns it)
def docker; @docker; end

#lock ⇒ Object (readonly)

Returns the value of attribute lock.



20
21
22
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 20

# Reader for the ship's mutex.
#
# @return [Mutex] the value of @lock, created in #initialize
def lock; @lock; end

#protocol_handler ⇒ Object (readonly)

Returns the value of attribute protocol_handler.



17
18
19
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 17

# Reader for the protocol handler.
#
# @return [Object, nil] the value of @protocol_handler (assigned by #init_warp)
def protocol_handler; @protocol_handler; end

#socket_timeout_sec ⇒ Object (readonly)

Returns the value of attribute socket_timeout_sec.



19
20
21
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 19

# Reader for the socket timeout.
#
# @return [Numeric, nil] the value of @socket_timeout_sec (nil until
#   #init_warp computes it)
def socket_timeout_sec; @socket_timeout_sec; end

#tour_map ⇒ Object (readonly)

Returns the value of attribute tour_map.



14
15
16
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 14

# Reader for the tour map.
#
# @return [Hash] the value of @tour_map; #start_warp_tour stores entries
#   of the form warp_id => [tour id, tour]
def tour_map; @tour_map; end

Instance Method Details

#abort(check_id) ⇒ Object



238
239
240
241
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 238

# Validates the caller's ship id, then asks the transporter to close
# this ship's rudder.
#
# @param check_id [Object] id passed to check_ship_id
# @return [Object] whatever @transporter.req_close returns
def abort(check_id)
  check_ship_id check_id
  rudder = @rudder
  @transporter.req_close(rudder)
end

#check_timeout(duration_sec) ⇒ Object



135
136
137
138
139
140
141
142
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 135

# Checks whether the ship has timed out and, if so, reports a gateway
# timeout to the tours held by this ship.
#
# @param duration_sec [Numeric] value handed to is_timeout
# @return [Boolean] true when is_timeout reported a timeout (the tours
#   have then been notified with HttpStatus::GATEWAY_TIMEOUT), false otherwise
def check_timeout(duration_sec)
  return false unless is_timeout(duration_sec)

  notify_error_to_owner_tour(HttpStatus::GATEWAY_TIMEOUT, "#{self} server timeout")
  true
end

#end_ship ⇒ Object



234
235
236
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 234

# Tells the docker that this ship has ended.
#
# @return [Object] whatever @docker.on_end_ship returns
def end_ship
  owner = @docker
  owner.on_end_ship(self)
end

#end_warp_tour(tur, keep) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 173

# Removes a finished tour from the tour map and optionally hands this
# ship back to the docker for reuse.
#
# @param tur [Object] tour whose warp data is looked up via WarpData.get
# @param keep [Boolean] when truthy, @docker.keep(self) is called
# @return [Object, nil] result of @docker.keep when keep is truthy, else nil
# @raise [Sink] if the tour's warp id is not registered in the tour map
def end_warp_tour(tur, keep)
  warp_data = WarpData.get(tur)
  BayLog.debug("%s end warp tour: started=%s ended=%s", tur, warp_data.started, warp_data.ended)

  warp_id = warp_data.warp_id
  unless @tour_map.key?(warp_id)
    raise Sink.new("%s WarpId not in tourMap: %d", tur, warp_id)
  end
  @tour_map.delete(warp_id)

  return unless keep

  BayLog.debug("%s keep warp ship", self)
  @docker.keep(self)
end

#flush ⇒ Object



269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 269

# Replays every buffered [cmd, listener] pair in order, then replaces
# the buffer with a new empty array.
#
# A pair whose command is nil only has its listener invoked; any other
# pair is forwarded to the protocol handler with the listener as block.
#
# @return [Array] the new, empty command buffer
def flush
  @cmd_buf.each do |entry|
    cmd, listener = entry
    if cmd.nil?
      listener.call
    else
      @protocol_handler.post(cmd, &listener)
    end
  end
  @cmd_buf = []
end

#get_tour(warp_id, must = true) ⇒ Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 193

# Looks up the tour registered under a warp id.
#
# A registered tour is returned only if its id check passes and its
# warp data is not flagged as ended.
#
# @param warp_id [Object] key into the tour map
# @param must [Boolean] when truthy, a missing or ended tour is an error
# @return [Object, nil] the tour, or nil when none is usable and must is falsy
# @raise [Sink] when no usable tour exists and must is truthy
def get_tour(warp_id, must = true)
  entry = @tour_map[warp_id]
  unless entry.nil?
    tour_id, tour = entry
    tour.check_tour_id(tour_id)
    return tour unless WarpData.get(tour).ended
  end

  raise Sink.new("%s warp tour not found: id=%d", self, warp_id) if must

  nil
end

#init_warp(rd, agt_id, tp, dkr, proto_hnd) ⇒ Object



33
34
35
36
37
38
39
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 33

# Binds this ship to a rudder, agent, transporter, docker and protocol
# handler.
#
# The socket timeout is taken from the docker when its timeout_sec is
# zero or positive; otherwise the harbor's socket_timeout_sec is used.
#
# @param rd [Object] rudder, forwarded to init
# @param agt_id [Object] agent id, forwarded to init
# @param tp [Object] transporter, forwarded to init
# @param dkr [Object] docker; must respond to timeout_sec
# @param proto_hnd [Object] protocol handler; its init is called with this ship
# @return [Object] whatever proto_hnd.init returns
def init_warp(rd, agt_id, tp, dkr, proto_hnd)
  init(agt_id, rd, tp)
  @docker = dkr
  @socket_timeout_sec =
    if dkr.timeout_sec >= 0
      dkr.timeout_sec
    else
      BayServer.harbor.socket_timeout_sec
    end
  @protocol_handler = proto_hnd
  proto_hnd.init(self)
end

#inspect ⇒ Object



49
50
51
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 49

# Uses the same text as #to_s for inspection output.
#
# @return [String] the result of to_s
def inspect; to_s; end

#is_timeout(duration) ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 243

# Decides whether the given duration exceeds this ship's socket timeout.
#
# A ship that is being kept never times out, and a socket timeout of
# zero or less disables the check. The decision is logged at debug level.
#
# @param duration [Numeric] elapsed time compared against
#   @socket_timeout_sec (presumably seconds, going by the name)
# @return [Boolean] true only when not keeping, the limit is positive and
#   duration has reached the limit
def is_timeout(duration)
  limit = @socket_timeout_sec
  timeout = !@keeping && limit > 0 && duration >= limit

  BayLog.debug("%s Warp check timeout: dur=%d, timeout=%s, keeping=%s limit=%d",
               self, duration, timeout, @keeping, limit)
  timeout
end

#notify_close ⇒ Object



129
130
131
132
133
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 129

# Handles a close notification: reports SERVICE_UNAVAILABLE to the tours
# held by this ship, then ends the ship.
#
# @return [Object] whatever end_ship returns
def notify_close
  BayLog.debug("%s notifyClose", self)
  reason = "#{self} server closed"
  notify_error_to_owner_tour(HttpStatus::SERVICE_UNAVAILABLE, reason)
  end_ship
end

#notify_connect ⇒ Object



75
76
77
78
79
80
81
82
83
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 75

# Marks the ship as connected and starts the warp data of every tour
# currently registered in the tour map.
#
# Each tour's id is checked against the id recorded at registration
# before its warp data is started.
#
# @return [Object] NextSocketAction::CONTINUE
def notify_connect
  @connected = true
  @tour_map.values.each do |entry|
    tour_id, tour = entry
    tour.check_tour_id(tour_id)
    WarpData.get(tour).start
  end
  NextSocketAction::CONTINUE
end

#notify_eof ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 89

# Handles an EOF notification on the warp connection.
#
# With no registered tours the ship is simply closed. Otherwise every
# tour is finished: a tour whose response header has not been sent
# gets a SERVICE_UNAVAILABLE error, while a tour that already sent its
# header has its response content ended (EOF is not treated as an error
# there). An IOError raised while finishing a tour is logged at debug
# level and does not stop the remaining tours. The tour map is cleared
# afterwards.
#
# @return [Object] NextSocketAction::CLOSE
def notify_eof
  BayLog.debug("%s EOF detected", self)

  if @tour_map.empty?
    BayLog.debug("%s No warp tours. only close", self)
  else
    @tour_map.each_value do |entry|
      tour_id, tour = entry
      tour.check_tour_id(tour_id)

      begin
        if tour.res.header_sent
          # NOT treat EOF as Error
          BayLog.debug("%s EOF is not an error: tur=%s", self, tour)
          tour.res.end_res_content(Tour::TOUR_ID_NOCHECK)
        else
          BayLog.debug("%s Send ServiceUnavailable: tur=%s", self, tour)
          tour.res.send_error(Tour::TOUR_ID_NOCHECK, HttpStatus::SERVICE_UNAVAILABLE, "Server closed on reading headers")
        end
      rescue IOError => e
        BayLog::debug_e(e)
      end
    end

    @tour_map.clear
  end

  NextSocketAction::CLOSE
end

#notify_error(e) ⇒ Object



119
120
121
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 119

# Logs an error reported for this ship; no other action is taken.
#
# @param e [Exception] the error to log
# @return [Object] whatever BayLog.error_e returns
def notify_error(e)
  label = "notify_error"
  BayLog.error_e(e, label)
end

#notify_error_to_owner_tour(status, msg) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 214

# Reports an error to every tour registered on this ship and then
# empties the tour map. The whole operation runs while holding @lock.
#
# A tour that is running or reading receives the error response; any
# other tour just has its response content ended. A failure while
# sending the error to one tour is logged and does not prevent the
# remaining tours from being notified.
#
# Only StandardError is rescued around send_error. Rescuing Exception
# would also swallow Interrupt, SystemExit and NoMemoryError, so those
# are allowed to propagate to the caller.
#
# @param status [Object] HTTP status passed to send_error
# @param msg [String] message passed to send_error
# @return [Hash] the (now empty) tour map
# @raise [Sink] propagated from get_tour when a registered tour is no
#   longer usable
def notify_error_to_owner_tour(status, msg)
  @lock.synchronize do
    # Iterate over a snapshot of the keys; the map itself is cleared below.
    @tour_map.keys.each do |warp_id|
      tur = get_tour(warp_id)
      BayLog.debug("%s send error to owner: %s running=%s", self, tur, tur.running?)
      if tur.running? || tur.reading?
        begin
          tur.res.send_error(Tour::TOUR_ID_NOCHECK, status, msg)
        rescue StandardError => e
          BayLog.error_e(e)
        end
      else
        tur.res.end_res_content(Tour::TOUR_ID_NOCHECK)
      end
    end
    @tour_map.clear
  end
end

#notify_handshake_done(proto) ⇒ Object

Implements Ship



70
71
72
73
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 70

# Called when the handshake has completed; asks the protocol handler to
# verify the negotiated protocol.
#
# The parameter is forwarded to verify_protocol. The previous body
# referenced an undefined name `protocol` instead of the `proto`
# parameter, which raised NameError at call time.
#
# @param proto [Object] the protocol reported by the handshake
# @return [Object] NextSocketAction::CONTINUE
def notify_handshake_done(proto)
  @protocol_handler.verify_protocol(proto)
  NextSocketAction::CONTINUE
end

#notify_protocol_error(e) ⇒ Object



123
124
125
126
127
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 123

# Logs a protocol error and reports it to the tours held by this ship
# as SERVICE_UNAVAILABLE, using the error's message.
#
# @param e [Exception] the protocol error
# @return [Boolean] always true
def notify_protocol_error(e)
  BayLog.error_e(e)
  reason = e.message
  notify_error_to_owner_tour(HttpStatus::SERVICE_UNAVAILABLE, reason)
  true
end

#notify_read(buf) ⇒ Object



85
86
87
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 85

# Passes received bytes to the protocol handler.
#
# @param buf [Object] the data that was read
# @return [Object] whatever @protocol_handler.bytes_received returns
def notify_read(buf)
  @protocol_handler.bytes_received(buf)
end

#notify_service_unavailable(msg) ⇒ Object



189
190
191
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 189

# Reports SERVICE_UNAVAILABLE with the given message to the tours held
# by this ship.
#
# @param msg [String] message forwarded to notify_error_to_owner_tour
# @return [Object] whatever notify_error_to_owner_tour returns
def notify_service_unavailable(msg)
  status = HttpStatus::SERVICE_UNAVAILABLE
  notify_error_to_owner_tour(status, msg)
end

#packet_unpacker ⇒ Object



210
211
212
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 210

# Shortcut to the protocol handler's packet unpacker.
#
# @return [Object] @protocol_handler.packet_unpacker
def packet_unpacker
  @protocol_handler.packet_unpacker
end

#post(cmd, &listener) ⇒ Object



258
259
260
261
262
263
264
265
266
267
268
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 258

# Posts a command, or buffers it while the ship is not connected.
#
# When connected, a nil command only invokes the listener, and any
# other command is forwarded to the protocol handler with the listener
# as its block. When not connected, the [cmd, listener] pair is appended
# to the command buffer instead.
#
# @param cmd [Object, nil] command to send, or nil to only run the listener
# @param listener [Proc] block invoked by the handler (or directly for nil)
# @return [Object] the listener's result, the handler's result, or the
#   command buffer when the pair was buffered
def post(cmd, &listener)
  if @connected && cmd.nil?
    listener.call
  elsif @connected
    @protocol_handler.post(cmd, &listener)
  else
    @cmd_buf << [cmd, listener]
  end
end

#reset ⇒ Object

Implements Reusable



57
58
59
60
61
62
63
64
65
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 57

# Returns the ship to its unconnected, empty state so it can be reused.
#
# After the superclass reset, tours still present in the tour map are
# reported as a bug through BayLog.error; the connection flag, tour map
# and command buffer are then replaced with fresh values.
#
# @return [Array] the new, empty command buffer
def reset
  super
  BayLog.error("BUG: Some tours is active: %s", @tour_map) unless @tour_map.empty?
  @connected = false
  @tour_map = {}
  @cmd_buf = []
end

#start_warp_tour(tur) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 152

# Registers a tour on this ship and begins forwarding it.
#
# A new warp id and warp data are obtained from the warp handler, the
# warp data is installed as the tour request's content handler, the tour
# is recorded in the tour map as warp_id => [tour id, tour], and the
# warp handler's send_res_headers is invoked for the tour. If the ship
# is already connected the warp data is started immediately.
#
# @param tur [Object] the tour to forward
# @return [Object, nil] result of the warp data's start when already
#   connected, otherwise nil
# @raise [Sink] if the new warp id is already present in the tour map
def start_warp_tour(tur)
  handler = warp_handler
  warp_id = handler.next_warp_id
  warp_data = handler.new_warp_data(warp_id)
  BayLog.debug("%s new warp tour related to %s", warp_data, tur)
  tur.req.set_content_handler(warp_data)

  BayLog.debug("%s start: warpId=%d", warp_data, warp_id)
  raise Sink.new("warpId exists") if @tour_map.key?(warp_id)

  @tour_map[warp_id] = [tur.id, tur]
  handler.send_res_headers(tur)

  return unless @connected

  BayLog.debug("%s is already connected. Start warp tour:%s", warp_data, tur)
  warp_data.start
end

#to_s ⇒ Object



41
42
43
44
45
46
47
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 41

# Builds the label used for this ship in log messages.
#
# The label has the form "agt#<agent_id> wsip#<ship_id>/<object_id>[<p>]"
# where <p> is empty when no protocol handler is set and otherwise the
# handler's protocol wrapped in its own pair of brackets (so the
# protocol appears inside two bracket pairs).
#
# @return [String] the label
def to_s
  protocol = @protocol_handler.nil? ? "" : "[#{@protocol_handler.protocol}]"
  "agt##{agent_id} wsip##{@ship_id}/#{@object_id}[#{protocol}]"
end

#warp_handler ⇒ Object

Other methods



148
149
150
# File 'lib/baykit/bayserver/common/warp_ship.rb', line 148

# Shortcut to the protocol handler's command handler, which this class
# uses as its warp handler.
#
# @return [Object] @protocol_handler.command_handler
def warp_handler
  @protocol_handler.command_handler
end