Class: DaemonRunner::Semaphore

Inherits:
Object
  • Object
show all
Extended by:
Logger
Includes:
Logger
Defined in:
lib/daemon_runner/semaphore.rb

Overview

Manage semaphore locks with Consul

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

logger, logger_name

Constructor Details

#initialize(name:, prefix: nil, lock: nil, limit: 3) ⇒ Semaphore

Returns a new instance of Semaphore.

Parameters:

  • name (String)

    The name of the session, it is also used in the prefix

  • prefix (String|NilClass) (defaults to: nil)

    The Consul Kv prefix

  • lock (String|NilClass) (defaults to: nil)

    The path to the lock file



60
61
62
63
64
65
66
67
68
69
# File 'lib/daemon_runner/semaphore.rb', line 60

# Build a semaphore bound to a freshly started Consul session.
#
# @param name [String] the session name; also used to derive the default prefix
# @param prefix [String, nil] the Consul KV prefix; defaults to
#   "service/<name>/lock/" and is always stored with a trailing slash
# @param lock [String, nil] the path of the lock key; defaults to
#   "<prefix>.lock"
# @param limit [Integer] requested number of holders, resolved by set_limit
def initialize(name:, prefix: nil, lock: nil, limit: 3)
  create_session(name)

  @prefix = if prefix.nil?
              "service/#{name}/lock/"
            else
              prefix
            end
  # Normalise so that keys can be built by appending to the prefix.
  @prefix = "#{@prefix}/" unless @prefix.end_with?('/')

  @lock = if lock.nil?
            "#{@prefix}.lock"
          else
            lock
          end

  @lock_modify_index = nil
  @lock_content = nil
  @limit = set_limit(limit)
  @reset = false
end

Instance Attribute Details

#limit ⇒ Object (readonly)

The number of nodes that can obtain a semaphore lock



55
56
57
# File 'lib/daemon_runner/semaphore.rb', line 55

# The number of nodes that can obtain a semaphore lock.
# The value is whatever set_limit stored in @limit.
#
# @return [Object] the stored limit
def limit
  @limit
end

#lock_content ⇒ Object (readonly)

The lock content



49
50
51
# File 'lib/daemon_runner/semaphore.rb', line 49

# The lock content.
# Starts out as nil in the constructor; set_limit reads its 'Limit' entry.
#
# @return [Object, nil] the stored lock content
def lock_content
  @lock_content
end

#lock_modify_index ⇒ Object (readonly)

The current lock modify index



46
47
48
# File 'lib/daemon_runner/semaphore.rb', line 46

# The current lock modify index.
# Starts out as nil in the constructor; write_lock passes it (or 0 when nil)
# as the `cas` option of the KV put.
#
# @return [Object, nil] the stored modify index
def lock_modify_index
  @lock_modify_index
end

#members ⇒ Object (readonly)

The current semaphore members



43
44
45
# File 'lib/daemon_runner/semaphore.rb', line 43

# The current semaphore members.
# NOTE(review): @members is not assigned in any of the visible methods;
# presumably it is populated while the semaphore state is decoded — confirm.
#
# @return [Object, nil] the stored members
def members
  @members
end

#prefix ⇒ Object (readonly)

The Consul key prefix



52
53
54
# File 'lib/daemon_runner/semaphore.rb', line 52

# The Consul key prefix.
# The constructor stores it with a trailing '/'.
#
# @return [String] the stored prefix
def prefix
  @prefix
end

#session ⇒ Object (readonly)

The Consul session



37
38
39
# File 'lib/daemon_runner/semaphore.rb', line 37

# The Consul session.
# Assigned by create_session from Session.start.
#
# @return [Object] the stored session
def session
  @session
end

#state ⇒ Object (readonly)

The current state of the semaphore



40
41
42
# File 'lib/daemon_runner/semaphore.rb', line 40

# The current state of the semaphore.
# Assigned by semaphore_state from the recursive KV read of the prefix.
#
# @return [Object, nil] the stored state
def state
  @state
end

Class Method Details

.lock(name, limit = 3, **options) ⇒ DaemonRunner::Semaphore

Acquire a lock with the current session

Parameters:

  • limit (Integer) (defaults to: 3)

    The number of nodes that can request the lock

Returns:

See Also:



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/daemon_runner/semaphore.rb', line 17

