Class: ZeevexCluster::Strategy::Cas
- Inherits:
-
Base
- Object
- Base
- ZeevexCluster::Strategy::Cas
show all
- Defined in:
- lib/zeevex_cluster/strategy/cas.rb
Defined Under Namespace
Classes: StopException
Constant Summary
collapse
- SUSPECT_MISSED_UPDATE_COUNT =
3
- INAUGURATION_UPDATE_DELAY =
2
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods inherited from Base
#am_i_master?, #has_master?, #member?, #online?, #started?, #state, #stopped?
#logger
Constructor Details
#initialize(options = {}) ⇒ Cas
Returns a new instance of Cas.
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 12
# Sets up the strategy's timing parameters and its coordinator connection.
#
# Recognised options (all optional):
#   :stale_time          - stored in @stale_time, defaults to 40
#   :update_period       - stored in @update_period, defaults to 10
#   :coordinator         - a ready-made coordinator; when given, no new one is created
#   :coordinator_type    - type passed to Coordinator.create, defaults to 'memcached'
#   :server, :port, :client - forwarded to Coordinator.create
#   :coordinator_options - extra settings merged over the forwarded ones
#
# The resulting coordinator is wrapped with ZeevexCluster.Synchronized unless
# it already is a ZeevexCluster::Synchronized.
def initialize(options = {})
  super
  @stale_time    = options.fetch(:stale_time, 40)
  @update_period = options.fetch(:update_period, 10)

  @server = options[:coordinator]
  unless @server
    backend  = options[:coordinator_type] || 'memcached'
    # Entries expire after four stale periods unless :coordinator_options overrides it.
    settings = {
      :server     => options[:server],
      :port       => options[:port],
      :client     => options[:client],
      :expiration => @stale_time * 4
    }.merge(options[:coordinator_options] || {})
    @server = ZeevexCluster::Coordinator.create(backend, settings)
  end

  already_wrapped = @server.is_a?(ZeevexCluster::Synchronized)
  @server = ZeevexCluster.Synchronized(@server) unless already_wrapped
end
|
Instance Attribute Details
#cluster_name ⇒ Object
Returns the value of attribute cluster_name.
7
8
9
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7
# Reader for @cluster_name.
#
# @return [Object] whatever is stored in @cluster_name (nil if never assigned)
def cluster_name; @cluster_name; end
|
#nodename ⇒ Object
Returns the value of attribute nodename.
7
8
9
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7
# Reader for @nodename.
#
# @return [Object] whatever is stored in @nodename (nil if never assigned)
def nodename; @nodename; end
|
#server ⇒ Object
Returns the value of attribute server.
7
8
9
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7
# Reader for @server, the coordinator object assigned by the constructor.
#
# @return [Object] whatever is stored in @server (nil if never assigned)
def server; @server; end
|
#stale_time ⇒ Object
Returns the value of attribute stale_time.
7
8
9
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7
# Reader for @stale_time.
#
# @return [Object] whatever is stored in @stale_time (nil if never assigned)
def stale_time; @stale_time; end
|
#update_period ⇒ Object
Returns the value of attribute update_period.
7
8
9
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7
# Reader for @update_period.
#
# @return [Object] whatever is stored in @update_period (nil if never assigned)
def update_period; @update_period; end
|
Instance Method Details
#can_view? ⇒ Boolean
122
123
124
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 122
# Unconditionally answers true for this strategy.
#
# @return [Boolean] always true
def can_view?; true; end
|
#do_i_hold_lock? ⇒ Boolean
30
31
32
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 30
# Tells whether this node's recorded cluster status is one of the two
# lock-holding statuses.
#
# @return [Boolean] true when @my_cluster_status is :master or :master_elect
def do_i_hold_lock?
  [:master, :master_elect].include?(@my_cluster_status)
end
|
#master_node ⇒ Object
34
35
36
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 34
# Reader for the most recently recorded master token.
#
# @return [Object, nil] whatever is stored in @current_master
def master_node; @current_master; end
|
#master_nodename ⇒ Object
38
39
40
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 38
# Looks up the :nodename entry of the recorded master token.
#
# @return [Object, nil] @current_master[:nodename], or the falsy
#   @current_master itself when no master is recorded
def master_nodename
  holder = @current_master
  holder ? holder[:nodename] : holder
end
|
#members ⇒ Object
112
113
114
115
116
117
118
119
120
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 112
# Lists the node names from the coordinator's member list whose timestamps
# are not older than @stale_time relative to time_now.
#
# The list is read from the coordinator under key('members'); when nothing is
# stored, make_member_list supplies the list instead.
#
# @return [Array] :nodename values of the entries that are not stale
def members
  cutoff   = time_now - @stale_time
  registry = server.get(key('members')) || make_member_list

  live_entries = registry[:members].values.reject do |entry|
    time_of(entry[:timestamp]) < cutoff
  end
  live_entries.map { |entry| entry[:nodename] }
