Class: ConsistentCluster::SyncClient
- Inherits:
-
Object
- Object
- ConsistentCluster::SyncClient
- Defined in:
- lib/consistent-cluster/sync-client.rb
Instance Method Summary collapse
-
#initialize(options) ⇒ SyncClient
constructor
A new instance of SyncClient.
- #invoke_sync ⇒ Object
- #last_sync_at ⇒ Object
- #local_version ⇒ Object
- #remote_version ⇒ Object
- #shard(key = nil) ⇒ Object
- #start_check_timer_thread(check_timer) ⇒ Object
- #zookeeper_path ⇒ Object
Constructor Details
#initialize(options) ⇒ SyncClient
Returns a new instance of SyncClient.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/consistent-cluster/sync-client.rb', line 12 def initialize() @syncingMutex = Mutex.new @data = {} @cluster = {} @node_register = {} replicas = [:consistent_hashing_replicas] || 3 @ring = ConsistentHashing::Ring.new([],replicas) @create_proc = [:create_proc] @destroy_proc = [:destroy_proc] @after_sync_proc = [:after_sync_proc] @log_proc = [:log_proc] @logger = if [:log_file] Logger.new([:log_file]) elsif STDERR || STDOUT Logger.new(STDERR || STDOUT) else Logger.new([:nil]) end @last_sync_at = 0 @path = [:zookeeper_path] @zk = ZK.new([:zookeeper_service],reconnect: true) @zk.register(@path) do |event| sync_services end sync_services #on_state_change #on_connecting #on_expired_session @zk.on_connected do reconnect_callback end check_timer = [:check_timer] check_timer ||= 60 #default 1 minutes gap start_check_timer_thread(check_timer) @shard_num = 0 end |
Instance Method Details
#invoke_sync ⇒ Object
94 95 96 |
# File 'lib/consistent-cluster/sync-client.rb', line 94 def invoke_sync sync_services end |
#last_sync_at ⇒ Object
117 118 119 |
# File 'lib/consistent-cluster/sync-client.rb', line 117 def last_sync_at @last_sync_at end |
#local_version ⇒ Object
98 99 100 |
# File 'lib/consistent-cluster/sync-client.rb', line 98 def local_version @data end |
#remote_version ⇒ Object
102 103 104 105 106 107 108 109 110 111 |
# File 'lib/consistent-cluster/sync-client.rb', line 102 def remote_version app_names = get_app_names(watch: false) data = {} app_names.each do |app_name| app_content = get_app_content(app_name, watch: false) data[app_name] = app_content end data end |
#shard(key = nil) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/consistent-cluster/sync-client.rb', line 80 def shard(key=nil) cluster_sum = @cluster.length raise "no service available at #{@path}" if cluster_sum < 1 if key point = @ring.point_for(key) server = @cluster[point.node] else @shard_num += 1 @shard_num = @shard_num%cluster_sum server = @cluster.values[@shard_num] end server end |
#start_check_timer_thread(check_timer) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/consistent-cluster/sync-client.rb', line 63 def start_check_timer_thread(check_timer) return if check_timer <= 0 Thread.new { while true sleep check_timer begin if !@syncingMutex.locked? && local_version != remote_version sync_services end rescue Exception => boom #eat error here. loop should continue. @logger.error "timerSyncError, raise #{boom.class} - #{boom.message}" end end } end |
#zookeeper_path ⇒ Object
113 114 115 |
# File 'lib/consistent-cluster/sync-client.rb', line 113 def zookeeper_path @path end |