Class: Polipus::QueueOverflow::CassandraQueue
- Inherits: Object
  - Object
  - Polipus::QueueOverflow::CassandraQueue
- Defined in: lib/polipus-cassandra/queue_overflow/cassandra_queue.rb
Instance Attribute Summary
- #cluster ⇒ Object: CassandraQueue wants to persist documents (please ignore the jargon inherited from Mongo) like the following JSON-ish entry.
- #keyspace ⇒ Object: CassandraQueue wants to persist documents (please ignore the jargon inherited from Mongo) like the following JSON-ish entry.
- #table ⇒ Object: CassandraQueue wants to persist documents (please ignore the jargon inherited from Mongo) like the following JSON-ish entry.
Instance Method Summary
- #clear ⇒ Object: Clear is a fancy name for a DROP TABLE IF EXISTS <table_>.
- #empty? ⇒ Boolean: Return true if the table has no rows.
- #initialize(options = {}) ⇒ CassandraQueue (constructor): There is a validation enforced on `:keyspace` and `:table` because Cassandra is not happy when a keyspace or a table name contains a hyphen.
- #keyspace!(replication = nil, durable_writes = true) ⇒ Object
- #length ⇒ Object (also: #size): Length aka Size aka Count is supported in Cassandra…
- #pop(n = 1) ⇒ Object (also: #dec, #shift): Pop removes `n` entries from the overflow table (treated as a queue) and returns a paged result.
- #push(data) ⇒ Object (also: #enc, #<<): push is the "write into Cassandra" method.
- #session ⇒ Object
- #table!(properties = nil) ⇒ Object: Taking a look at the Cassandra KEYSPACE you will find:.
Constructor Details
#initialize(options = {}) ⇒ CassandraQueue
There is a validation enforced on `:keyspace` and `:table` because Cassandra is not happy when a keyspace or a table name contains a hyphen.
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 67

def initialize(options = {})
  raise ArgumentError unless options_are_valid?(options)
  @cluster = options[:cluster]
  @keyspace = options[:keyspace].gsub('-', '_')
  @table = options[:table].gsub('-', '_')
  @semaphore = Mutex.new
  @options = options
  @timeuuid_generator = Cassandra::Uuid::Generator.new
  @logger = @options[:logger] ||= Logger.new(STDOUT).tap { |l| l.level = Logger::INFO }
end
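Since Cassandra rejects hyphens in keyspace and table identifiers, the constructor normalizes them with gsub. A minimal pure-Ruby sketch of that normalization (`normalize_identifier` is an illustrative helper, not a method of this class):

```ruby
# Cassandra identifiers cannot contain hyphens, so the constructor
# rewrites '-' to '_' (mirroring the gsub calls in initialize).
def normalize_identifier(name)
  name.gsub('-', '_')
end

puts normalize_identifier('polipus_q_overflow_data-com-companies_queue_overflow')
# prints polipus_q_overflow_data_com_companies_queue_overflow
```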
Instance Attribute Details
#cluster ⇒ Object
CassandraQueue wants to persist documents (please ignore the jargon inherited from Mongo) like the following JSON-ish entry.
There is no superclass here, but I have in mind the interface implicitly defined by Polipus::QueueOverflow::DevNullQueue, which more or less has:
def initialize
def length
def empty?
def clear
def push(_data)
def pop(_ = false)
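That implicit interface can be sketched as a small Ruby class. The method list comes from the text above; the no-op bodies are illustrative assumptions, matching DevNullQueue's "discard everything" spirit:

```ruby
# Sketch of the implicit queue-overflow interface, modeled on
# Polipus::QueueOverflow::DevNullQueue: every operation is a no-op.
# Bodies are assumptions for illustration only.
class DevNullQueue
  def initialize; end
  def length;  0;    end
  def empty?;  true; end
  def clear;   nil;  end
  def push(_data);   nil; end
  def pop(_ = false); nil; end
end

q = DevNullQueue.new
q.push('{"url":"http://example.com"}')
puts q.empty?   # true: nothing is ever stored
```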
Taking some data from our backend.production.*****.com/polipus I found:
mongos> db.getCollectionNames() [
"data-com-companies",
"data_com_companies",
"googleplus",
"linkedin",
"linkedin-companies",
"linkedin_companies_parsed",
"linkedin_jobs",
"linkedin_jobs_parsed",
"linkedin_pages_errors",
"polipus_q_overflow_data-com-companies_queue_overflow",
"polipus_q_overflow_data_com_companies_queue_overflow",
"polipus_q_overflow_googleplus_queue_overflow",
"polipus_q_overflow_linkedin-companies_queue_overflow",
"polipus_q_overflow_linkedin_jobs_queue_overflow",
"polipus_q_overflow_linkedin_jobs_queue_overflow_old",
"polipus_q_overflow_linkedin_refresh_queue_overflow",
"system.indexes"
]
mongos> db.getCollection("polipus_q_overflow_linkedin_jobs_queue_overflow").find().limit(1)
{
  "_id" : ObjectId("54506b98e3d55b20c40b32d3"),
  "payload" : "{\"url\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=7&trk=jserp_pagination_next\",\"depth\":6,\"referer\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=6&trk=jserp_pagination_6\",\"fetched\":false}"
}
mongos> db.polipus_q_overflow_linkedin_refresh_queue_overflow.find().limit(10)
{
  "_id" : ObjectId("544072b6e3d55b0db7000001"),
  "payload" : "{\"url\":\"http://www.linkedin.com/in/*****\",\"depth\":0,\"fetched\":false}"
}
We also assume this monkey patch: Polipus::QueueOverflow.cassandra_queue(namespace, options = {}), which returns instances of this class.
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 62

def cluster
  @cluster
end
#keyspace ⇒ Object
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 62

def keyspace
  @keyspace
end
#table ⇒ Object
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 62

def table
  @table
end
Instance Method Details
#clear ⇒ Object
Clear is a fancy name for a DROP TABLE IF EXISTS <table_>.
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 100

def clear
  table_ = [keyspace, table].compact.join '.'
  statement = "DROP TABLE IF EXISTS #{table_} ;"
  session.execute(statement)
end
#empty? ⇒ Boolean
Return true if the table has no rows. This is achieved with a SELECT with LIMIT 1 query.
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 95

def empty?
  return get.first.nil?
end
#keyspace!(replication = nil, durable_writes = true) ⇒ Object
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 218

def keyspace!(replication = nil, durable_writes = true)
  replication ||= "{'class': 'SimpleStrategy', 'replication_factor': '3'}"
  statement = "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = #{replication} AND durable_writes = #{durable_writes};"
  cluster.connect.execute(statement)
end
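The defaulting behavior above (SimpleStrategy with replication_factor 3 when no replication map is given) can be isolated as plain string building. `replication_clause` is an illustrative helper name, not part of the class:

```ruby
# Sketch of keyspace!'s defaulting: with no replication map supplied,
# fall back to SimpleStrategy / replication_factor 3, then splice the
# map into the CQL WITH clause. Helper name is hypothetical.
def replication_clause(replication = nil, durable_writes = true)
  replication ||= "{'class': 'SimpleStrategy', 'replication_factor': '3'}"
  "WITH replication = #{replication} AND durable_writes = #{durable_writes}"
end

puts replication_clause
puts replication_clause("{'class': 'NetworkTopologyStrategy', 'dc1': '2'}", false)
```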
#length ⇒ Object Also known as: size
Length (aka size, aka count) is supported in Cassandra: as in plain SQL, you can COUNT.
SELECT COUNT (*) FROM keyspace.table_name;
To be honest, I'm not sure whether being "defensive" and returning 0/nil when the result is empty would be better. For now I'm leaving the code simple and noisy if something goes wrong in the COUNT.
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 86

def length
  table_ = [keyspace, table].compact.join '.'
  statement = "SELECT COUNT (*) FROM #{table_} ;"
  result = session.execute(statement)
  result.first['count']
end
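The "defensive" variant mused about above would guard against an empty COUNT result instead of letting `nil['count']` raise. A sketch, using a plain Array of Hashes as a stand-in for a Cassandra result set (`safe_count` is a hypothetical helper):

```ruby
# Defensive counterpart to length: return 0 when the COUNT query
# yields no rows, rather than raising NoMethodError on nil.
# A plain Array stands in for Cassandra::Results here.
def safe_count(result)
  row = result.first
  row ? row['count'] : 0
end

puts safe_count([{ 'count' => 42 }])  # 42
puts safe_count([])                   # 0
```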
#pop(n = 1) ⇒ Object Also known as: dec, shift
Pop removes `n` entries from the overflow table (treated as a queue) and returns a paged result: results.class #=> Cassandra::Results::Paged
Polipus is expecting a String, which will be JSON-parsed in order to build a …
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 151

def pop(n = 1)
  # A recap: pop should remove the oldest n messages and return them to the caller.
  #
  # Let's see how this queue is implemented.
  # In Redis, messages are LPUSH-ed:
  #
  #   4 - 3 - 2 - 1 --> REDIS
  #   4 - 3 - 2     --> REDIS
  #   4 - 3         --> REDIS
  #   4             --> REDIS
  #
  # Then, in the fast_dequeue, they are RPOP-ped:
  #
  #   REDIS --> 1
  #   REDIS --> 2 - 1
  #   REDIS --> 3 - 2 - 1
  #   REDIS --> 4 - 3 - 2 - 1
  #
  # So they are received in this order:
  #
  #   [1] -> TimeUUID(1) = ...
  #   [2] -> TimeUUID(2) = ...
  #   [3] -> TimeUUID(3) = ...
  #   [4] -> TimeUUID(4) = ...
  #
  # As you can see below, rows are ORDER BY (created_at ASC)... that means
  # "oldest first". When using 'LIMIT n' in a query, you get the n oldest entries.
  #
  # cqlsh> SELECT * FROM polipus_queue_overflow_linkedin.linkedin_overflow ;
  #
  #  queue_name                      | created_at                           | payload
  # ---------------------------------+--------------------------------------+---------
  #  polipus_queue_overflow_linkedin | 4632d49c-1c04-11e5-844b-0b314c777502 | "1"
  #  polipus_queue_overflow_linkedin | 46339f8a-1c04-11e5-844b-0b314c777502 | "2"
  #  polipus_queue_overflow_linkedin | 46349962-1c04-11e5-844b-0b314c777502 | "3"
  #  polipus_queue_overflow_linkedin | 46351860-1c04-11e5-844b-0b314c777502 | "4"
  #
  # (4 rows)
  #
  # cqlsh> SELECT * FROM polipus_queue_overflow_linkedin.linkedin_overflow LIMIT 1;
  #
  #  queue_name                      | created_at                           | payload
  # ---------------------------------+--------------------------------------+---------
  #  polipus_queue_overflow_linkedin | 4632d49c-1c04-11e5-844b-0b314c777502 | "1"
  #
  # (1 rows)
  table_ = [keyspace, table].compact.join '.'
  results = get(n)
  results.each do |entry|
    statement = "DELETE FROM #{table_} WHERE queue_name = '#{entry['queue_name']}' AND created_at = #{entry['created_at']} ;"
    session.execute(statement)
  end
  # Let's respect the API as expected by Polipus.
  # Otherwise the execute returns a Cassandra::Results::Paged
  if !results.nil? && results.respond_to?(:count) && results.count == 1
    return results.first['payload']
  end
  return results
end
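The FIFO semantics described in the comments above (oldest-first ordering by created_at, then delete what you read) can be simulated in pure Ruby, with an Array of [created_at, payload] pairs standing in for the Cassandra table. `pop_oldest` is an illustrative stand-in, not the real method:

```ruby
# Simulation of pop: LIMIT n over rows clustered by created_at ASC
# returns the n oldest entries; each is then deleted, mirroring the
# DELETE loop. For n == 1 the bare payload is returned, as Polipus expects.
def pop_oldest(rows, n = 1)
  popped = rows.sort_by(&:first).first(n)
  popped.each { |entry| rows.delete(entry) }  # mirrors the per-row DELETE
  n == 1 ? popped.first.last : popped.map(&:last)
end

rows = [[1, '"1"'], [2, '"2"'], [3, '"3"'], [4, '"4"']]
puts pop_oldest(rows)  # the oldest payload, "1"
puts rows.length       # 3 rows remain
```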
#push(data) ⇒ Object Also known as: enc, <<
push is the "write into Cassandra" method.
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 107

def push(data)
  return nil if data.nil?
  obj = MultiJson.decode(data)
  table_ = [keyspace, table].compact.join('.')
  queue_name = @keyspace
  created_at = @timeuuid_generator.now
  begin
    @semaphore.synchronize do
      if obj.has_key?('payload') && !obj['payload'].empty?
        payload = MultiJson.encode(obj['payload'])
      else
        payload = nil
      end
      column_names = %w[ queue_name created_at payload ]
      values_placeholders = column_names.map { |_| '?' }.join(',')
      statement = "INSERT INTO #{table_} ( #{column_names.join(',')} ) VALUES (#{values_placeholders});"
      session.execute(
        session.prepare(statement),
        arguments: [queue_name, created_at, payload])
    end
  rescue Encoding::UndefinedConversionError
    puts $!.error_char.dump
    puts $!.error_char.encoding
  end
  @logger.debug { "Writing this entry [#{[queue_name, created_at].to_s}]" }
  [queue_name, created_at].to_s
end
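The payload handling inside push (decode the incoming document, keep only a non-empty 'payload' field, re-encode it for storage) can be shown in isolation. This sketch uses the stdlib JSON module in place of MultiJson, and `extract_payload` is a hypothetical helper:

```ruby
require 'json'

# Sketch of push's payload handling: decode the incoming JSON document,
# and re-encode only a present, non-empty 'payload' field; otherwise nil
# is stored. stdlib JSON stands in for MultiJson here.
def extract_payload(data)
  obj = JSON.parse(data)
  if obj.key?('payload') && !obj['payload'].empty?
    JSON.generate(obj['payload'])
  end
end

puts extract_payload('{"payload":{"url":"http://example.com","depth":0}}').inspect
puts extract_payload('{"other":"field"}').inspect  # nil: no payload key
```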
#session ⇒ Object
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 224

def session
  @session ||= @cluster.connect(keyspace)
end
#table!(properties = nil) ⇒ Object
Taking a look at the Cassandra KEYSPACE you will find:
cqlsh> DESCRIBE KEYSPACE polipus_queue_overflow_linkedin ;
CREATE KEYSPACE polipus_queue_overflow_linkedin WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;
CREATE TABLE polipus_queue_overflow_linkedin.linkedin_overflow (
queue_name text,
created_at timeuuid,
payload text,
PRIMARY KEY (queue_name, created_at)
) WITH CLUSTERING ORDER BY (created_at ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
This means that:
- queue_name is the partition key;
- created_at is the clustering key.
With sample data:
cqlsh> SELECT * FROM polipus_queue_overflow_linkedin.linkedin_overflow LIMIT 1 ;
 queue_name                      | created_at                           | payload
---------------------------------+--------------------------------------+---------
 polipus_queue_overflow_linkedin | de17ece6-1e5e-11e5-b997-47a87c40c422 | "{\"url\":\"http://www.linkedin.com/in/foobar\",\"depth\":0,\"fetched\":false}"

(1 rows)
# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 269

def table!(properties = nil)
  table_ = [keyspace, table].compact.join '.'
  def_ = "CREATE TABLE IF NOT EXISTS #{table_} (
    queue_name TEXT,
    created_at TIMEUUID,
    payload TEXT,
    PRIMARY KEY (queue_name, created_at)
  )"
  props = Array(properties).join(' AND ')
  statement = props.empty? ? "#{def_};" : "#{def_} WITH #{props};"
  session.execute(statement)
end
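The optional properties parameter is folded into the CQL statement by `Array()` normalization plus an AND join, so callers may pass nil, a single clause string, or an array of clauses. A pure string-building sketch (`with_clause` is an illustrative helper):

```ruby
# How table!'s properties become the WITH clause: Array() turns
# nil into [], wraps a single string, and passes arrays through;
# the clauses are then joined with AND.
def with_clause(properties)
  props = Array(properties).join(' AND ')
  props.empty? ? ';' : " WITH #{props};"
end

puts with_clause(nil)                                          # ";"
puts with_clause('CLUSTERING ORDER BY (created_at ASC)')
puts with_clause(['default_time_to_live = 0', 'gc_grace_seconds = 864000'])
```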