end
|
#observe ⇒ Object
Grabs a snapshot of the cluster: the current master token (if one qualifies) and the member list.
133
134
135
136
137
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 133
# Takes a snapshot of the cluster as seen through the coordinator.
#
# Reads the token stored under key, records it in @current_master when
# qualifies_for_master? accepts it (nil otherwise), and pairs it with the
# current member list.
#
# @return [Hash] {:master => token or nil, :members => members}
def observe
  candidate = server.get(key)
  @current_master = (candidate if qualifies_for_master?(candidate))

  snapshot = {}
  snapshot[:master]  = @current_master
  snapshot[:members] = members
  snapshot
end
|
#observing? ⇒ Boolean
126
127
128
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 126
# Unconditionally answers true for this strategy.
#
# @return [Boolean] always true
def observing?; true; end
|
#resign(delay = nil) ⇒ Object
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 78
# Steps down from mastership, or cancels a pending resignation.
#
# With a delay of 0 the resignation window is cleared and campaign is called.
# Otherwise @resign_until is set to time_now plus the delay (default: the
# smaller of six update periods and @stale_time), and a CAS on the lock key
# replaces this node's own token with one whose timestamp is pushed back by
# two stale periods. If the stored token is not this node's, the CAS block
# raises DontChange. Afterwards failed_lock is called with the token that was
# observed in the store.
#
# @param delay [Numeric, nil] length of the resignation window; 0 cancels it
# @return [Object] result of campaign (delay 0) or of failed_lock
#
# A coordinator ConnectionError is rescued and reported via
# failed_lock(my_token, nil).
def resign(delay = nil)
  if delay == 0
    @resign_until = nil
    return campaign
  end

  @resign_until = time_now + (delay || [@update_period * 6, @stale_time].min)

  observed = nil
  server.cas(key) do |held|
    observed = held
    raise ZeevexCluster::Coordinator::DontChange unless is_me?(held)

    # NOTE(review): presumably backdated so other nodes see the token as stale - confirm.
    my_token.merge(:timestamp => time_now - 2 * @stale_time)
  end
  failed_lock(my_token, observed)
rescue ZeevexCluster::Coordinator::ConnectionError
  failed_lock(my_token, nil)
end
|
#start ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 44
# Launches the background thread that runs the strategy loop.
#
# Records the start time, marks the state as :started, clears @locked_at and
# spawns a thread that switches this node's status to :member and then calls
# spin. StandardErrors escaping spin are logged as warnings; whatever way the
# thread ends, the state is set to :stopped.
#
# @return [Thread] the newly created thread (also kept in @thread)
# @raise [RuntimeError] "Already started" if a thread exists or the state is :started
def start
  if @thread || @state == :started
    raise "Already started"
  end

  @start_time = time_now
  @state      = :started
  @locked_at  = nil

  @thread = Thread.new do
    begin
      change_my_status(:member)
      spin
    rescue => err
      logger.warn "rescued from spin: #{err.inspect}\n#{err.backtrace.join("\n")}"
    ensure
      logger.debug "spin over"
      @state = :stopped
    end
  end
end
|
#steal_election! ⇒ Object
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 101
# Forcibly takes the lock by overwriting the stored token with this node's.
#
# Logs a warning, clears any resignation window, writes my_token under the
# lock key without a compare-and-swap, and then calls got_lock with that token.
#
# @return [Boolean] true on success, false when the coordinator raised a
#   ConnectionError
def steal_election!
  logger.warn('Stealing election')
  @resign_until = nil
  my_token.tap do |claim|
    server.set(key, claim)
    got_lock(claim)
  end
  true
rescue ZeevexCluster::Coordinator::ConnectionError
  false
end
|
#stop ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/zeevex_cluster/strategy/cas.rb', line 62
# Stops the background thread and leaves the cluster.
#
# When the state is :started, the state becomes :stop_requested and a
# StopException is raised inside the worker thread. When a stop has already
# been requested or the worker has already finished, no signal is sent. In
# all of those cases the worker is then joined, @thread is cleared, this
# node's status is changed to :nonmember and reset_state_vars is called.
#
# If there is no worker thread to join (for example stop is called again
# after a previous stop already cleared @thread), the method returns without
# doing anything instead of failing on a nil thread.
#
# @return [Object, nil] result of reset_state_vars, or nil when there was no
#   thread to stop
# @raise [RuntimeError] "Bad state: ..." when @state is none of :started,
#   :stop_requested or :stopped
def stop
  case @state
  when :stop_requested, :stopped
    # Nothing to signal: either a stop is already in flight or the worker has ended.
  when :started
    @state = :stop_requested
    @thread.raise(StopException.new('stop'))
  else
    raise "Bad state: #{@state}"
  end

  # Guard against a missing worker; joining nil would raise NoMethodError.
  worker = @thread
  return unless worker

  worker.join
  @thread = nil
  change_my_status :nonmember
  reset_state_vars
end
|