Class: Roma::Messaging::ConPool
- Inherits:
-
Object
- Object
- Roma::Messaging::ConPool
- Includes:
- Singleton
- Defined in:
- lib/roma/messaging/con_pool.rb
Instance Attribute Summary
-
#expire_time ⇒ Object
Returns the value of attribute expire_time.
-
#maxlength ⇒ Object
Returns the value of attribute maxlength.
Instance Method Summary
- #check_connection(ap) ⇒ Object
- #close_all ⇒ Object
- #close_at(ap) ⇒ Object
- #close_same_host(ap) ⇒ Object
- #create_connection(ap) ⇒ Object
- #delete_connection(ap) ⇒ Object
- #get_connection(ap) ⇒ Object
-
#initialize(maxlength = 10, expire_time = 30) ⇒ ConPool
constructor
A new instance of ConPool.
- #return_connection(ap, con) ⇒ Object
Constructor Details
#initialize(maxlength = 10, expire_time = 30) ⇒ ConPool
Returns a new instance of ConPool.
15 16 17 18 19 20 |
# File 'lib/roma/messaging/con_pool.rb', line 15
#
# Builds an empty pool.
#
# @param maxlength [Integer] stored in @maxlength; return_connection compares
#   a per-address queue length against it
# @param expire_time [Integer] stored in @expire_time; get_connection compares
#   it against the age of a pooled connection
def initialize(maxlength = 10, expire_time = 30)
  @maxlength, @expire_time = maxlength, expire_time
  # Keyed by the address string; values are arrays of [connection, Time]
  # pairs (that is the shape return_connection stores).
  @pool = {}
  @lock = Mutex.new
end
Instance Attribute Details
#expire_time ⇒ Object
Returns the value of attribute expire_time.
13 14 15 |
# File 'lib/roma/messaging/con_pool.rb', line 13
#
# Reader for the expire_time attribute.
#
# @return [Object] the current value of @expire_time
def expire_time
  @expire_time
end
#maxlength ⇒ Object
Returns the value of attribute maxlength.
12 13 14 |
# File 'lib/roma/messaging/con_pool.rb', line 12
#
# Reader for the maxlength attribute.
#
# @return [Object] the current value of @maxlength
def maxlength
  @maxlength
end
Instance Method Details
#check_connection(ap) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/roma/messaging/con_pool.rb', line 36
#
# Reports whether the node addressed by +ap+ can be reached.
#
# If the pool already has an entry for +ap+ no probe is made and the node is
# considered reachable. Otherwise a throw-away TCP connection is opened to the
# resolved address and closed immediately.
#
# @param ap [String] host and port separated by ':' or '_'
# @return [Boolean] true when the probe succeeded (or was skipped), false when
#   the connection was refused or any other StandardError was raised
def check_connection(ap)
  unless @pool.key?(ap)
    host, port = ap.split(/[:_]/)
    addr = DNSCache.instance.resolve_name(host)
    TCPSocket.open(addr, port).close
  end
  true
rescue Errno::ECONNREFUSED
  # A refused connection is an expected outcome of the probe; not logged.
  false
rescue => e
  Logging::RLogger.instance.error("#{__FILE__}:#{__LINE__}:#{e}")
  # Return false explicitly: without this the method would return whatever
  # the logger call returns, which may be truthy.
  false
end
#close_all ⇒ Object
80 81 82 |
# File 'lib/roma/messaging/con_pool.rb', line 80
#
# Closes every pooled connection by calling close_at for each address.
#
# @return [Hash] the pool hash (same return value as iterating it directly)
def close_all
  # Walk a snapshot of the keys: close_at removes entries from @pool, and a
  # Hash refuses insertions while it is being iterated, so iterating the live
  # hash would make a concurrent insert raise.
  @pool.keys.each { |ap| close_at(ap) }
  @pool
end
#close_at(ap) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/roma/messaging/con_pool.rb', line 91
#
# Closes and discards every pooled connection stored under +ap+.
#
# The entry is looked up and removed inside the lock in a single step, so it
# cannot disappear between an existence check and its use. A failure to close
# one connection is logged and the remaining ones are still closed.
#
# @param ap [String] pool key
# @return [Array, nil] the (emptied) queue that was removed, or nil when the
#   pool had no entry for +ap+
def close_at(ap)
  @lock.synchronize do
    conns = @pool.delete(ap)
    next unless conns

    until conns.empty?
      begin
        # Each queue element is a [connection, Time] pair.
        conns.shift[0].close
      rescue => e
        Roma::Logging::RLogger.instance.error("#{__FILE__}:#{__LINE__}:#{e}")
      end
    end
    conns
  end