# Acquire a lock with the current session.
#
# Without a block the held semaphore is returned and the caller is
# responsible for calling #release on it. With a block, a renew thread keeps
# the lock up to date while the block runs; afterwards (also when the block
# raises) the thread is killed and the semaphore is released.
#
# @param name [String] the session name, forwarded to Semaphore.new
# @param limit [Integer] the number of nodes that can request the lock; an
#   explicit `limit:` keyword in +options+ takes precedence
# @param options [Hash] further keyword arguments for Semaphore.new
# @yield runs while the lock is held and renewed
# @return [DaemonRunner::Semaphore]
# @raise [Exception] any error is logged and re-raised unchanged
def lock(name, limit = 3, **options)
  # Forward the positional limit (it used to be dropped); `name` always
  # comes from the positional argument.
  options = { limit: limit }.merge(options).merge(name: name)
  # Double-splat so the hash is passed as keywords on Ruby 3 as well.
  semaphore = Semaphore.new(**options)
  semaphore.lock
  if block_given?
    lock_thr = nil
    begin
      lock_thr = semaphore.renew
      yield
    ensure
      # renew itself may have failed, leaving no thread to stop.
      lock_thr.kill if lock_thr
      semaphore.release
    end
  end
  semaphore
rescue Exception => e
  # Log-and-re-raise only: nothing is swallowed here.
  logger.error e
  logger.debug e.backtrace.join("\n")
  raise
end

Instance Method Details

#contender_key(value = 'none') ⇒ Object

Create a contender key



124
125
126
127
128
129
130
131
132
133
# File 'lib/daemon_runner/semaphore.rb', line 124

# Create a contender key for this session under the prefix.
#
# The acquire call runs inside RetryErrors.retry and its result is cached in
# @contender_key.
#
# @param value [String] the value stored under the contender key
# @return [Object] the result of Diplomat::Lock.acquire
# @raise [ArgumentError] if +value+ is nil or empty
def contender_key(value = 'none')
  raise ArgumentError, 'Value cannot be empty or nil' if value.nil? || value.empty?

  # NOTE(review): the constructor already stores the prefix with a trailing
  # '/', so this path contains a double slash — confirm that is intended.
  session_path = "#{prefix}/#{session.id}"
  ::DaemonRunner::RetryErrors.retry do
    @contender_key = Diplomat::Lock.acquire(session_path, session.id, value)
  end
  @contender_key
end

#create_session(name) ⇒ Object



106
107
108
109
110
# File 'lib/daemon_runner/semaphore.rb', line 106

# Start the session backing this semaphore and store it in @session.
#
# The start call runs inside RetryErrors.retry, restricted to
# CreateSessionError.
#
# @param name [String] the session name
# @return [Object] whatever RetryErrors.retry returns
def create_session(name)
  retryable = [DaemonRunner::Session::CreateSessionError]
  ::DaemonRunner::RetryErrors.retry(exceptions: retryable) do
    @session = Session.start(name, behavior: 'delete')
  end
end

#lock ⇒ Boolean

Obtain a lock with the current session

Returns:

  • (Boolean)

    true if the lock was obtained



75
76
77
78
79
# File 'lib/daemon_runner/semaphore.rb', line 75

# Obtain a lock with the current session.
#
# Runs three steps in order: create this session's contender key, refresh
# the semaphore state, then attempt to take the lock.
#
# @return [Object] the value of try_lock.
#   NOTE(review): documented as Boolean, but try_lock ends with
#   log_lock_state — confirm what that returns.
def lock
  contender_key
  semaphore_state
  try_lock
end

#release ⇒ Boolean

Release a lock with the current session

Returns:

  • (Boolean)

    true if the lock was released



100
101
102
103
# File 'lib/daemon_runner/semaphore.rb', line 100

# Release a lock with the current session.
#
# Refreshes the semaphore state first, then delegates to try_release, which
# also destroys the session.
#
# @return [Object] the value of try_release.
#   NOTE(review): documented as Boolean, but try_release ends with
#   log_release_state — confirm what that returns.
def release
  semaphore_state
  try_release
end

#renew ⇒ Thread

Renew lock watching for changes

Returns:

  • (Thread)

    Thread running a blocking call maintaining the lock state



84
85
86
87
88
89
90
91
92
93
94
# File 'lib/daemon_runner/semaphore.rb', line 84

# Renew the lock by watching for changes in a background thread.
#
# The thread loops forever: each time renew? reports a change it refreshes
# the semaphore state and tries to take the lock again. The caller owns the
# thread and must stop it.
#
# @return [Thread] the thread running the watch loop
def renew
  Thread.new do
    loop do
      next unless renew?

      semaphore_state
      try_lock
    end
  end
