Module: Packet::Core::CommonMethods

Includes:
NbioHelper
Defined in:
lib/packet/packet_core.rb

Overview

end of module#ClassMethods

Instance Method Summary collapse

Methods included from NbioHelper

#dump_object, #gen_worker_key, #object_dump, #packet_classify, #read_data, #write_and_schedule, #write_once

Instance Method Details

#accept_connection(sock_opts) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/packet/packet_core.rb', line 69

# Accepts one pending client on a listening socket without blocking and
# wires the accepted socket to a new handler.
#
# sock_opts - Hash read for :socket (the listening socket), :module (handler
#             module/class) and :block (optional proc forwarded as a block).
#
# Returns nil when the accept is abandoned because one of the rescued errno
# conditions was raised, otherwise the result of decorate_handler.
def accept_connection(sock_opts)
  listener = sock_opts[:socket]
  accepted =
    begin
      peer, peer_addr = listener.accept_nonblock
      peer.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      [peer, peer_addr]
    rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINTR
      # Nothing usable to accept right now; the caller simply moves on.
      nil
    end
  return if accepted.nil?

  read_ios << accepted[0]
  decorate_handler(accepted[0], true, accepted[1], sock_opts[:module], &sock_opts[:block])
end

#add_periodic_timer(interval, &block) ⇒ Object



237
238
239
240
241
# File 'lib/packet/packet_core.rb', line 237

# Builds a PeriodicEvent from interval and block, stores it in @timer_hash
# under its timer_signature, and returns the event.
def add_periodic_timer(interval, &block)
  PeriodicEvent.new(interval, &block).tap do |periodic|
    @timer_hash[periodic.timer_signature] = periodic
  end
end

#add_timer(elapsed_time, &block) ⇒ Object



243
244
245
246
247
248
# File 'lib/packet/packet_core.rb', line 243

# Builds an Event from elapsed_time and block, stores it in @timer_hash under
# its timer_signature, and returns the event.
def add_timer(elapsed_time, &block)
  one_shot = Event.new(elapsed_time, &block)
  signature = one_shot.timer_signature
  @timer_hash[signature] = one_shot
  one_shot
end

#binding_str ⇒ Object



254
255
256
257
# File 'lib/packet/packet_core.rb', line 254

# Bumps the @binding counter by one and returns the new value as a
# "BIND_<n>" string.
def binding_str
  @binding = @binding + 1
  "BIND_" + @binding.to_s
end

#cancel_timer(t_timer) ⇒ Object



250
251
252
# File 'lib/packet/packet_core.rb', line 250

# Removes t_timer from @timer_hash, looked up by its timer_signature.
# Returns whatever the delete returns (the stored timer, or nil if absent).
def cancel_timer(t_timer)
  signature = t_timer.timer_signature
  @timer_hash.delete(signature)
end

#cancel_write(t_sock) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
# File 'lib/packet/packet_core.rb', line 165

# Drops any pending write registration for t_sock.
#
# For a socket that is still open, its fileno entry is removed from
# internal_scheduled_write (UNIXSocket) or write_scheduled (anything else).
# The socket is removed from write_ios in every case.
def cancel_write(t_sock)
  unless t_sock.closed?
    fd = t_sock.fileno
    pending_writes = t_sock.is_a?(UNIXSocket) ? internal_scheduled_write : write_scheduled
    pending_writes.delete(fd)
  end
  write_ios.delete(t_sock)
end

#check_for_timer_events ⇒ Object



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/packet/packet_core.rb', line 288

# Fires every timer in @timer_hash that reports run_now?, then prunes the hash.
#
# Timers are collected first and run afterwards, so a timer callback that
# adds or cancels timers never mutates @timer_hash while it is being walked.
#
# A timer whose cancel_flag is set is never run, even when it is due; the
# flag is checked right before each run so a timer cancelled by an earlier
# callback in the same pass is skipped as well.
#
# Pruning removes:
# * every timer whose cancel_flag is set, and
# * every timer that was due in this pass and does not respond to :interval
#   (timers that respond to :interval stay registered).
#
# Returns @timer_hash.
def check_for_timer_events
  ready_timers = @timer_hash.values.select { |timer| timer.run_now? }
  ready_timers.each { |timer| timer.run unless timer.cancel_flag }
  @timer_hash.delete_if do |_key, timer|
    timer.cancel_flag || (!timer.respond_to?(:interval) && ready_timers.include?(timer))
  end
end

#close_connection(sock = nil) ⇒ Object

