Class: BackgrounDRb::ClusterConnection

Inherits:
Object
  • Object
show all
Includes:
ClientHelper
Defined in:
lib/backgroundrb/bdrb_cluster_connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ClientHelper

#gen_worker_key

Constructor Details

#initializeClusterConnection

initialize cluster connection



9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 9

def initialize
  @bdrb_servers = []
  @backend_connections = []
  @disconnected_connections = {}

  @last_polled_time = Time.now
  @request_count = 0

  initialize_memcache if BackgrounDRb::BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
  establish_connections
  @round_robin = (0...@backend_connections.length).to_a
end

Instance Attribute Details

#backend_connectionsObject

Returns the value of attribute backend_connections.



5
6
7
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 5

def backend_connections
  @backend_connections
end

#bdrb_serversObject

Returns the value of attribute bdrb_servers.



5
6
7
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 5

def bdrb_servers
  @bdrb_servers
end

#cacheObject

Returns the value of attribute cache.



5
6
7
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 5

def cache
  @cache
end

#configObject

Returns the value of attribute config.



5
6
7
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 5

def config
  @config
end

#disconnected_connectionsObject

Returns the value of attribute disconnected_connections.



6
7
8
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 6

def disconnected_connections
  @disconnected_connections
end

Instance Method Details

#all_worker_infoObject

Send worker information of all currently running workers from all configured bdrb servers



119
120
121
122
123
124
125
126
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 119

def all_worker_info
  update_stats
  info_data = {}
  @backend_connections.each do |t_connection|
    info_data[t_connection.server_info] = t_connection.all_worker_info rescue nil
  end
  return info_data
end

#choose_serverObject

choose a server in round robin manner.



145
146
147
148
149
150
151
152
153
154
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 145

def choose_server
  if @round_robin.empty?
    @round_robin = (0...@backend_connections.length).to_a
  end
  if @round_robin.empty? && @backend_connections.empty?
    discover_server_periodically
    raise NoServerAvailable.new("No BackgrounDRb server is found running") if @round_robin.empty? && @backend_connections.empty?
  end
  @backend_connections[@round_robin.shift]
end

#discover_server_periodicallyObject

every 10 request or 10 seconds it will try to reconnect to bdrb servers which were down



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 56

def discover_server_periodically
  @disconnected_connections.each do |key,connection|
    connection.establish_connection
    if connection.connection_status
      @backend_connections << connection
      connection.close_connection
      @disconnected_connections[key] = nil
    end
  end
  @disconnected_connections.delete_if { |key,value| value.nil? }
  @round_robin = (0...@backend_connections.length).to_a
end

#establish_connectionsObject

initialize all backend server connections



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 38

def establish_connections
  klass = Struct.new(:ip,:port)
  if t_servers = BackgrounDRb::BDRB_CONFIG[:client]
    connections = t_servers.split(',')
    connections.each do |conn_string|
      ip = conn_string.split(':')[0]
      port = conn_string.split(':')[1].to_i
      @bdrb_servers << klass.new(ip,port)
    end
  end
  @bdrb_servers << klass.new(BackgrounDRb::BDRB_CONFIG[:backgroundrb][:ip],BackgrounDRb::BDRB_CONFIG[:backgroundrb][:port].to_i)
  @bdrb_servers.each_with_index do |connection_info,index|
    next if @backend_connections.detect { |x| x.server_info == "#{connection_info.ip}:#{connection_info.port}" }
    @backend_connections << Connection.new(connection_info.ip,connection_info.port,self)
  end
end

#find_connection(host_info) ⇒ Object

Fina a connection by host name and port

Raises:



84
85
86
87
88
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 84

def find_connection host_info
  conn = @backend_connections.detect { |x| x.server_info == host_info }
  raise NoServerAvailable.new("BackgrounDRb server is not found running on #{host_info}") unless conn
  return conn
end

#find_localObject

find the local configured connection



91
92
93
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 91

def find_local
  find_connection("#{BackgrounDRb::BDRB_CONFIG[:backgroundrb][:ip]}:#{BackgrounDRb::BDRB_CONFIG[:backgroundrb][:port]}")
end

#find_next_except_these(connections) ⇒ Object

Find live connections except those mentioned in array, because they are already dead.

Raises:



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 71

def find_next_except_these connections
  invalid_connections = @backend_connections.select { |x| connections.include?(x.server_info) }
  @backend_connections.delete_if { |x| connections.include?(x.server_info) }
  @round_robin = (0...@backend_connections.length).to_a
  invalid_connections.each do |x|
    @disconnected_connections[x.server_info] = x
  end
  chosen = @backend_connections.detect { |x| !(connections.include?(x.server_info)) }
  raise NoServerAvailable.new("No BackgrounDRb server is found running") unless chosen
  chosen
end

#initialize_memcacheObject

initialize memache if client is storing results in memcache



23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 23

def initialize_memcache
  require 'memcache'
  memcache_options = {
    :c_threshold => 10_000,
    :compression => true,
    :debug => false,
    :namespace => 'backgroundrb_result_hash',
    :readonly => false,
    :urlencode => false
  }
  @cache = MemCache.new(memcache_options)
  @cache.servers = BackgrounDRb::BDRB_CONFIG[:memcache].split(',')
end

#new_worker(options = {}) ⇒ Object

one of the backend connections are chosen and worker is started on it

Raises:



129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 129

def new_worker(options = {})
  update_stats
  succeeded = false
  result = nil

  @backend_connections.each do |connection|
    begin
      result = connection.new_worker(options)
      succeeded = true
    rescue BdrbConnError; end
  end
  raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded
  return options[:worker_key]
end

#time_to_discover?Boolean

Check if, we should try to discover new bdrb servers

Returns:

  • (Boolean)


108
109
110
111
112
113
114
115
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 108

def time_to_discover?
  if((@request_count%10 == 0) or (Time.now > (@last_polled_time + 10.seconds)))
    @last_polled_time = Time.now
    return true
  else
    return false
  end
end

#update_statsObject

Update the stats and discover new nodes if they came up.



102
103
104
105
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 102

def update_stats
  @request_count += 1
  discover_server_periodically if(time_to_discover? && !@disconnected_connections.empty?)
end

#worker(worker_name, worker_key = nil) ⇒ Object

return the worker proxy



96
97
98
99
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 96

def worker(worker_name,worker_key = nil)
  update_stats
  RailsWorkerProxy.new(worker_name,worker_key,self)
end