Class: Polipus::QueueOverflow::CassandraQueue

Inherits:
Object
Defined in:
lib/polipus-cassandra/queue_overflow/cassandra_queue.rb


Constructor Details

#initialize(options = {}) ⇒ CassandraQueue

A validation is enforced on `:keyspace` and `:table` because Cassandra is not happy when a keyspace or table name contains a hyphen; hyphens are replaced with underscores.

Raises:

  • (ArgumentError)


# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 67

def initialize(options = {})
  raise ArgumentError unless options_are_valid?(options)
  @cluster = options[:cluster]
  @keyspace = options[:keyspace].gsub("-", "_")
  @table = options[:table].gsub("-", "_")
  @semaphore = Mutex.new
  @options = options
  @timeuuid_generator = Cassandra::Uuid::Generator.new
  @logger = @options[:logger] ||= Logger.new(STDOUT).tap { |l| l.level = Logger::INFO }
end
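The constructor's name handling can be illustrated in isolation. This is a minimal sketch, assuming an unquoted CQL name must be a letter followed by letters, digits or underscores. `VALID_CQL_NAME`, `sanitize_cql_name` and `names_valid?` are hypothetical helpers, not the gem's `options_are_valid?`, which is not shown on this page.

```ruby
# Hypothetical helpers (not the gem's options_are_valid?): they assume an
# unquoted CQL name is a letter followed by letters, digits or underscores.
VALID_CQL_NAME = /\A[a-zA-Z][a-zA-Z0-9_]*\z/

# Same substitution as #initialize: hyphens become underscores.
def sanitize_cql_name(name)
  name.to_s.gsub("-", "_")
end

# True when both :keyspace and :table are present and are valid CQL names
# once hyphens have been replaced.
def names_valid?(options)
  [:keyspace, :table].all? do |key|
    value = options[key]
    !value.nil? && VALID_CQL_NAME.match?(sanitize_cql_name(value))
  end
end

sanitize_cql_name("data-com-companies")                        # => "data_com_companies"
names_valid?(keyspace: "polipus-q", table: "linkedin_overflow") # => true
names_valid?(keyspace: "polipus q", table: "linkedin_overflow") # => false
```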

Instance Attribute Details

#cluster ⇒ Object

CassandraQueue wants to persist documents (please, still ignore the jargon inherited from Mongo) like the JSON-ish entries shown below.

There is no superclass here, but I have in mind the interface implicitly defined by Polipus::QueueOverflow::DevNullQueue, which more or less has:

def initialize
def length
def empty?
def clear
def push(_data)
def pop(_ = false)
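To make that implicit interface concrete, here is a hypothetical in-memory queue with FIFO semantics. `InMemoryOverflowQueue` is not part of Polipus. It is only a sketch of the expected behaviour, assuming `pop` returns the oldest entry, or nil when the queue is empty.

```ruby
# Hypothetical in-memory stand-in (not part of Polipus) exposing the same
# method names as the interface listed above, with FIFO semantics.
class InMemoryOverflowQueue
  def initialize
    @items = []
    @mutex = Mutex.new
  end

  def length
    @mutex.synchronize { @items.length }
  end
  alias_method :size, :length

  def empty?
    length.zero?
  end

  def clear
    @mutex.synchronize { @items.clear }
    nil
  end

  # Appends at the tail and returns self, so pushes can be chained.
  def push(data)
    @mutex.synchronize { @items.push(data) }
    self
  end
  alias_method :<<, :push

  # Removes from the head, so the oldest entry comes out first (nil if empty).
  def pop(_ = false)
    @mutex.synchronize { @items.shift }
  end
end

queue = InMemoryOverflowQueue.new
queue << '{"url":"http://example.com/a"}' << '{"url":"http://example.com/b"}'
queue.length # => 2
queue.pop    # => "{\"url\":\"http://example.com/a\"}"
```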

Looking at some data in our backend.production.*****.com/polipus, I found:

mongos> db.getCollectionNames()
[
    "data-com-companies",
    "data_com_companies",
    "googleplus",
    "linkedin",
    "linkedin-companies",
    "linkedin_companies_parsed",
    "linkedin_jobs",
    "linkedin_jobs_parsed",
    "linkedin_pages_errors",
    "polipus_q_overflow_data-com-companies_queue_overflow",
    "polipus_q_overflow_data_com_companies_queue_overflow",
    "polipus_q_overflow_googleplus_queue_overflow",
    "polipus_q_overflow_linkedin-companies_queue_overflow",
    "polipus_q_overflow_linkedin_jobs_queue_overflow",
    "polipus_q_overflow_linkedin_jobs_queue_overflow_old",
    "polipus_q_overflow_linkedin_refresh_queue_overflow",
    "system.indexes"
]

mongos> db.getCollection("polipus_q_overflow_linkedin_jobs_queue_overflow").find().limit(1)
{
    "_id" : ObjectId("54506b98e3d55b20c40b32d3"),
    "payload" : "{\"url\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=7&trk=jserp_pagination_next\",\"depth\":6,\"referer\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=6&trk=jserp_pagination_6\",\"fetched\":false"
}

mongos> db.polipus_q_overflow_linkedin_refresh_queue_overflow.find().limit(10)
{
    "_id" : ObjectId("544072b6e3d55b0db7000001"),
    "payload" : "{\"url\":\"http://www.linkedin.com/in/*****\",\"depth\":0,\"fetched\":false}"
}

We also assume a monkey patch, Polipus::QueueOverflow.cassandra_queue(namespace, options = {}), that returns instances of this class.



# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 62

def cluster
  @cluster
end

#keyspace ⇒ Object

# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 62

def keyspace
  @keyspace
end

#table ⇒ Object

# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 62

def table
  @table
end

Instance Method Details

#clear ⇒ Object

Clear is a fancy name for DROP TABLE IF EXISTS <table_>. Since the table itself is dropped, table! has to be called again before the next push.



# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 100

def clear
  table_ = [keyspace, table].compact.join '.'
  statement = "DROP TABLE IF EXISTS #{table_} ;"
  session.execute(statement)
end
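Both #clear and #length build the fully qualified table name with `[keyspace, table].compact.join('.')`. The sketch below isolates that string building; `qualified_table` and `drop_table_statement` are hypothetical helper names, not methods of this class.

```ruby
# Hypothetical helpers isolating the string building used by #clear.
def qualified_table(keyspace, table)
  # compact drops a nil keyspace, leaving the bare table name
  [keyspace, table].compact.join('.')
end

def drop_table_statement(keyspace, table)
  "DROP TABLE IF EXISTS #{qualified_table(keyspace, table)} ;"
end

qualified_table(nil, "linkedin_overflow")
# => "linkedin_overflow"
drop_table_statement("polipus_queue_overflow_linkedin", "linkedin_overflow")
# => "DROP TABLE IF EXISTS polipus_queue_overflow_linkedin.linkedin_overflow ;"
```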

#empty? ⇒ Boolean

Returns true if the table has no rows. This is achieved with a SELECT ... LIMIT 1 query.

Returns:

  • (Boolean)


# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 95

def empty?
  return get.first.nil?
end
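`get` is not documented on this page, so the following is only a guess at the idea behind #empty?: fetch at most one row and check whether anything came back. `FakeSession`, `get` and `table_empty?` are hypothetical stand-ins; the `LIMIT` parsing in `FakeSession` exists only so the sketch runs without a cluster.

```ruby
# Hypothetical stand-in for a Cassandra session: it only understands the
# "LIMIT n" part of a SELECT and serves rows from an in-memory Array.
class FakeSession
  def initialize(rows)
    @rows = rows
  end

  def execute(statement)
    limit = statement[/LIMIT (\d+)/, 1]
    limit ? @rows.first(limit.to_i) : @rows.dup
  end
end

# Hypothetical version of the undocumented #get helper: read at most n rows.
def get(session, table_, n = 1)
  session.execute("SELECT * FROM #{table_} LIMIT #{n} ;")
end

# Same idea as #empty?: the table is empty when not even one row comes back.
def table_empty?(session, table_)
  get(session, table_).first.nil?
end

table_empty?(FakeSession.new([]), "ks.t")                         # => true
table_empty?(FakeSession.new([{ "payload" => "\"1\"" }]), "ks.t") # => false
```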

#keyspace!(replication = nil, durable_writes = true) ⇒ Object



# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 218

def keyspace!(replication = nil, durable_writes = true)
  replication ||= "{'class': 'SimpleStrategy', 'replication_factor': '3'}"
  statement = "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = #{replication} AND durable_writes = #{durable_writes};"
  cluster.connect.execute(statement)
end
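#keyspace! interpolates the replication map straight into the CQL string. A minimal sketch, assuming you prefer to describe replication as a Ruby Hash; `cql_map` and `create_keyspace_statement` are hypothetical helpers that reproduce the method's default statement.

```ruby
# Hypothetical helper: renders a Hash as a CQL map literal with quoted keys
# and values, matching the default replication string of #keyspace!.
def cql_map(hash)
  pairs = hash.map { |k, v| "'#{k}': '#{v}'" }
  "{#{pairs.join(', ')}}"
end

# Hypothetical mirror of #keyspace! that returns the statement instead of
# executing it.
def create_keyspace_statement(keyspace, replication = nil, durable_writes = true)
  replication ||= cql_map('class' => 'SimpleStrategy', 'replication_factor' => 3)
  "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = #{replication} AND durable_writes = #{durable_writes};"
end

create_keyspace_statement('polipus_queue_overflow_linkedin')
```

On a single-node development cluster you would probably pass a map with `'replication_factor' => 1` rather than rely on the default of 3.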

#length ⇒ Object Also known as: size

Length (aka size, aka count) is supported in Cassandra: as in plain SQL, you can COUNT.

SELECT COUNT (*) FROM keyspace.table_name;

To be honest, I'm not sure whether to be "defensive" and return 0/nil when the result is empty. For now I'm leaving the code simple, and noisy if something goes wrong with the COUNT.



# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 86

def length
  table_ = [keyspace, table].compact.join '.'
  statement = "SELECT COUNT (*) FROM #{table_} ;"
  result = session.execute(statement)
  result.first['count']
end
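If the "defensive" behaviour mentioned above were ever wanted, it could look like this sketch. `count_from` is hypothetical, and a plain Array of Hash rows stands in for the driver's result object.

```ruby
# Hypothetical defensive variant of reading the COUNT result: return 0
# instead of raising when no row comes back.
def count_from(result)
  row = result.first
  row.nil? ? 0 : row['count'].to_i
end

count_from([{ 'count' => 4 }]) # => 4
count_from([])                 # => 0
```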

#pop(n = 1) ⇒ Object Also known as: dec, shift

Pop removes the `n` oldest entries from the overflow table (treated as a queue). If exactly one row is read, its payload String is returned; otherwise the paged result is returned as-is (results.class #=> Cassandra::Results::Paged).

Polipus is expecting a String, which will be JSON-parsed in order to build a



# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 151

def pop(n = 1)
  # A recap: pop should remove oldest N messages and return to the caller.
  #
  # Let's see how this queue is implemented.
  # In Redis, messages are LPUSH-ed:
  #
  #  4 - 3 - 2 - 1 --> REDIS
  #      4 - 3 - 2 --> REDIS
  #          4 - 3 --> REDIS
  #              4 --> REDIS
  #
  # Then, in fast_dequeue, they are RPOP-ped:
  #
  # REDIS --> 1
  # REDIS --> 2 - 1
  # REDIS --> 3 - 2 - 1
  # REDIS --> 4 - 3 - 2 - 1
  #
  # They are then received in this order, each getting its own TimeUUID:
  # [1] -> TimeUUID(1) = ...
  # [2] -> TimeUUID(2) = ...
  # [3] -> TimeUUID(3) = ...
  # [4] -> TimeUUID(4) = ...
  #
  # As you can see below, rows are ORDER BY (created_at ASC), which means
  # "oldest first". When using 'LIMIT n' in a query, you get the 'n'
  # oldest entries.
  #
  # cqlsh> SELECT  * FROM  polipus_queue_overflow_linkedin.linkedin_overflow ;
  #
  #  queue_name                      | created_at                           | payload
  # ---------------------------------+--------------------------------------+---------
  #  polipus_queue_overflow_linkedin | 4632d49c-1c04-11e5-844b-0b314c777502 |     "1"
  #  polipus_queue_overflow_linkedin | 46339f8a-1c04-11e5-844b-0b314c777502 |     "2"
  #  polipus_queue_overflow_linkedin | 46349962-1c04-11e5-844b-0b314c777502 |     "3"
  #  polipus_queue_overflow_linkedin | 46351860-1c04-11e5-844b-0b314c777502 |     "4"
  #
  # (4 rows)
  # cqlsh> SELECT  * FROM  polipus_queue_overflow_linkedin.linkedin_overflow LIMIT 1;
  #
  #  queue_name                      | created_at                           | payload
  # ---------------------------------+--------------------------------------+---------
  #  polipus_queue_overflow_linkedin | 4632d49c-1c04-11e5-844b-0b314c777502 |     "1"
  #
  # (1 rows)
  #
  table_ = [keyspace, table].compact.join '.'
  results = get(n)
  results.each do |entry|
    statement = "DELETE FROM #{table_} WHERE queue_name = '#{entry['queue_name']}' AND created_at = #{entry['created_at']} ;"
    session.execute(statement)
  end

  # Let's respect the API as expected by Polipus.
  # Otherwise the execute returns a Cassandra::Results::Paged
  if !results.nil? && results.respond_to?(:count) && results.count == 1
    return results.first['payload']
  end
  return results
end
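The ordering argument in #pop can be checked without a cluster. This hypothetical model keeps rows sorted by `created_at` (integers stand in for TimeUUIDs, mimicking CLUSTERING ORDER BY (created_at ASC)), takes the first `n` as `LIMIT n` would, deletes them, and applies the same single-payload return convention. `Row` and `pop_rows` are illustrative names only.

```ruby
# Hypothetical model of #pop: integers stand in for TimeUUIDs.
Row = Struct.new(:queue_name, :created_at, :payload)

def pop_rows(rows, n = 1)
  # Sorting by created_at mimics CLUSTERING ORDER BY (created_at ASC);
  # first(n) plays the role of "LIMIT n".
  oldest = rows.sort_by(&:created_at).first(n)
  # One delete per row read, as #pop issues one DELETE per entry.
  oldest.each { |row| rows.delete(row) }
  # Same return convention as #pop: a lone payload, otherwise the rows.
  oldest.size == 1 ? oldest.first.payload : oldest
end

rows = [Row.new('q', 3, '"3"'), Row.new('q', 1, '"1"'), Row.new('q', 2, '"2"')]
pop_rows(rows)                   # => "\"1\""
pop_rows(rows, 2).map(&:payload) # => ["\"2\"", "\"3\""]
```

Like #pop, the sketch returns an empty collection rather than nil once nothing is left.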

#push(data) ⇒ Object Also known as: enc, <<

push is the "write into Cassandra" method.



# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 107

def push(data)
  return nil if data.nil?
  obj = MultiJson.decode(data)

  table_ = [keyspace, table].compact.join('.')
  queue_name = @keyspace
  created_at = @timeuuid_generator.now

  begin
    @semaphore.synchronize do

      if obj.has_key?('payload') && !obj['payload'].empty?
        payload = MultiJson.encode(obj['payload'])
      else
        payload = nil
      end

      column_names = %w[ queue_name created_at payload ]
      values_placeholders = column_names.map{|_| '?'}.join(',')
      statement = "INSERT INTO #{table_} ( #{column_names.join(',')} ) VALUES (#{values_placeholders});"

      session.execute(
        session.prepare(statement),
        arguments: [
          queue_name,
          created_at,
          payload
        ])
    end
  rescue Encoding::UndefinedConversionError => e
    # Log the character that could not be converted, and its encoding.
    @logger.error { "#{e.error_char.dump} (#{e.error_char.encoding})" }
  end

  @logger.debug { "Writing this entry [#{[queue_name, created_at].to_s}]" }
  [queue_name, created_at].to_s
end
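The INSERT in #push is a prepared statement with one `?` bind marker per column, and the payload is re-encoded as JSON, which is why the sample row under #table! shows a quoted, escaped string. A minimal sketch, using the stdlib `json` library in place of MultiJson. `insert_statement` and `extract_payload` are hypothetical helpers; unlike #push, `extract_payload` also treats an explicit null payload as missing.

```ruby
require 'json'

COLUMN_NAMES = %w[queue_name created_at payload].freeze

# One '?' bind marker per column, as in the prepared INSERT of #push.
def insert_statement(table_, column_names = COLUMN_NAMES)
  placeholders = Array.new(column_names.size, '?').join(',')
  "INSERT INTO #{table_} ( #{column_names.join(',')} ) VALUES (#{placeholders});"
end

# Re-encodes the 'payload' value as a JSON string, or returns nil when it is
# missing, null or empty (stdlib JSON used here instead of MultiJson).
def extract_payload(data)
  value = JSON.parse(data)['payload']
  (value.nil? || value.empty?) ? nil : value.to_json
end

insert_statement('ks.t')
# => "INSERT INTO ks.t ( queue_name,created_at,payload ) VALUES (?,?,?);"
extract_payload('{"payload":{"url":"http://example.com"}}')
# => "{\"url\":\"http://example.com\"}"
```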

#session ⇒ Object



# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 224

def session
  @session ||= @cluster.connect(keyspace)
end

#table!(properties = nil) ⇒ Object

Looking at the Cassandra keyspace, you will find:

cqlsh> DESCRIBE KEYSPACE polipus_queue_overflow_linkedin ;

CREATE KEYSPACE polipus_queue_overflow_linkedin WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;

CREATE TABLE polipus_queue_overflow_linkedin.linkedin_overflow (
    queue_name text,
    created_at timeuuid,
    payload text,
    PRIMARY KEY (queue_name, created_at)
) WITH CLUSTERING ORDER BY (created_at ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

This means that:

  • queue_name is the partition key;

  • created_at is the clustering key, so the rows of a partition are stored sorted by their TimeUUID, oldest first.

With sample data:

cqlsh> SELECT * FROM polipus_queue_overflow_linkedin.linkedin_overflow LIMIT 1 ;

 queue_name                      | created_at                           | payload
---------------------------------+--------------------------------------+---------
 polipus_queue_overflow_linkedin | de17ece6-1e5e-11e5-b997-47a87c40c422 | "{\"url\":\"http://www.linkedin.com/in/foobar\",\"depth\":0,\"fetched\":false}"

(1 rows)
cqlsh>



# File 'lib/polipus-cassandra/queue_overflow/cassandra_queue.rb', line 269

def table!(properties = nil)
  table_ = [keyspace, table].compact.join '.'
  def_ = "CREATE TABLE IF NOT EXISTS #{table_}
    (
      queue_name TEXT,
      created_at TIMEUUID,
      payload TEXT,
      PRIMARY KEY (queue_name, created_at)
    )"
  props = Array(properties).join(' AND ')
  statement = props.empty? ? "#{def_};" : "#{def_} WITH #{props};"
  session.execute(statement)
end
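The optional `properties` of #table! are joined with `AND` after `WITH`. A sketch of the resulting statement, with `create_table_statement` as a hypothetical helper that mirrors the method body (whitespace collapsed onto one line) and returns the CQL instead of executing it.

```ruby
# Hypothetical mirror of #table!: builds the CREATE TABLE statement and
# appends any properties after WITH, joined by AND.
def create_table_statement(table_, properties = nil)
  def_ = "CREATE TABLE IF NOT EXISTS #{table_} " \
         "(queue_name TEXT, created_at TIMEUUID, payload TEXT, " \
         "PRIMARY KEY (queue_name, created_at))"
  props = Array(properties).join(' AND ')
  props.empty? ? "#{def_};" : "#{def_} WITH #{props};"
end

create_table_statement('polipus_queue_overflow_linkedin.linkedin_overflow',
                       ['CLUSTERING ORDER BY (created_at ASC)', 'default_time_to_live = 0'])
```

Since ascending is already Cassandra's default clustering order, passing CLUSTERING ORDER BY (created_at ASC) explicitly only documents the intent.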