Class: CI::Queue::Redis::Monitor

Inherits:
Object
  • Object
show all
Defined in:
lib/ci/queue/redis/monitor.rb

Constant Summary collapse

# Directory that read_script tries first when looking up "<name>.lua";
# resolved relative to this file.
DEV_SCRIPTS_ROOT =
::File.expand_path('../../../../../../redis', __FILE__)
# Directory that read_script falls back to when reading from
# DEV_SCRIPTS_ROOT raises a SystemCallError; resolved relative to this file.
RELEASE_SCRIPTS_ROOT =
::File.expand_path('../../redis', __FILE__)
# Array#pack / String#unpack1 directive used for the length prefix that
# read_message decodes before reading a JSON payload.
HEADER =
'L'
# Number of bytes occupied by one value packed with HEADER.
HEADER_SIZE =
[0].pack(HEADER).bytesize

Instance Method Summary collapse

Constructor Details

#initialize(pipe, logger, redis_url, zset_key, owners_key, leases_key) ⇒ Monitor

Returns a new instance of Monitor.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/ci/queue/redis/monitor.rb', line 16

# Builds a monitor bound to one incoming pipe and one Redis connection.
#
# @param pipe [IO] readable end on which framed messages arrive (see read_message)
# @param logger [#debug, #info] logger used by the monitor
# @param redis_url [String] URL handed to ::Redis.new
# @param zset_key [String] Redis key passed to the heartbeat script
# @param owners_key [String] Redis key stored on the instance
# @param leases_key [String] Redis key passed to the heartbeat script
def initialize(pipe, logger, redis_url, zset_key, owners_key, leases_key)
  @pipe = pipe
  @logger = logger
  @zset_key, @owners_key, @leases_key = zset_key, owners_key, leases_key

  @redis = ::Redis.new(url: redis_url, reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5])

  # Loop state: @shutdown stays false until an exit is requested, @queue
  # collects signals recorded by soft_signal.
  @shutdown = false
  @queue = []
  @deadlines = {}

  # Self-pipe: soft_signal writes a byte to the writer so that IO.select in
  # wait_for_events wakes up when a signal has been queued.
  @self_pipe_reader, @self_pipe_writer = IO.pipe
  @self_pipe_writer.sync = true

  # Route these signals through soft_signal instead of the default handlers.
  %i[TERM INT USR1].each { |sig| Signal.trap(sig) { soft_signal(sig) } }
end

Instance Method Details

#eval_script(script, *args) ⇒ Object



48
49
50
# File 'lib/ci/queue/redis/monitor.rb', line 48

# Runs a named Lua script through EVALSHA, loading it first if needed.
#
# The SHA returned by load_script is cached for the lifetime of this object,
# but the server can forget scripts (restart, failover, SCRIPT FLUSH). When
# the server answers NOSCRIPT the cached SHA is dropped, the script is loaded
# again and the call is retried exactly once.
#
# @param script [Symbol, String] script name, resolved by load_script
# @param args extra arguments forwarded unchanged to @redis.evalsha
# @return [Object] whatever @redis.evalsha returns
# @raise any error other than NOSCRIPT, and any error raised by the retry
def eval_script(script, *args)
  @redis.evalsha(load_script(script), *args)
rescue => error
  # Only a missing-script reply is recoverable here; everything else bubbles up.
  raise unless error.message.start_with?('NOSCRIPT')

  @scripts_cache&.delete(script)
  @redis.evalsha(load_script(script), *args)
end

#load_script(script) ⇒ Object



52
53
54
55
# File 'lib/ci/queue/redis/monitor.rb', line 52

# Returns the value produced by SCRIPT LOAD for the named script, loading the
# script at most once per name for this object.
#
# @param script [Symbol, String] script name, resolved by read_script
# @return [Object] the cached result of @redis.script(:load, ...)
def load_script(script)
  cache = (@scripts_cache ||= {})
  known = cache[script]
  return known if known

  cache[script] = @redis.script(:load, read_script(script))
end

#monitor ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/ci/queue/redis/monitor.rb', line 116

# Main loop: handles queued signals and waits for pipe activity until a
# shutdown has been requested.
#
# INT and TERM request a shutdown by setting @shutdown to 0 (truthy in Ruby,
# so the loop ends). Any other queued signal raises.
# NOTE(review): the constructor also traps USR1, which has no branch here and
# therefore ends up in the "Unknown signal" raise - confirm that is intended.
#
# @return [Object] the final value of @shutdown
# @raise [RuntimeError] when a queued signal is neither :INT nor :TERM
def monitor
  @logger.debug("Starting monitor")
  watched = [@self_pipe_reader, @pipe]

  until @shutdown
    # Drain signals recorded by soft_signal; stop at the first exit request.
    while (signal = @queue.shift)
      unless %i[INT TERM].include?(signal)
        raise "Unknown signal: #{signal.inspect}"
      end

      @logger.debug("Received #{signal}, exiting")
      @shutdown = 0
      break
    end

    # Returns immediately once @shutdown is set.
    wait_for_events(watched)
  end

  @logger.debug('Done')
  @shutdown
end

#process_messages(io) ⇒ Object



90
91
92
93
94
95
96
# File 'lib/ci/queue/redis/monitor.rb', line 90

# Reads messages from +io+ until read_message returns nil or false, and
# dispatches each one to the public method named "process_<type>".
#
# Each message is a two-element array: the type and a hash of arguments. The
# hash keys are converted to symbols in place and passed as keyword arguments.
#
# @param io [IO] source handed to read_message
# @return [nil]
def process_messages(io)
  while (message = read_message(io))
    handler, params = message
    public_send("process_#{handler}", **params.transform_keys!(&:to_sym))
  end
