Class: Cassandra::Tasks::Autoclean
- Inherits:
-
Object
- Object
- Cassandra::Tasks::Autoclean
- Includes:
- DaemonRunner::Logger
- Defined in:
- lib/cassandra/tasks/autoclean.rb
Instance Attribute Summary collapse
-
#token_cache_path ⇒ String
readonly
The path on disk where tokens will be cached.
Instance Method Summary collapse
-
#address ⇒ String?
Get the IP address of this node.
-
#cached_tokens ⇒ Array<String>
Get the cached tokens this node owns.
-
#initialize(options = {}) ⇒ Autoclean
constructor
Create a new Autoclean task.
-
#run! ⇒ Object
Run the Cassandra cleanup process if necessary.
-
#save_tokens ⇒ Object
Save the list of tokens this node owns to disk. These can be read by `cached_tokens`.
-
#schedule ⇒ Object
Schedule the Cassandra cleanup process to run daily.
-
#state ⇒ state?
Return the state of the Cassandra node.
-
#status ⇒ :up, :down
Return the status of the Cassandra node.
- #task_id ⇒ Object
-
#tokens ⇒ Array<String>
Get the tokens this node owns.
Constructor Details
#initialize(options = {}) ⇒ Autoclean
Create a new Autoclean task
(see #token_cache_path)
23 24 25 26 27 28 29 |
# File 'lib/cassandra/tasks/autoclean.rb', line 23

# Create a new Autoclean task.
#
# @param options [Hash] task configuration
# @option options [String] :token_cache_path where tokens are cached on
#   disk; defaults to "autoclean-tokens.json" inside Dir.tmpdir
# @option options [Object] :cleanup_service_name stored in @service_name
#   (passed as the first argument to DaemonRunner::Semaphore.lock by #run!)
# @option options [Object] :cleanup_lock_count stored in @lock_count
#   (passed as the second argument to DaemonRunner::Semaphore.lock by #run!)
# @option options [Object] :logger stored in @logger
# @return [Autoclean]
def initialize(options = {})
  @token_cache_path = options[:token_cache_path]
  # Fall back to a file in the system temp directory when no path is given.
  @token_cache_path ||= File.join(Dir.tmpdir, 'autoclean-tokens.json')
  @service_name = options[:cleanup_service_name]
  @lock_count = options[:cleanup_lock_count]
  @logger = options[:logger]
end
Instance Attribute Details
#token_cache_path ⇒ String (readonly)
Returns the path on disk where tokens will be cached.
14 15 16 |
# File 'lib/cassandra/tasks/autoclean.rb', line 14

# Reader for the token cache location.
#
# @return [String] the path on disk where tokens will be cached
def token_cache_path
  @token_cache_path
end
Instance Method Details
#address ⇒ String?
Get the IP address of this node
188 189 190 191 192 193 194 |
# File 'lib/cassandra/tasks/autoclean.rb', line 188

# Get the IP address of this node.
#
# The first entry of Socket.ip_address_list that answers true to
# ipv4_private? is used and memoized in @address. When no such entry exists
# nothing is memoized, so the lookup is repeated on the next call.
#
# @return [String, nil] the private IPv4 address, or nil if none was found
def address
  if @address.nil?
    # The block parameter is named differently from the local so that it
    # does not shadow it.
    private_addr = Socket.ip_address_list.find { |candidate| candidate.ipv4_private? }
    @address = private_addr.ip_address unless private_addr.nil?
  end
  @address
end
#cached_tokens ⇒ Array<String>
Get the cached tokens this node owns
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/cassandra/tasks/autoclean.rb', line 106

# Get the cached tokens this node owns.
#
# Reads the token cache from the start, parses it as JSON and returns the
# sorted "tokens" list. An empty list is returned (and the reason logged at
# debug level) when the cache is closed, is not a JSON object, was written
# by a different ::Cassandra::Utils::VERSION, has no tokens, has tokens that
# are not an Array, or cannot be opened/parsed.
#
# @return [Array<String>] the sorted cached tokens, or [] when unavailable
def cached_tokens
  if token_cache.closed?
    logger.debug "Failed to read cached tokens because file is closed."
    return []
  end

  token_cache.seek 0
  data = JSON.parse token_cache.read

  # Valid JSON is not necessarily an object ("[]", "42", "null"); indexing
  # those with a String key would raise instead of reporting a bad cache.
  unless data.is_a?(Hash)
    logger.debug "Failed to read cached tokens because the cache is not a JSON object"
    return []
  end

  unless data['version'] == ::Cassandra::Utils::VERSION
    logger.debug "Failed to read cached tokens because version didn't match. Expected #{::Cassandra::Utils::VERSION} got #{data['version']}"
    return []
  end

  tokens = data['tokens']

  if tokens.nil?
    logger.debug "Failed to read cached tokens because they're nil"
    return []
  end

  # Only an Array can be sorted here; a Hash responds to #each but has no
  # sort that returns a token list.
  unless tokens.is_a?(Array)
    logger.debug "Failed to read cached tokens because they're invalid"
    return []
  end

  tokens.sort
# Token file could not be opened or parsed
rescue Errno::ENOENT, JSON::ParserError => e
  logger.debug "Caught exception while reading cached tokens"
  logger.debug e
  []
end
#run! ⇒ Object
Run the Cassandra cleanup process if necessary
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/cassandra/tasks/autoclean.rb', line 76

# Run the Cassandra cleanup process if necessary.
#
# Cleanup is skipped (with a debug log entry) unless #status is :up, #state
# is :normal and the set of live tokens differs from the set of cached
# tokens. Otherwise nodetool_cleanup runs inside
# DaemonRunner::Semaphore.lock and, when its result has an exit status of 0,
# the tokens are saved.
#
# @return [Object] nil when cleanup is skipped, otherwise whatever
#   DaemonRunner::Semaphore.lock returns
def run!
  current_status = status
  if current_status != :up
    logger.debug "Cleanup skipped because of node status. Expected up got #{current_status}"
    return
  end

  current_state = state
  if current_state != :normal
    logger.debug "Cleanup skipped because of node state. Expected normal got #{current_state}"
    return
  end

  # Sets make the comparison independent of token order.
  live = Set.new(tokens)
  cached = Set.new(cached_tokens)
  if live == cached
    logger.debug "Cleanup skipped because tokens haven't changed"
    return
  end

  ::DaemonRunner::Semaphore.lock(@service_name, @lock_count) do
    outcome = nodetool_cleanup
    next if outcome.nil?
    save_tokens if outcome.exitstatus == 0
  end
end
#save_tokens ⇒ Object
Save the list of tokens this node owns to disk. These can be read by `cached_tokens`.
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/cassandra/tasks/autoclean.rb', line 144

# Save the list of tokens this node owns to disk.
# These can be read by `cached_tokens`.
#
# The cache is rewritten from the start with a JSON object holding a
# timestamp, the current tokens and ::Cassandra::Utils::VERSION.
#
# @return [Object] [] when the cache is closed, otherwise the result of
#   writing the JSON payload to the cache
def save_tokens
  # Check the cache first so that the live tokens are not looked up when
  # there is nowhere to store them.
  if token_cache.closed?
    logger.debug "Failed to save cached tokens because file is closed."
    return []
  end

  data = {
    :timestamp => Time.now.iso8601,
    :tokens => tokens,
    :version => ::Cassandra::Utils::VERSION
  }

  token_cache.seek 0
  token_cache.truncate 0
  written = token_cache.write data.to_json
  # Push the buffered payload out so it is not left sitting in the IO buffer.
  token_cache.flush
  written
end
#schedule ⇒ Object
Schedule the Cassandra cleanup process to run daily
33 34 35 |
# File 'lib/cassandra/tasks/autoclean.rb', line 33

# Schedule the Cassandra cleanup process to run daily.
#
# @return [Array] the pair [:interval, '1d']
def schedule
  every = '1d'
  [:interval, every]
end
#state ⇒ state?
Return the state of the Cassandra node
The returned state is reported by “nodetool netstats”.
64 65 66 67 68 69 70 71 72 |
# File 'lib/cassandra/tasks/autoclean.rb', line 64

# Return the state of the Cassandra node.
#
# The state is taken from the single line of nodetool_netstats output that
# contains "Mode:"; the text after the first colon is stripped, downcased
# and turned into a Symbol.
#
# @return [Symbol, nil] the node state, or nil unless exactly one usable
#   "Mode:" line is present
def state
  output = nodetool_netstats || ''
  modes = output.split("\n")
                .map { |line| line.strip }
                .select { |line| line.include?('Mode:') }
                .map { |line| line.split(':')[1] }
                .compact
  return nil unless modes.size == 1

  modes.first.strip.downcase.to_sym
end
#status ⇒ :up, :down
Return the status of the Cassandra node
A node is considered up if it has a status of “Up” as reported by “nodetool status”. If multiple nodes with this node’s IP address show up in “nodetool status”, this node is considered down.
45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/cassandra/tasks/autoclean.rb', line 45

# Return the status of the Cassandra node.
#
# A node is considered up when exactly one line of nodetool_status output
# has a whitespace-separated column equal to this node's address and the
# first column of that line starts with "U". A missing address, no matching
# line, or several matching lines all yield :down (with a warning logged).
#
# @return [:up, :down]
def status
  node_address = address
  if node_address.nil?
    logger.warn 'Cassandra node is DOWN'
    return :down
  end

  flags = (nodetool_status || '').split("\n").map do |line|
    columns = line.strip.split(/\s+/)
    # Compare whole columns: a substring test would let 10.0.0.1 also match
    # the row for 10.0.0.10. The "address:" prefix form is accepted as well
    # in case the output carries a port suffix -- TODO confirm whether the
    # nodetool_status helper can produce that.
    owned = columns.any? do |column|
      column == node_address || column.start_with?("#{node_address}:")
    end
    columns.first if owned
  end.compact

  if flags.size != 1
    logger.warn "Cannot find the Cassandra node (#{node_address}) in `nodetool status`"
    return :down
  end

  flags.first[0] == 'U' ? :up : :down
end
#task_id ⇒ Object
196 197 198 |
# File 'lib/cassandra/tasks/autoclean.rb', line 196

# Identifier of this task.
#
# @return [Array<String>] the pair "autoclean", "nodetool"
def task_id
  %w[autoclean nodetool]
end
#tokens ⇒ Array<String>
Get the tokens this node owns
The “nodetool ring” command returns:
Address Rack Status State Load Size Owns Token
127.0.0.1 r1 Up Normal 10 GB 33% 123456789
170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/cassandra/tasks/autoclean.rb', line 170

# Get the tokens this node owns.
#
# Each line of nodetool_ring output is split on whitespace; for lines whose
# first column is this node's address, the column at index 7 is taken as the
# token. The tokens are returned sorted.
#
# @return [Array<String>] the sorted tokens, or [] when the address is
#   unknown or no line matches
def tokens
  node_address = address
  if node_address.nil?
    logger.debug "Failed to read live tokens because address is nil"
    return []
  end

  owned = (nodetool_ring || '').split("\n").map do |line|
    columns = line.strip.split(/\s+/)
    owner = columns.first.to_s
    # Compare the whole first column: a prefix test would let 10.0.0.1 also
    # pick up the rows of 10.0.0.10. The "address:" prefix form is accepted
    # as well in case the output carries a port suffix -- TODO confirm
    # whether the nodetool_ring helper can produce that.
    next unless owner == node_address || owner.start_with?("#{node_address}:")

    columns[7]
  end

  owned.compact.sort
end