Closes the connection on the given socket.



308
309
310
311
312
313
314
# File 'lib/packet/packet_core.rb', line 308

# Stops watching sock for read/write readiness and closes it.
#
# read_ios and write_ios hold socket objects (they are filled with `<<
# socket` and passed to select), so the socket itself is removed from them;
# deleting by fileno would never match an entry. Removal happens before the
# close so a socket that is already closed is still unregistered.
#
# sock - the socket to close; nil (the default) is a no-op.
#
# Closing stays best-effort: IO and system-call errors raised while closing
# are swallowed. Always returns nil.
def close_connection(sock = nil)
  return if sock.nil?

  read_ios.delete(sock)
  write_ios.delete(sock)
  sock.close unless sock.closed?
  nil
rescue IOError, SystemCallError
  nil
end

#complete_connection(t_sock, sock_opts) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/packet/packet_core.rb', line 81

# Finishes a non-blocking connect on t_sock and hands the socket to its
# handler.
#
# t_sock    - socket on which a connect was started earlier.
# sock_opts - Hash read for :sock_addr, :module and :block.
#
# connect_nonblock is issued again to learn the outcome:
# * no error, or Errno::EISCONN    -> the socket is connected;
# * a connection-failure errno     -> the attempt failed.
# Besides ECONNREFUSED, the timeout / unreachable / reset errnos are treated
# as a failed attempt so that one unreachable peer is reported to its handler
# through decorate_handler instead of raising out of the caller.
#
# In both cases the socket leaves connection_completion_awaited and
# write_ios; it joins read_ios only when connected. Returns the result of
# decorate_handler.
def complete_connection(t_sock, sock_opts)
  actually_connected = true
  begin
    t_sock.connect_nonblock(sock_opts[:sock_addr])
  rescue Errno::EISCONN
    puts "Socket already connected"
  rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EHOSTUNREACH,
         Errno::ENETUNREACH, Errno::ECONNRESET
    actually_connected = false
  end
  connection_completion_awaited.delete(t_sock.fileno)
  read_ios << t_sock if actually_connected
  write_ios.delete(t_sock)
  decorate_handler(t_sock, actually_connected, sock_opts[:sock_addr],
                   sock_opts[:module], &sock_opts[:block])
end

#connect(ip, port, t_module, &block) ⇒ Object

Starts a non-blocking TCP connection to ip:port, using t_module as the connection handler.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/packet/packet_core.rb', line 39

# Opens a TCP socket and starts a non-blocking connect to ip:port.
#
# ip, port - remote address, passed to Socket.sockaddr_in.
# t_module - handler module/class/instance for the connection.
# block    - optional block forwarded to the handler setup.
#
# The attempt is recorded in connection_completion_awaited under the socket's
# fileno. If connect_nonblock returns without raising, immediate_complete is
# called right away; on Errno::EINPROGRESS the socket is added to write_ios
# instead.
def connect(ip, port, t_module, &block)
  client = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
  remote_addr = Socket.sockaddr_in(port, ip)
  client.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  pending = { :sock_addr => remote_addr, :module => t_module, :block => block }
  connection_completion_awaited[client.fileno] = pending

  begin
    client.connect_nonblock(remote_addr)
    immediate_complete(client, remote_addr, t_module, &block)
  rescue Errno::EINPROGRESS
    write_ios << client
  end
end

#decorate_handler(t_socket, actually_connected, sock_addr, t_module, &block) ⇒ Object



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# File 'lib/packet/packet_core.rb', line 327

# Builds the handler for a socket, wires it to this reactor and, when the
# connection succeeded, registers it in connections.
#
# t_socket           - the client or outbound socket.
# actually_connected - false when the connection attempt failed.
# sock_addr          - socket address stored with the connection record.
# t_module           - handler module/class/instance for initialize_handler.
# block              - optional block called with the handler instance.
#
# Steps, in order:
# 1. create the handler and invoke each :after_connection callback named in
#    connection_callbacks (if any) with (handler, socket);
# 2. set worker, connection and reactor on the handler and call invoke_init
#    unless the handler reports itself initialized;
# 3. on a failed connection: call unbind, remove_connection, and return nil;
# 4. otherwise assign a signature, store the connection record under the
#    socket's fileno, call the block, then connection_completed.
#
# binding_str advances a counter on every call, so it is called exactly once
# and the same value is given to the handler and to the connection record;
# calling it twice would leave the two signatures out of step.
#
# Returns the handler instance, or nil for a failed connection.
def decorate_handler(t_socket,actually_connected,sock_addr,t_module,&block)
  handler_instance = initialize_handler(t_module)
  after_connection_callbacks = connection_callbacks ? connection_callbacks[:after_connection] : nil
  after_connection_callbacks && after_connection_callbacks.each { |t_callback| self.send(t_callback,handler_instance,t_socket)}
  handler_instance.worker = self
  handler_instance.connection = t_socket
  handler_instance.reactor = self
  handler_instance.invoke_init unless handler_instance.initialized
  unless actually_connected
    handler_instance.unbind
    remove_connection(t_socket)
    return
  end
  signature = binding_str
  handler_instance.signature = signature
  # FIXME: An Struct is more fashionable, but will have some performance hit, can use a simple hash here
  connection_data = { :socket => t_socket,:instance => handler_instance,:signature => signature,:sock_addr => sock_addr }
  connections[t_socket.fileno] = connection_data

  block.call(handler_instance) if block
  handler_instance.connection_completed
  handler_instance
