Class: Roma::Messaging::ConPool

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/roma/messaging/con_pool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(maxlength = 10, expire_time = 30) ⇒ ConPool

Returns a new instance of ConPool.



15
16
17
18
19
20
# File 'lib/roma/messaging/con_pool.rb', line 15

# Builds an empty connection pool.
#
# @param maxlength [Integer] limit that return_connection compares each
#   per-address pool length against
# @param expire_time [Numeric] age threshold that get_connection applies to
#   pooled connections (subtracted from Time.now, i.e. seconds)
def initialize(maxlength = 10, expire_time = 30)
  # Tunables first, then the mutable state they govern.
  @maxlength, @expire_time = maxlength, expire_time
  # Address string => array of [connection, time-returned] pairs.
  @pool = {}
  # Held by close_at while it drains an address.
  @lock = Mutex.new
end

Instance Attribute Details

#expire_time ⇒ Object

Returns the value of attribute expire_time.



13
14
15
# File 'lib/roma/messaging/con_pool.rb', line 13

# Reader for the expiry threshold set in the constructor (default 30).
# get_connection closes a pooled connection whose stored timestamp is older
# than Time.now minus this value; a value of 0 skips that check.
#
# @return [Object] the current @expire_time
def expire_time
  @expire_time
end

#maxlength ⇒ Object

Returns the value of attribute maxlength.



12
13
14
# File 'lib/roma/messaging/con_pool.rb', line 12

# Reader for the pool-size limit set in the constructor (default 10).
# return_connection compares the per-address pool length against this value
# to decide whether a returned connection is pooled or closed.
#
# @return [Object] the current @maxlength
def maxlength
  @maxlength
end

Instance Method Details

#check_connection(ap) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/roma/messaging/con_pool.rb', line 36

# Reports whether the node at +ap+ looks reachable.
#
# If the pool already has an entry for +ap+ no probe is made and the result
# is true. Otherwise the host part is resolved through DNSCache and a TCP
# connection is opened and immediately closed.
#
# @param ap [String] address as host and port separated by ':' or '_'
# @return [Boolean] true when an entry is pooled or the probe connects;
#   false when the connection is refused or any other StandardError occurs
#   (the latter is logged)
def check_connection(ap)
  return true if @pool.key?(ap)

  host, port = ap.split(/[:_]/)
  addr = DNSCache.instance.resolve_name(host)
  TCPSocket.open(addr, port).close
  true
rescue Errno::ECONNREFUSED
  # A refused connection is an expected "node is down" answer; not logged.
  false
rescue => e
  Logging::RLogger.instance.error("#{__FILE__}:#{__LINE__}:#{e}")
  # Return false explicitly: without this the method would return the
  # logger's return value, which may be truthy.
  false
end

#close_all ⇒ Object



80
81
82
# File 'lib/roma/messaging/con_pool.rb', line 80

# Calls close_at for every address currently present in the pool.
#
# @return [Hash] the pool hash (the receiver of each_key)
def close_all
  @pool.each_key do |pooled_ap|
    close_at(pooled_ap)
  end
end

#close_at(ap) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/roma/messaging/con_pool.rb', line 91

# Closes every pooled connection for +ap+ and removes its pool entry.
#
# The entry is detached from the pool inside the lock, so a concurrent
# close_at/delete_connection for the same address cannot make it vanish
# between the existence check and the drain. A failure to close one
# connection is logged and the remaining ones are still closed.
#
# @param ap [String] address key used in the pool
# @return [Array, nil] the (now empty) entry list that was removed, or nil
#   when there was no entry for +ap+
def close_at(ap)
  @lock.synchronize do
    entries = @pool.delete(ap)
    next unless entries

    until entries.empty?
      begin
        # Each entry is a [connection, timestamp] pair; only the connection
        # is needed here.
        entries.shift[0].close
      rescue => e
        Roma::Logging::RLogger.instance.error("#{__FILE__}:#{__LINE__}:#{e}")
      end
    end
    entries
  end
end

#close_same_host(ap) ⇒ Object



84
85
86
87
88
89
# File 'lib/roma/messaging/con_pool.rb', line 84

