Class: VCAP::Services::Base::AsyncJob::Lock

Inherits:
Object
  • Object
show all
Includes:
Error
Defined in:
lib/base/job/lock.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Error

#failure, #internal_fail, #parse_msg, #success, #timeout_fail

Constructor Details

#initialize(name, opts = {}) ⇒ Lock

Options for lock name - The uuid of the lock timeout - The time that waits to acquire the lock, default 20 seconds expiration - Lock expires in given seconds if not refreshed, default 10 seconds logger - The logger.. ttl - The max time that a thread can acquire the lock, default 600 seconds. Lock raise JOB_TIMEOUT error once the ttl is exceeded.



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/base/job/lock.rb', line 21

def initialize(name, opts={})
  @name = name
  @timeout = opts[:timeout] || 20 #seconds
  @expiration = opts[:expiration] || 10  # seconds
  @ttl = opts[:ttl] || 600 # seconds
  @logger = opts[:logger] || make_logger
  config = Config.redis_config
  raise "Can't find configuration of redis." unless config
  @redis = ::Redis.new(config)
  @released_thread = {}
end

Instance Attribute Details

#expirationObject (readonly)

Returns the value of attribute expiration.



12
13
14
# File 'lib/base/job/lock.rb', line 12

def expiration
  @expiration
end

#nameObject (readonly)

Returns the value of attribute name.



12
13
14
# File 'lib/base/job/lock.rb', line 12

def name
  @name
end

#timeoutObject (readonly)

Returns the value of attribute timeout.



12
13
14
# File 'lib/base/job/lock.rb', line 12

def timeout
  @timeout
end

Instance Method Details

#deleteObject



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/base/job/lock.rb', line 137

def delete
  @logger.debug("Deleting lock: #{@name}")
  existing_lock = @redis.get(@name)
  @logger.debug("Lock #{@name} is acquired by others.")if existing_lock.to_f > @lock_expiration
  @redis.watch(@name)
  res = @redis.multi do
    @redis.del(@name)
  end
  if res
    @logger.debug("Lock #{@name} is deleted.")
  else
    @logger.debug("Lock #{@name} is acquired by others.")
  end
  true
end

#lockObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/base/job/lock.rb', line 39

def lock
  @logger.debug("Acquiring lock: #{@name}")
  started = Time.now.to_f
  expiration = started.to_f + @expiration + 1
  until @redis.setnx(@name, expiration)
    existing_lock = @redis.get(@name)
    if existing_lock.to_f < Time.now.to_f
      @logger.debug("Lock #{@name} is expired, trying to acquire it.")
      break if watch_and_update(@redis, expiration)
    end

    raise ServiceError.new(ServiceError::JOB_QUEUE_TIMEOUT, @timeout)if Time.now.to_f - started > @timeout

    sleep(1)

    expiration = Time.now.to_f + @expiration + 1
  end

  @lock_expiration = expiration
  refresh_thread = setup_refresh_thread
  @logger.debug("Lock #{@name} is acquired, will expire at #{@lock_expiration}")

  begin
    Timeout::timeout(@ttl) do
      yield if block_given?
    end
  rescue Timeout::Error =>e
    raise ServiceError.new(ServiceError::JOB_TIMEOUT, @ttl)
  ensure
    release_thread(refresh_thread)
    delete
  end
end

#make_loggerObject



33
34
35
36
37
# File 'lib/base/job/lock.rb', line 33

def make_logger
  logger = Logger.new(STDOUT)
  logger.level = Logger::ERROR
  logger
end

#release_thread(t) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/base/job/lock.rb', line 86

def release_thread t
  # gracefully terminate refresh thread.
  @released_thread[t.object_id] = true
  waited = 0
  while (waited += 1) <= 5
    # thread is terminated when t.status == nil or false
    return unless t.status
    sleep 1
  end
  # force terminate after wait 5 seconds.
  t.exit
end

#released?Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/base/job/lock.rb', line 99

def released?
  @released_thread[Thread.current.object_id]
end

#setup_refresh_threadObject



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/base/job/lock.rb', line 103

def setup_refresh_thread
  t = Thread.new do
    redis = ::Redis.new(Config.redis_config)
    sleep_interval = [1.0, @expiration/2].max.to_i
    begin
      while not released? do
        @logger.debug("Renewing lock #{@name}")
        redis.watch(@name)
        existing_lock = redis.get(@name)

        break if existing_lock.to_f > @lock_expiration # lock has been updated by others
        expiration = Time.now.to_f + @expiration + 1
        break unless watch_and_update(redis, expiration)
        @lock_expiration = expiration
        sleep_interval.times do
          sleep 1
          break if released?
        end
      end
    rescue => e
      @logger.error("Can't renew lock #{@name}, #{e}")
    ensure
      begin
        @logger.debug("Lock renew thread for #{@name} exited.")
        redis.quit
      rescue => e
        # just logging, ignore error
        @logger.debug("Ignore error when quit: #{e}")
      end
    end
  end
  t
end

#watch_and_update(redis, expiration) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/base/job/lock.rb', line 73

def watch_and_update(redis, expiration)
  redis.watch(@name)
  res = redis.multi do
    redis.set(@name, expiration)
  end
  if res
    @logger.debug("Lock #{@name} is renewed and acquired.")
  else
    @logger.debug("Lock #{@name} was updated by others.")
  end
  res
end