end

#handle_external_messages(t_sock) ⇒ Object



217
218
219
220
221
222
223
224
# File 'lib/packet/packet_core.rb', line 217

# Dispatches a readable non-internal socket: when its fileno is registered in
# listen_sockets a new client is accepted with the stored options, otherwise
# data is read from the socket.
def handle_external_messages(t_sock)
  listener_opts = listen_sockets[t_sock.fileno]
  return accept_connection(listener_opts) if listener_opts

  read_external_socket(t_sock)
end

#handle_internal_messages(t_sock) ⇒ Object



213
214
215
# File 'lib/packet/packet_core.rb', line 213

# Placeholder for reading from an internal socket. This version always
# raises a RuntimeError; including classes are expected to override it.
def handle_internal_messages(t_sock)
  raise RuntimeError, "Method should be implemented by concerned classes"
end

#handle_read_event(p_ready_fds) ⇒ Object



190
191
192
193
194
195
196
197
198
199
# File 'lib/packet/packet_core.rb', line 190

# Routes each readable socket to the matching reader.
#
# p_ready_fds - possibly nested array of sockets; it is flattened and nil
#               entries are dropped before dispatch.
#
# UNIXSocket instances go to handle_internal_messages, everything else to
# handle_external_messages. Returns the flattened, compacted array.
def handle_read_event(p_ready_fds)
  p_ready_fds.flatten.compact.each do |t_sock|
    case t_sock
    when UNIXSocket then handle_internal_messages(t_sock)
    else handle_external_messages(t_sock)
    end
  end
end

#handle_write_event(p_ready_fds) ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/packet/packet_core.rb', line 177

# Services every writable socket in p_ready_fds. For each socket the first
# matching rule wins:
# 1. a UNIXSocket with an entry in internal_scheduled_write -> that entry's
#    write_and_schedule;
# 2. a fileno present in connection_completion_awaited -> complete_connection
#    with the stored options;
# 3. a fileno present in write_scheduled -> that entry's write_and_schedule.
# Sockets matching none of these are skipped. Returns p_ready_fds.
def handle_write_event(p_ready_fds)
  p_ready_fds.each do |sock_fd|
    fd = sock_fd.fileno

    internal_writer = UNIXSocket === sock_fd ? internal_scheduled_write[fd] : nil
    if internal_writer
      internal_writer.write_and_schedule(sock_fd)
      next
    end

    pending_opts = connection_completion_awaited[fd]
    if pending_opts
      complete_connection(sock_fd, pending_opts)
      next
    end

    external_writer = write_scheduled[fd]
    external_writer.write_and_schedule(sock_fd) if external_writer
  end
end

#immediate_complete(t_socket, sock_addr, t_module, &block) ⇒ Object



62
63
64
65
66
67
# File 'lib/packet/packet_core.rb', line 62

# Finishes a connection whose connect_nonblock succeeded straight away: the
# socket is added to read_ios, removed from write_ios, decorated as a
# connected handler, and finally dropped from connection_completion_awaited.
# Returns the result of that last delete.
def immediate_complete(t_socket, sock_addr, t_module, &block)
  read_ios.push(t_socket)
  write_ios.delete(t_socket)
  decorate_handler(t_socket, true, sock_addr, t_module, &block)
  awaited_key = t_socket.fileno
  connection_completion_awaited.delete(awaited_key)
end

#initialize ⇒ Object



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/packet/packet_core.rb', line 259