# Calls close_at for every pooled address whose host part equals the host
# part of +ap+. The host part is the text before the first ':' or '_'.
#
# @param ap [String] address as host and port separated by ':' or '_'
# @return [Hash] the pool hash (the receiver of each_key)
def close_same_host(ap)
  target_host = ap.split(/[:_]/)[0]
  @pool.each_key do |pooled_ap|
    pooled_host = pooled_ap.split(/[:_]/)[0]
    close_at(pooled_ap) if pooled_host == target_host
  end
end

#create_connection(ap) ⇒ Object



70
71
72
73
74
# File 'lib/roma/messaging/con_pool.rb', line 70

# Opens a new TCP connection to +ap+.
#
# The host part is resolved through DNSCache before connecting; the port
# part is passed to TCPSocket.new as the string taken from +ap+.
#
# @param ap [String] address as host and port separated by ':' or '_'
# @return [TCPSocket] the newly opened socket
def create_connection(ap)
  hostname, port_no = ap.split(/[:_]/)
  resolved = DNSCache.instance.resolve_name(hostname)
  TCPSocket.new(resolved, port_no)
end

#delete_connection(ap) ⇒ Object



76
77
78
# File 'lib/roma/messaging/con_pool.rb', line 76

# Removes the pool entry for +ap+ and returns what Hash#delete returns:
# the removed list of [connection, timestamp] pairs, or nil when there was
# no entry.
#
# NOTE(review): the pooled sockets are not closed here and the mutex is not
# taken; confirm callers only use this for connections they consider
# already dead, otherwise prefer close_at.
#
# @param ap [String] address key used in the pool
def delete_connection(ap)
  @pool.delete(ap)
end

#get_connection(ap) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/roma/messaging/con_pool.rb', line 22

# Hands out a connection for +ap+, reusing a pooled one when possible.
#
# The oldest pooled entry is taken first. When @expire_time is non-zero and
# the entry's timestamp is older than Time.now - @expire_time, the pooled
# connection is closed and a fresh one is created instead. A failure while
# closing the expired connection is logged and does not prevent the fresh
# connection from being created.
#
# @param ap [String] address as host and port separated by ':' or '_'
# @return [Object, nil] a pooled or newly created connection, or nil when a
#   StandardError occurred (the error is logged)
def get_connection(ap)
  entries = @pool[ap]
  con, last_used = entries.shift if entries && !entries.empty?

  if con && @expire_time != 0 && last_used < Time.now - @expire_time
    begin
      con.close
    rescue => e
      # The socket is being discarded anyway; record the failure and go on
      # to open a replacement rather than returning nil to the caller.
      Logging::RLogger.instance.error("#{__FILE__}:#{__LINE__}:#{e}")
    end
    con = nil
    Logging::RLogger.instance.info("connection expired at #{ap},remains #{entries.length}")
  end

  con || create_connection(ap)
rescue => e
  Logging::RLogger.instance.error("#{__FILE__}:#{__LINE__}:#{e}")
  nil
end

#return_connection(ap, con) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/roma/messaging/con_pool.rb', line 50

# Gives a connection back to the pool for +ap+.
#
# A connection that is already readable when it is returned is not pooled:
# one line is read from it and it is closed (presumably it carries an
# unconsumed reply or was closed by the peer — confirm against callers).
# Otherwise the connection is appended with the current time, unless the
# pool for +ap+ already holds @maxlength entries, in which case the
# connection is closed. Any StandardError is logged and swallowed.
#
# @param ap [String] address key used in the pool
# @param con [IO] the connection being returned
def return_connection(ap, con)
  if IO.select([con], nil, nil, 0.0001)
    con.gets
    con.close
    return
  end

  pooled = @pool[ap]
  # ">=" keeps the pool at no more than @maxlength entries; the same check
  # applies whether or not an entry for this address exists yet.
  if (pooled ? pooled.length : 0) >= @maxlength
    con.close
  elsif pooled
    pooled << [con, Time.now]
  else
    @pool[ap] = [[con, Time.now]]
  end
rescue => e
  Logging::RLogger.instance.error("#{__FILE__}:#{__LINE__}:#{e}")
end