Class: Etcd::Observer

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/etcd/observer.rb

Instance Method Summary collapse

Methods included from Loggable

#logger, #reset_logger!

Constructor Details

#initialize(client, prefix, handler, options = {}) ⇒ Observer

Returns a new instance of Observer.



5
6
7
8
9
10
11
12
# File 'lib/etcd/observer.rb', line 5

# Builds an observer for +prefix+; nothing is watched until +run+ is called.
#
# @param client  [Object] stored as-is; +run+ calls +watch+ on it
# @param prefix  [Object] stored as-is; passed to +client.watch+ and used in log lines
# @param handler [#call]  stored as-is; invoked with (value, key, info)
# @param options [Hash]   stored as-is; merged with the current index on every watch
def initialize(client, prefix, handler, options = {})
  @client, @prefix, @handler, @options = client, prefix, handler, options
  # No watch index is known yet.
  @index = nil
  reset_logger!(Logger::DEBUG)
end

Instance Method Details

#call_handler_in_needed(value, key, info) ⇒ Object

etcd has a bug: after a restart, watches with an index __sometimes__ fire with all the previous values. Workaround:

- execute @handler only if info[:index] is higher than the last index seen


33
34
35
36
37
38
39
40
41
42
43
# File 'lib/etcd/observer.rb', line 33

# Invokes @handler for a watch event unless the event is a replay of an
# index that has already been handled.
#
# The handler runs when either
# * info[:index] is present and not lower than the stored @index (the stored
#   index is then advanced to info[:index] + 1), or
# * no index has been stored yet (first fire).
#
# @param value [Object] passed through to the handler
# @param key   [Object] passed through to the handler
# @param info  [Hash]   event details; only +:index+ is read here
# @return [Object, nil] the handler's result, or nil when the event is skipped
def call_handler_in_needed(value, key, info)
  event_index = info[:index]
  # A nil @index counts as 0 here, so any indexed event passes the first time.
  fresh = event_index && @index.to_i <= event_index
  return unless fresh || @index.nil?

  if fresh
    # next time start watching from next index
    @index = event_index + 1
    logger.debug "index for #{@prefix} ----  #{@index} "
  end
  @handler.call(value, key, info)
end

#cancel ⇒ Object



45
46
47
48
# File 'lib/etcd/observer.rb', line 45

# Clears the flag that the watch loop in +run+ checks before each watch and
# before each handler dispatch. The watcher thread itself is not touched here.
#
# @return [Observer] self
def cancel
  tap { @running = false }
end

#join ⇒ Object



57
58
59
60
# File 'lib/etcd/observer.rb', line 57

# Blocks until the watcher thread created by +run+ has finished.
#
# @return [Observer] self
def join
  tap { @thread.join }
end

#pp_status ⇒ Object



66
67
68
# File 'lib/etcd/observer.rb', line 66

# One-line, human-readable summary: the watched prefix followed by the
# watcher thread's state as rendered by +pp_thread_status+.
#
# @return [String] "<prefix>: <thread status>"
def pp_status
  thread_state = pp_thread_status
  "#{@prefix}: #{thread_state}"
end

#pp_thread_status ⇒ Object



70
71
72
73
74
75
# File 'lib/etcd/observer.rb', line 70

# Human-readable state of the watcher thread.
#
# Thread#status returns nil for a thread that died from an exception and
# false for one that finished normally; both are translated into text here.
# Any other status is returned unchanged.
#
# @return [String] 'not started' when no watcher thread has been created yet,
#   'dead by exception', 'dead by termination', or the live thread's status
def pp_thread_status
  # @thread is only assigned by #run; report that instead of failing on nil.
  return 'not started' unless @thread

  state = @thread.status
  case state
  when nil   then 'dead by exception'
  when false then 'dead by termination'
  else state
  end
end

#rerun ⇒ Object



50
51
52
53
54
55
# File 'lib/etcd/observer.rb', line 50

# Restarts the observer: terminates the current watcher thread if one exists
# and is still alive, then starts a new one via +run+.
#
# Safe to call on an observer that was never run; in that case it simply
# starts the first watcher thread.
#
# @return [Object] whatever +run+ returns
def rerun
  logger.debug "rerun for #{@prefix}"
  # @thread is only assigned by #run, so it is nil before the first run.
  @thread.terminate if @thread && @thread.alive?
  logger.debug "after termination for #{@prefix}"
  run
end

#run ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/etcd/observer.rb', line 14

# Starts the watcher thread and returns immediately.
#
# The thread keeps calling +@client.watch+ on the prefix for as long as the
# running flag is set. Each watch is issued with the stored options plus the
# current index. Events that arrive after the flag has been cleared are
# dropped without reaching the handler.
#
# @return [Observer] self
def run
  @running = true
  @thread = Thread.start do
    while @running
      logger.debug "********* watching #{@prefix} with index #{@index}"
      watch_options = @options.merge(index: @index)
      @client.watch(@prefix, watch_options) do |value, key, info|
        # Cancelled while the watch was pending: ignore the event.
        next unless @running

        logger.debug "watch fired for #{@prefix} with #{info.inspect} "
        call_handler_in_needed(value, key, info)
      end
    end
  end
  self
end

#status ⇒ Object



62
63
64
# File 'lib/etcd/observer.rb', line 62

# Raw status of the watcher thread, exactly as Thread#status reports it.
#
# @return [String, false, nil] the thread's status string while it is alive,
#   false once it has finished normally, nil if it died from an exception
def status
  watcher = @thread
  watcher.status
end