# Sets up the reactor's bookkeeping state.
#
# The collections assigned with ||= keep any value that was already set
# before this runs; the remaining fields are always reset.
def initialize
  # IO lists (presumably what the read_ios / write_ios readers return).
  @read_ios  ||= []
  @write_ios ||= []

  # Lookup tables.
  @connections                   ||= {}
  @listen_sockets                ||= {}
  @connection_completion_awaited ||= {}
  @write_scheduled               ||= {}
  @internal_scheduled_write      ||= {}

  # Timers, keyed by timer_signature.
  # @timer_hash = Packet::TimerStore
  @timer_hash ||= {}

  # internal outbound data
  @outbound_data = []

  # Counter that binding_str increments.
  @binding = 0
  @on_next_tick = nil

  # @thread_pool = ThreadPool.new(thread_pool_size || 20)
  @windows_flag = windows?
  @reactor = self
end

#initialize_handler(p_module) ⇒ Object



316
317
318
319
320
321
322
323
324
325
# File 'lib/packet/packet_core.rb', line 316

# Turns the handler specification p_module into a handler instance.
#
# * Anything that is neither a Class nor a Module (for example an existing
#   handler object, or nil) is returned untouched.
# * A Class gets Connection mixed into it (the class itself is modified) and
#   is instantiated.
# * A plain Module is mixed, together with Connection, into a new anonymous
#   class, which is instantiated.
def initialize_handler(p_module)
  return p_module unless p_module.is_a?(Module)

  handler_class =
    case p_module
    when Class
      p_module.send(:include, Connection)
    else
      Class.new do
        include Connection
        include p_module
      end
    end
  handler_class.new
end

#next_turn(&block) ⇒ Object



112
113
114
# File 'lib/packet/packet_core.rb', line 112

# Stores block in @on_next_tick (nil when no block is given) and returns it.
# The event loop calls the stored block, if any, on each pass.
def next_turn(&block)
  @on_next_tick = block
end

#read_external_socket(t_sock) ⇒ Object



226
227
228
229
230
231
232
233
234
235
# File 'lib/packet/packet_core.rb', line 226

# Reads from a connected client socket and passes the data to the handler
# registered for it in connections.
#
# When read_data raises DisconnectError, any data carried by the error is
# delivered first (if non-empty) and the handler's close_connection is called.
def read_external_socket(t_sock)
  receiver = connections[t_sock.fileno][:instance]
  begin
    receiver.receive_data(read_data(t_sock))
  rescue DisconnectError => disconnect
    leftover = disconnect.data
    receiver.receive_data(leftover) unless leftover.empty?
    receiver.close_connection
  end
end

#reconnect(server, port, handler) ⇒ Object



54
55
56
57
58
59
60
# File 'lib/packet/packet_core.rb', line 54

# Re-establishes the connection for an existing handler.
#
# Raises RuntimeError ("invalid handler") unless handler responds to
# connection_completed. When the handler's socket is still open and its
# fileno is registered in connections, the handler is returned unchanged;
# otherwise a new connect to server:port is started with the handler.
def reconnect(server, port, handler)
  raise "invalid handler" unless handler.respond_to?(:connection_completed)

  socket = handler.connection
  still_registered = !socket.closed? && connections.key?(socket.fileno)
  return handler if still_registered

  connect(server, port, handler)
end

#remove_connection(t_sock) ⇒ Object

Removes the connection and closes the socket.



99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/packet/packet_core.rb', line 99

# Unregisters t_sock and closes it.
#
# The socket is always removed from read_ios and write_ios. If it is still
# open, its fileno entry is deleted from connections and the socket is
# closed. Any StandardError raised along the way is not propagated; its
# message is printed with puts instead.
def remove_connection(t_sock)
  [read_ios, write_ios].each { |watched| watched.delete(t_sock) }
  begin
    unless t_sock.closed?
      connections.delete(t_sock.fileno)
      t_sock.close
    end
  rescue StandardError => close_error
    puts "#{close_error.message}"
  end
end

#schedule_write(t_sock, internal_instance = nil) ⇒ Object



154
155
156
157
158
159
160
161
162
163
# File 'lib/packet/packet_core.rb', line 154

