Class: Cassandra::Tasks::Autoclean

Inherits:
Object
  • Object
show all
Includes:
DaemonRunner::Logger
Defined in:
lib/cassandra/tasks/autoclean.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Autoclean

Create a new Autoclean task

(see #token_cache_path)

Parameters:

  • options (Object) (defaults to: {})

    optional configuration settings



23
24
25
26
27
28
29
# File 'lib/cassandra/tasks/autoclean.rb', line 23

def initialize(options = {})
  @token_cache_path = options[:token_cache_path]
  @token_cache_path ||= File.join(Dir.tmpdir, 'autoclean-tokens.json')
  @service_name = options[:cleanup_service_name]
  @lock_count = options[:cleanup_lock_count]
  @logger = options[:logger]
end

Instance Attribute Details

#token_cache_pathString (readonly)

Returns the path on disk where tokens will be cached.

Returns:

  • (String)

    the path on disk where tokens will be cached



14
15
16
# File 'lib/cassandra/tasks/autoclean.rb', line 14

def token_cache_path
  @token_cache_path
end

Instance Method Details

#addressString?

Get the IP address of this node

Returns:

  • (String, nil)

    IP address of this node



188
189
190
191
192
193
194
# File 'lib/cassandra/tasks/autoclean.rb', line 188

def address
  if @address.nil?
    addr = Socket.ip_address_list.find { |addr| addr.ipv4_private? }
    @address = addr.ip_address unless addr.nil?
  end
  @address
end

#cached_tokensArray<String>

Get the cached tokens this node owns

Returns:

  • (Array<String>)

    Cached tokens



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/cassandra/tasks/autoclean.rb', line 106

def cached_tokens
  if token_cache.closed?
    logger.debug "Failed to read cached tokens because file is closed."
    return []
  end

  token_cache.seek 0
  data = token_cache.read
  data = JSON.parse data

  unless data['version'] == ::Cassandra::Utils::VERSION
    logger.debug "Failed to read cached tokens because version didn't match. Expected #{::Cassandra::Utils::VERSION} got #{data['version']}"
    return []
  end

  tokens = data['tokens']
  if tokens.nil?
    logger.debug "Failed to read cached tokens because they're nil"
    return []
  end

  unless tokens.respond_to? :each
    logger.debug "Failed to read cached tokens because they're invalid"
    return []
  end

  tokens.sort!
  tokens
# Token file could not be opend or parsed
rescue Errno::ENOENT, JSON::ParserError => e
  logger.debug "Caught exception while reading cached tokens"
  logger.debug e
  []
end

#run!Object

Run the Cassandra cleanup process if necessary



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/cassandra/tasks/autoclean.rb', line 76

def run!
  node_status = status
  unless node_status == :up
    logger.debug "Cleanup skipped because of node status. Expected up got #{node_status}"
    return
  end

  node_state = state
  unless node_state == :normal
    logger.debug "Cleanup skipped because of node state. Expected normal got #{node_state}"
    return
  end

  new_tokens = Set.new tokens
  old_tokens = Set.new cached_tokens
  if new_tokens == old_tokens
    logger.debug "Cleanup skipped because tokens haven't changed"
    return
  end

  ::DaemonRunner::Semaphore.lock(@service_name, @lock_count) do
    result = nodetool_cleanup
    save_tokens if !result.nil? && result.exitstatus == 0
  end
end

#save_tokensObject

Save the list of tokens this node owns to disk These can be read by ‘cached_tokens`



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/cassandra/tasks/autoclean.rb', line 144

def save_tokens
  data = {
    :timestamp => Time.now.iso8601,
    :tokens => tokens,
    :version => ::Cassandra::Utils::VERSION
  }

  if token_cache.closed?
    logger.debug "Failed to save cached tokens because file is closed."
    return []
  end

  token_cache.seek 0
  token_cache.truncate 0
  token_cache.write data.to_json
end

#scheduleObject

Schedule the Cassandra cleanup process to run daily



33
34
35
# File 'lib/cassandra/tasks/autoclean.rb', line 33

def schedule
  [:interval, '1d']
end

#statestate?

Return the state of the Cassandra node

The returned state is reported by “nodetool netstats”.

Returns:



64
65
66
67
68
69
70
71
72
# File 'lib/cassandra/tasks/autoclean.rb', line 64

def state
  results = (nodetool_netstats || '').split("\n")
  results.map! { |line| line.strip }
  results.select! { |line| line.include? 'Mode:' }
  results.map! { |line| line.split(':')[1] }
  results.compact!
  return nil if results.size != 1
  results.first.strip.downcase.to_sym
end

#status:up, :down

Return the status of the Cassandra node

A node is considered up if it has a status of “Up” as reported by “nodetool status”. If multiple nodes with this node’s IP address show up in “nodetool status”, this node is considered down.

Returns:

  • (:up, :down)


45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/cassandra/tasks/autoclean.rb', line 45

def status
  return(:down).tap { logger.warn 'Cassandra node is DOWN' } if address.nil?
  results = (nodetool_status || '').split("\n")
  results.map! { |line| line.strip }
  results.select! { |line| line.include? address }
  results.map! { |line| line.split(/\s+/)[0] }
  results.compact!
  return(:down).tap do
    logger.warn "Cannot find the Cassandra node (#{address}) in `nodetool status`"
  end if results.size != 1
  (results.first[0] == 'U') ? :up : :down
end

#task_idObject



196
197
198
# File 'lib/cassandra/tasks/autoclean.rb', line 196

def task_id
  ['autoclean', 'nodetool']
end

#tokensArray<String>

Get the tokens this node owns

The “nodetool ring” command returns

Address Rack Status State Load Size Owns Token 127.0.0.1 r1 Up Normal 10 GB 33% 123456789

Returns:

  • (Array<String>)

    Tokens owned by this node



170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/cassandra/tasks/autoclean.rb', line 170

def tokens
  if address.nil?
    logger.debug "Failed to read live tokens because address is nil"
    return []
  end

  results = (nodetool_ring || '').split("\n")
  results.map! { |line| line.strip }
  results.select! { |line| line.start_with? address }
  results.map! { |line| line.split(/\s+/)[7] }
  results.compact!
  results.sort
end