Class: ZeevexCluster::Strategy::Cas

Inherits:
Base
  • Object
show all
Defined in:
lib/zeevex_cluster/strategy/cas.rb

Defined Under Namespace

Classes: StopException

Constant Summary collapse

SUSPECT_MISSED_UPDATE_COUNT =
3
INAUGURATION_UPDATE_DELAY =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#am_i_master?, #has_master?, #member?, #online?, #started?, #state, #stopped?

Methods included from Util::Logging

#logger

Constructor Details

#initialize(options = {}) ⇒ Cas

Returns a new instance of Cas.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/zeevex_cluster/strategy/cas.rb', line 12

def initialize(options = {})
  super
  @stale_time    = options.fetch(:stale_time, 40)
  @update_period = options.fetch(:update_period, 10)

  unless (@server = options[:coordinator])
    coordinator_type = options[:coordinator_type] || 'memcached'
    @server = ZeevexCluster::Coordinator.create(coordinator_type,
                                                {:server     => options[:server],
                                                 :port       => options[:port],
                                                 :client     => options[:client],
                                                 :expiration => @stale_time * 4}.merge(options[:coordinator_options] || {}))
  end
  unless @server.is_a?(ZeevexCluster::Synchronized)
    @server = ZeevexCluster.Synchronized(@server)
  end
end

Instance Attribute Details

#cluster_nameObject

Returns the value of attribute cluster_name.



7
8
9
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7

def cluster_name
  @cluster_name
end

#nodenameObject

Returns the value of attribute nodename.



7
8
9
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7

def nodename
  @nodename
end

#serverObject

Returns the value of attribute server.



7
8
9
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7

def server
  @server
end

#stale_timeObject

Returns the value of attribute stale_time.



7
8
9
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7

def stale_time
  @stale_time
end

#update_periodObject

Returns the value of attribute update_period.



7
8
9
# File 'lib/zeevex_cluster/strategy/cas.rb', line 7

def update_period
  @update_period
end

Instance Method Details

#can_view?Boolean

Returns:

  • (Boolean)


122
123
124
# File 'lib/zeevex_cluster/strategy/cas.rb', line 122

def can_view?
  true
end

#do_i_hold_lock?Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/zeevex_cluster/strategy/cas.rb', line 30

def do_i_hold_lock?
  @my_cluster_status == :master || @my_cluster_status == :master_elect
end

#master_nodeObject



34
35
36
# File 'lib/zeevex_cluster/strategy/cas.rb', line 34

def master_node
  @current_master
end

#master_nodenameObject



38
39
40
# File 'lib/zeevex_cluster/strategy/cas.rb', line 38

def master_nodename
  @current_master && @current_master[:nodename]
end

#membersObject



112
113
114
115
116
117
118
119
120
# File 'lib/zeevex_cluster/strategy/cas.rb', line 112

def members
  stale_point = time_now - @stale_time
  list = server.get(key('members')) || make_member_list
  members = []
  list[:members].values.each do |v|
    members << v[:nodename] unless time_of(v[:timestamp]) < stale_point
  end
  members
end

#observeObject

grab a snapshot of the cluster



133
134
135
136
137
# File 'lib/zeevex_cluster/strategy/cas.rb', line 133

def observe
  token = server.get(key)
  @current_master = qualifies_for_master?(token) ? token : nil
  {:master => @current_master, :members => members}
end

#observing?Boolean

Returns:

  • (Boolean)


126
127
128
# File 'lib/zeevex_cluster/strategy/cas.rb', line 126

def observing?
  true
end

#resign(delay = nil) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/zeevex_cluster/strategy/cas.rb', line 78

def resign(delay = nil)
  # unresign
  if delay == 0
    @resign_until = nil
    campaign
  else
    @resign_until = time_now + (delay || [@update_period*6, @stale_time].min)
    current = nil
    server.cas(key) do |val|
      current = val
      if is_me?(val)
        my_token.merge(:timestamp => time_now - 2*@stale_time)
      else
        raise ZeevexCluster::Coordinator::DontChange
      end
    end
    failed_lock(my_token, current)
  end
rescue ZeevexCluster::Coordinator::ConnectionError
  failed_lock(my_token, nil)
end

#startObject



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/zeevex_cluster/strategy/cas.rb', line 44

def start
  raise "Already started" if @thread || @state == :started
  @start_time = time_now
  @state  = :started
  @locked_at = nil
  @thread = Thread.new do
    begin
      change_my_status :member
      spin
    rescue
      logger.warn "rescued from spin: #{$!.inspect}\n#{$!.backtrace.join("\n")}"
    ensure
      logger.debug "spin over"
      @state = :stopped
    end
  end
end

#steal_election!Object



101
102
103
104
105
106
107
108
109
110
# File 'lib/zeevex_cluster/strategy/cas.rb', line 101

def steal_election!
  logger.warn "Stealing election"
  @resign_until = nil
  me = my_token
  server.set(key, me)
  got_lock(me)
  true
rescue ZeevexCluster::Coordinator::ConnectionError
  false
end

#stopObject



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/zeevex_cluster/strategy/cas.rb', line 62

def stop
  case @state
    when :stop_requested
    when :stopped
    when :started
      @state = :stop_requested
      @thread.raise(StopException.new 'stop')
    else
      raise "Bad state: #{@state}"
  end
  @thread.join
  @thread = nil
  change_my_status :nonmember
  reset_state_vars
end