end
#close_same_host(ap) ⇒ Object
84 85 86 87 88 89 |
# File 'lib/roma/messaging/con_pool.rb', line 84
#
# Closes the pooled connections of every address whose host part equals the
# host part of +ap+. The host part is the text before the first ':' or '_'.
#
# @param ap [String] host and port separated by ':' or '_'
# @return [Hash] the pool hash (same return value as iterating it directly)
def close_same_host(ap)
  host = ap.split(/[:_]/)[0]
  # Walk a snapshot of the keys: close_at removes entries from @pool, and a
  # Hash refuses insertions while it is being iterated.
  @pool.keys.each do |eap|
    close_at(eap) if eap.split(/[:_]/)[0] == host
  end
  @pool
end
#create_connection(ap) ⇒ Object
70 71 72 73 74 |
# File 'lib/roma/messaging/con_pool.rb', line 70
#
# Opens a new TCP connection to the node addressed by +ap+.
#
# The host part is resolved through DNSCache before connecting. Errors from
# the resolver or from the socket are not handled here.
#
# @param ap [String] host and port separated by ':' or '_'
# @return [TCPSocket] the newly opened socket
def create_connection(ap)
  parts = ap.split(/[:_]/)
  resolved = DNSCache.instance.resolve_name(parts[0])
  TCPSocket.new(resolved, parts[1])
end
#delete_connection(ap) ⇒ Object
76 77 78 |
# File 'lib/roma/messaging/con_pool.rb', line 76
#
# Drops the pool entry for +ap+ without closing the connections stored in it.
#
# @param ap [String] pool key
# @return [Array, nil] the removed queue, or nil when there was no entry
def delete_connection(ap)
  @pool.delete(ap)
end
#get_connection(ap) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/roma/messaging/con_pool.rb', line 22
#
# Hands out a connection to the node addressed by +ap+.
#
# Pooled connections are taken from the front of the queue (the oldest
# first). Expired ones are closed and skipped until a live one is found; only
# when the queue is exhausted is a new connection opened. Previously a single
# expired entry caused a new connection to be opened even if fresher pooled
# connections were available.
#
# A connection is expired when it was pooled more than @expire_time seconds
# ago; an @expire_time of 0 disables expiry.
#
# @param ap [String] host and port separated by ':' or '_'
# @return [Object, nil] a pooled or newly created connection, or nil when any
#   StandardError was raised (the error is logged)
def get_connection(ap)
  queue = @pool[ap]
  while queue && !queue.empty?
    con, last = queue.shift
    return con if @expire_time == 0 || last >= Time.now - @expire_time

    con.close
    Logging::RLogger.instance.info("connection expired at #{ap},remains #{queue.length}")
  end
  create_connection(ap)
rescue => e
  Logging::RLogger.instance.error("#{__FILE__}:#{__LINE__}:#{e}")
  nil
end
#return_connection(ap, con) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/roma/messaging/con_pool.rb', line 50
#
# Gives a connection back to the pool for the node addressed by +ap+.
#
# A connection that is readable at this point is not pooled: one line is read
# from it and it is closed. Otherwise the connection is appended to the queue
# for +ap+ together with the current time, unless the queue already holds
# @maxlength connections, in which case it is closed instead.
#
# The capacity check is `>=`; the earlier `>` let a queue grow to
# @maxlength + 1 entries and always accepted the first connection.
#
# Any StandardError is logged and swallowed. The return value is not
# meaningful.
#
# @param ap [String] pool key
# @param con [IO] the connection being returned
def return_connection(ap, con)
  if IO.select([con], nil, nil, 0.0001)
    # NOTE(review): IO#gets waits for a line terminator or EOF, so this could
    # block if the peer sent a partial line - confirm that cannot happen.
    con.gets
    con.close
    return
  end

  queue = (@pool[ap] ||= [])
  if queue.length >= @maxlength
    con.close
  else
    queue << [con, Time.now]
  end
rescue => e
  Logging::RLogger.instance.error("#{__FILE__}:#{__LINE__}:#{e}")
end