end

#process_tick!(id:, lease:) ⇒ Object



38
39
40
41
42
43
44
45
46
# File 'lib/ci/queue/redis/monitor.rb', line 38

# Handles a "tick!" message by running the :heartbeat script with the current
# time, the given id and the given lease.
#
# Failures are deliberately not propagated: any StandardError is logged at
# info level and the method returns the logger's result.
#
# @param id value forwarded as the second script argument
# @param lease value forwarded as the third script argument
# @return [Object] the result of eval_script, or of @logger.info on failure
def process_tick!(id:, lease:)
  begin
    eval_script(:heartbeat, keys: [@zset_key, @leases_key], argv: [Time.now.to_f, id, lease])
  rescue StandardError => e
    @logger.info(e)
  end
end

#read_message(io) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/ci/queue/redis/monitor.rb', line 77

# Reads one framed message from +io+ without blocking when nothing is pending.
#
# A frame is a HEADER_SIZE-byte length prefix (packed with HEADER) followed by
# that many bytes of JSON. The prefix is read non-blockingly; once a frame has
# started, the remainder is read with blocking reads.
#
# Any sign that the writer went away - EOF before a frame, EOF inside the
# prefix, or EOF inside the payload - is treated as a broken pipe: it is
# logged, @shutdown is set to 0 and false is returned.
#
# @param io [IO] readable end of the message pipe
# @return [Object] the parsed JSON document
# @return [nil] when no data is currently available
# @return [false] when the pipe is broken
# @raise [JSON::ParserError] when a complete payload is not valid JSON
def read_message(io)
  header = io.read_nonblock(HEADER_SIZE, exception: false)
  return if header == :wait_readable

  # A non-blocking read may hand back only part of the prefix. Decoding a
  # short prefix yields nil, which would turn the payload read into an
  # unbounded read-until-EOF, so complete the prefix first.
  if header && header.bytesize < HEADER_SIZE
    rest = io.read(HEADER_SIZE - header.bytesize)
    header = rest && header + rest
  end

  if header && header.bytesize == HEADER_SIZE
    size = header.unpack1(HEADER)
    payload = io.read(size)
    # A nil or short payload means EOF hit in the middle of the frame.
    return JSON.parse(payload) if payload && payload.bytesize == size
  end

  @logger.debug('Broken pipe, exiting')
  @shutdown = 0
  false
end

#read_script(name) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/ci/queue/redis/monitor.rb', line 57

# Returns the source of "<name>.lua" with its include directives expanded.
#
# The script is looked up under DEV_SCRIPTS_ROOT first. If that attempt raises
# a SystemCallError (for example because the file does not exist), the lookup
# is repeated under RELEASE_SCRIPTS_ROOT; errors from that second attempt
# propagate to the caller.
#
# @param name [Symbol, String] script name without the ".lua" extension
# @return [String] script source after resolve_lua_includes
def read_script(name)
  load_from = lambda do |root|
    resolve_lua_includes(::File.read(::File.join(root, "#{name}.lua")), root)
  end

  begin
    load_from.call(DEV_SCRIPTS_ROOT)
  rescue SystemCallError
    load_from.call(RELEASE_SCRIPTS_ROOT)
  end
end

#resolve_lua_includes(script, root) ⇒ Object



69
70
71
72
73
# File 'lib/ci/queue/redis/monitor.rb', line 69

# Expands include directives in a Lua script.
#
# Every line that consists exactly of "-- @include <name>" is replaced with
# the contents of "<name>.lua" read from +root+. Included text is inserted as
# is; directives inside it are not expanded again.
#
# @param script [String] Lua source to expand
# @param root [String] directory the included files are read from
# @return [String] a new string with the directives replaced
def resolve_lua_includes(script, root)
  directive = /^-- @include (\S+)$/

  script.gsub(directive) do
    included = Regexp.last_match(1)
    ::File.read(::File.join(root, "#{included}.lua"))
  end
end

#soft_signal(sig) ⇒ Object



33
34
35
36
# File 'lib/ci/queue/redis/monitor.rb', line 33

# Records a signal for the main loop and wakes it up through the self-pipe.
#
# This runs inside a signal handler, so it must never block. The wake-up byte
# is written with a non-blocking write: if the pipe is already full the write
# is skipped, which is harmless because a full pipe is readable and the signal
# itself is kept in @queue.
#
# @param sig [Symbol] signal name to enqueue
# @return [IO] the self-pipe writer
def soft_signal(sig)
  @queue << sig
  @self_pipe_writer.write_nonblock('.', exception: false)
  @self_pipe_writer
end

#wait_for_events(ios) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/ci/queue/redis/monitor.rb', line 98

# Waits up to 10 seconds for one of +ios+ to become readable and handles
# whatever is ready.
#
# Returns immediately when a shutdown has already been requested, and returns
# nil when the wait times out.
#
# @param ios [Array<IO>] readers to watch
# @return [Array<IO>, nil] the readers that were ready, or nil
# @raise [RuntimeError] when a ready reader is neither the self-pipe nor @pipe
def wait_for_events(ios)
  return if @shutdown

  readable, = IO.select(ios, nil, nil, 10)
  return unless readable

  readable.each do |reader|
    if @self_pipe_reader == reader
      # Only a wake-up: discard the bytes, the signals themselves are in @queue.
      reader.read_nonblock(512, exception: false)
    elsif @pipe == reader
      process_messages(@pipe)
    else
      @logger.debug("Unknown reader: #{reader.inspect}")
      raise "Unknown reader: #{reader.inspect}"
    end
  end
end