# Registers t_sock for write-readiness notification, once per fileno.
#
# t_sock            - socket with pending outbound data.
# internal_instance - writer recorded for a UNIXSocket (defaults to nil).
#
# A UNIXSocket with no entry in internal_scheduled_write is added to
# write_ios and mapped to internal_instance. Any other socket with no entry
# in write_scheduled is added to write_ios and mapped to the handler stored
# in connections for its fileno. A socket that already has an entry is left
# alone and nil is returned.
def schedule_write(t_sock, internal_instance = nil)
  fd = t_sock.fileno
  if t_sock.is_a?(UNIXSocket)
    return nil unless internal_scheduled_write[fd].nil?

    write_ios << t_sock
    internal_scheduled_write[fd] = internal_instance
  else
    return nil unless write_scheduled[fd].nil?

    write_ios << t_sock
    write_scheduled[fd] = connections[fd][:instance]
  end
end

#shutdown ⇒ Object



207
208
209
210
211
# File 'lib/packet/packet_core.rb', line 207

# Ends the process with a successful exit status. start_reactor installs
# this as the INT signal handler.
def shutdown
  # FIXME: close the open sockets
  # @thread_pool.kill_all
  exit(true)
end

#start_reactor ⇒ Object

Starts the event loop in the current process.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/packet/packet_core.rb', line 136

# Runs the event loop; does not return under normal operation.
#
# Installs signal handlers first (TERM -> terminate_me, INT -> shutdown) and
# then, on every pass:
# 1. fires due timers via check_for_timer_events;
# 2. calls the block stored in @on_next_tick, if any;
# 3. waits up to 0.005 seconds in select on read_ios / write_ios;
# 4. dispatches readable sockets, then writable sockets.
#
# Readable and writable sockets are both serviced in the same pass. Treating
# them as alternatives would postpone every pending write for as long as any
# socket stays readable. Because a read handler may close a socket that
# select also reported writable, sockets that are closed by then are dropped
# before the write dispatch.
def start_reactor
  Signal.trap("TERM") { terminate_me }
  Signal.trap("INT") { shutdown }
  loop do
    check_for_timer_events
    @on_next_tick.call if @on_next_tick

    # select returns nil on timeout, which leaves all three locals nil.
    ready_read_fds, ready_write_fds, _error_fds = select(read_ios, write_ios, [], 0.005)

    handle_read_event(ready_read_fds) if ready_read_fds && !ready_read_fds.empty?

    next unless ready_write_fds

    still_open = ready_write_fds.reject { |t_sock| t_sock.closed? }
    handle_write_event(still_open) unless still_open.empty?
  end
end

#start_server(ip, port, t_module, &block) ⇒ Object

Opens a TCP socket on ip:port and starts listening for connections.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/packet/packet_core.rb', line 117

# Opens a listening TCP socket on ip:port and registers it with the reactor.
#
# ip, port - local address to bind; port is converted with to_i.
# t_module - handler module/class used for accepted connections.
# block    - optional block stored with the listener options.
#
# Reverse DNS lookups are switched off process-wide, SO_REUSEADDR and
# TCP_NODELAY are set on the socket, and the listen backlog is 50. The socket
# is recorded in listen_sockets under its fileno and appended to @read_ios.
def start_server(ip, port, t_module, &block)
  BasicSocket.do_not_reverse_lookup = true

  # A raw Socket is used here rather than TCPServer.
  listener = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
  listener.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
  listener.bind(Socket.sockaddr_in(port.to_i, ip))
  listener.listen(50)
  listener.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  server_opts = { :socket => listener, :block => block, :module => t_module }
  listen_sockets[listener.fileno] = server_opts
  @read_ios << listener
end

#terminate_me ⇒ Object



201
202
203
204
205
# File 'lib/packet/packet_core.rb', line 201

# Ends the process with a successful exit status. start_reactor installs
# this as the TERM signal handler.
def terminate_me
  # FIXME: close the open sockets
  # @thread_pool.kill_all
  exit(true)
end

#unix? ⇒ Boolean

Returns:

  • (Boolean)


284
285
286
# File 'lib/packet/packet_core.rb', line 284

# Returns true unless @windows_flag is set (truthy), false otherwise.
def unix?
  @windows_flag ? false : true
end

#windows? ⇒ Boolean

Returns:

  • (Boolean)


279
280
281
282
# File 'lib/packet/packet_core.rb', line 279

# Reports whether the interpreter was built for native Windows.
#
# RUBY_PLATFORM is matched case-insensitively against "mswin", "mingw" and
# "win32". Matching "win32" alone misses current Windows builds such as
# "x64-mingw-ucrt" or "x64-mswin64_140". "darwin" does not match any of the
# three patterns. Cygwin is deliberately not treated as Windows here.
#
# Returns true or false.
def windows?
  (RUBY_PLATFORM =~ /mswin|mingw|win32/i) ? true : false
end