end

#renew? ⇒ Boolean

Start a blocking query on the prefix; if there are changes, we need to try to obtain the lock again.

Returns:

  • (Boolean)

    true if there are changes, false if the request has timed out



180
181
182
183
184
185
186
187
188
# File 'lib/daemon_runner/semaphore.rb', line 180

# Start a blocking query on the prefix and report whether anything changed.
#
# @return [Object] the result of the KV query (truthy when changes were
#   returned); when a StandardError is raised it is logged and the return
#   value of logger.error is returned instead.
#   NOTE(review): that value may be truthy depending on the logger — confirm
#   callers expect a retry after an error.
def renew?
  logger.debug("Watching Consul #{prefix} for changes")
  Diplomat::Kv.get(prefix, { recurse: true }, :wait, :wait).tap do |changes|
    logger.info("Changes on #{prefix} detected") if changes
  end
rescue StandardError => err
  logger.error(err)
end

#semaphore_state ⇒ Object

Get the current semaphore state by fetching all contender keys and the lock key



137
138
139
140
141
142
# File 'lib/daemon_runner/semaphore.rb', line 137

# Fetch everything stored under the prefix and remember it as the current
# semaphore state.
#
# The read is recursive with decoded values; a non-empty result is handed to
# decode_semaphore_state.
#
# @return [Object] the fetched state
def semaphore_state
  @state = Diplomat::Kv.get(prefix, { decode_values: true, recurse: true }, :return)
  return state if state.empty?

  decode_semaphore_state
  state
end

#set_limit(new_limit) ⇒ Object



112
113
114
115
116
117
118
119
120
121
# File 'lib/daemon_runner/semaphore.rb', line 112

# Resolve the effective limit and store it in @limit.
#
# When a lock already exists, the limit recorded in the lock content wins
# and a warning is logged if the requested limit differs from it. Otherwise
# the requested limit is used.
#
# @param new_limit [Integer] the requested limit
# @return [Object] the limit that was stored
def set_limit(new_limit)
  return @limit = new_limit unless lock_exists?

  lockfile_limit = lock_content['Limit']
  # Compare against the lockfile value: @limit is still nil while the
  # constructor runs, so comparing against it warned on every non-zero limit.
  if new_limit.to_i != lockfile_limit.to_i
    logger.warn 'Limit in lockfile and @limit do not match using limit from lockfile'
  end
  @limit = lockfile_limit
end

#try_lock ⇒ Object



144
145
146
147
148
149
150
151
152
153
# File 'lib/daemon_runner/semaphore.rb', line 144

# Attempt to add this session to the lock holders and write the lock.
#
# The lock is only written when add_self_to_holders reports that an update
# is needed; the outcome of the write is kept in @locked.
#
# @return [Object] the value of log_lock_state
def try_lock
  prune_members
  do_update = add_self_to_holders
  # NOTE(review): @reset is cleared after the two helpers above run;
  # presumably they consume it — confirm before reordering.
  @reset = false
  if do_update
    format_holders
    @locked = write_lock
  end
  log_lock_state
end

#try_release ⇒ Object



155
156
157
158
159
160
161
162
163
164
# File 'lib/daemon_runner/semaphore.rb', line 155

# Attempt to remove this session from the lock holders, then tear down the
# session.
#
# The lock is only rewritten when remove_self_from_holders reports that an
# update is needed. The session release and destroy run in either case.
#
# @return [Object] the value of log_release_state
def try_release
  do_update = remove_self_from_holders
  if do_update
    format_holders
    # A successful write means we no longer hold the lock, hence the negation.
    @locked = !write_lock
  end
  DaemonRunner::Session.release(prefix)
  session.destroy!
  log_release_state
end

#write_lock ⇒ Boolean

Write a new lock file if the number of contenders is less than limit

Returns:

  • (Boolean)

    true if the lock was written successfully



168
169
170
171
172
173
# File 'lib/daemon_runner/semaphore.rb', line 168

# Write the generated lockfile to the lock key as a check-and-set put.
#
# The put uses the current lock modify index as its `cas` value, or 0 when
# no index is known yet. Nothing is written when generate_lockfile returns
# true.
#
# @return [Object] true when generate_lockfile returned true, otherwise the
#   result of Diplomat::Kv.put
def write_lock
  cas_index = lock_modify_index
  cas_index = 0 if cas_index.nil?
  payload = generate_lockfile
  if payload == true
    true
  else
    Diplomat::Kv.put(@lock, payload, cas: cas_index)
  end
end