Class: Polipus::Storage::CassandraStore

Inherits:
Base
  • Object
show all
Defined in:
lib/polipus-cassandra/storage/cassandra_store.rb

Constant Summary collapse

# Keys of the page hash that #add re-encodes to UTF-8 before building the JSON
# document. Frozen so the shared list cannot be mutated by accident.
BINARY_FIELDS = %w[body headers user_data].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ CassandraStore

Returns a new instance of CassandraStore.



41
42
43
44
45
46
47
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 41

# Builds a store from an options hash.
#
# @param options [Hash] reads :cluster, :keyspace, :table and :except;
#   :except falls back to an empty list when it is missing or nil
def initialize(options = {})
  @semaphore = Mutex.new
  @cluster, @keyspace, @table = options.values_at(:cluster, :keyspace, :table)
  @except = options[:except] || []
end

Instance Attribute Details

#clusterObject

CassandraStore persists documents (please ignore the jargon inherited from MongoDB) like the following JSON-ish entry:

> db.find({})

 {
   "_id" : ObjectId("...."),
   "url" : "https://www.awesome.org/meh",
   "code" : 200,
   "depth" : 0,
   "referer" : "",
   "redirect_to" : "",
   "response_time" : 1313,
   "fetched" : true,
   "user_data" :
     {
       "imported" : false,
       "is_developer" : false,
       "last_modified" : null
     },
    "fetched_at" : 1434977757,
    "error" : "",
    "uuid" : "4ddce293532ea2454356a4210e61c363"
}


37
38
39
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 37

# Reader for @cluster, which #initialize fills from options[:cluster].
# The store calls #connect on this object to obtain a session.
def cluster
  @cluster
end

#keyspaceObject

CassandraStore persists documents (please ignore the jargon inherited from MongoDB) like the following JSON-ish entry:

> db.find({})

 {
   "_id" : ObjectId("...."),
   "url" : "https://www.awesome.org/meh",
   "code" : 200,
   "depth" : 0,
   "referer" : "",
   "redirect_to" : "",
   "response_time" : 1313,
   "fetched" : true,
   "user_data" :
     {
       "imported" : false,
       "is_developer" : false,
       "last_modified" : null
     },
    "fetched_at" : 1434977757,
    "error" : "",
    "uuid" : "4ddce293532ea2454356a4210e61c363"
}


37
38
39
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 37

# Reader for @keyspace, which #initialize fills from options[:keyspace].
# When it is nil, statements use the bare table name without a prefix.
def keyspace
  @keyspace
end

#tableObject

CassandraStore persists documents (please ignore the jargon inherited from MongoDB) like the following JSON-ish entry:

> db.find({})

 {
   "_id" : ObjectId("...."),
   "url" : "https://www.awesome.org/meh",
   "code" : 200,
   "depth" : 0,
   "referer" : "",
   "redirect_to" : "",
   "response_time" : 1313,
   "fetched" : true,
   "user_data" :
     {
       "imported" : false,
       "is_developer" : false,
       "last_modified" : null
     },
    "fetched_at" : 1434977757,
    "error" : "",
    "uuid" : "4ddce293532ea2454356a4210e61c363"
}


37
38
39
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 37

# Reader for @table, which #initialize fills from options[:table].
# Combined with #keyspace to build the table name used in every statement.
def table
  @table
end

Instance Method Details

#add(page) ⇒ Object

'url'           => @url.to_s,
'headers'       => Marshal.dump(@headers),
'body'          => @body,
'links'         => links.map(&:to_s),
'code'          => @code,
'depth'         => @depth,
'referer'       => @referer.to_s,
'redirect_to'   => @redirect_to.to_s,
'response_time' => @response_time,
'fetched'       => @fetched,
'user_data'     => @user_data.nil? ? {} : @user_data.marshal_dump,
'fetched_at'    => @fetched_at,
'error'         => @error.to_s

}



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 65

# Stores +page+ as one row of the Cassandra table and returns the row key.
#
# The page hash (minus the keys listed in @except) is serialised to JSON and
# deflated into the +page+ blob column; a handful of its fields are also
# written to dedicated columns.
#
# @param page [#to_hash] the page to persist
# @return [Object] the value of uuid(page); it is also returned when the row
#   was not written because an Encoding::UndefinedConversionError was rescued
def add(page)
  @semaphore.synchronize do
    table_ = [keyspace, table].compact.join('.')
    uuid_ = uuid(page)
    obj = page.to_hash
    Array(@except).each { |e| obj.delete(e.to_s) }

    begin
      BINARY_FIELDS.each do |field|
        next unless can_be_converted?(obj[field])

        # Options are passed as keywords: Ruby 3 no longer converts a trailing
        # positional Hash into keyword options.
        obj[field] = obj[field].to_s.encode('UTF-8', invalid: :replace, undef: :replace, replace: '?')
      end

      # A missing, nil or empty user_data is stored as a NULL column.
      raw_user_data = obj['user_data']
      user_data = MultiJson.encode(raw_user_data) unless raw_user_data.nil? || raw_user_data.empty?

      # Integer instead of Fixnum: Fixnum was deprecated in Ruby 2.4 and
      # removed in Ruby 3.2.
      value = obj['fetched_at']
      fetched_at = case value
                   when Integer then Time.at(value)
                   when String then Time.parse(value)
                   end

      # Column => value pairs kept together so names and arguments cannot
      # drift apart; insertion order defines the order of the placeholders.
      row = {
        'uuid' => uuid_,
        'url' => obj['url'],
        'code' => obj['code'],
        'depth' => obj['depth'],
        'referer' => obj['referer'],
        'redirect_to' => obj['redirect_to'],
        'response_time' => obj['response_time'],
        'fetched' => obj['fetched'],
        'user_data' => user_data,
        'fetched_at' => fetched_at,
        'error' => obj['error'],
        'page' => Zlib::Deflate.deflate(MultiJson.encode(obj))
      }

      values_placeholders = Array.new(row.size, '?').join(',')
      statement = "INSERT INTO #{table_} ( #{row.keys.join(',')} ) VALUES (#{values_placeholders});"

      session.execute(session.prepare(statement), arguments: row.values)
    rescue Encoding::UndefinedConversionError => e
      # Best effort, as before: report the offending character and carry on.
      puts e.error_char.dump
      puts e.error_char.encoding
    end

    uuid_
  end
end

#clearObject



141
142
143
144
145
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 141

# Drops the table backing this store.
#
# @return [Object] whatever the session returns for the DROP statement
def clear
  qualified_name = [keyspace, table].compact.join('.')
  session.execute("DROP TABLE #{qualified_name};")
end

#countObject

TBH I’m not sure whether being “defensive” and returning 0/nil when the result is empty would be better … for now I’m leaving the code simple, so it fails noisily if something goes wrong in the COUNT.



150
151
152
153
154
155
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 150

# Runs a COUNT over the table and returns the 'count' entry of the first
# result row. Nothing is rescued here, so a failed or empty result raises.
def count
  qualified_name = [keyspace, table].compact.join('.')
  rows = session.execute("SELECT COUNT (*) FROM #{qualified_name} ;")
  rows.first['count']
end

#eachObject



157
158
159
160
161
162
163
164
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 157

# Iterates over every row of the table, yielding the row's uuid together with
# the page rebuilt from that row by load_page.
#
# @yield [uuid, page] once per non-nil row
# @return [Enumerator] when called without a block; otherwise the value
#   returned by the result set's #each
def each
  return enum_for(:each) unless block_given?

  table_ = [keyspace, table].compact.join('.')
  session.execute("SELECT * FROM #{table_};").each do |data|
    # A nil row carries neither a uuid nor a page. The previous guard only
    # protected load_page and then failed on data['uuid'], so skip the row.
    next if data.nil?

    yield data['uuid'], load_page(data)
  end
end

#exists?(page) ⇒ Boolean

Returns:

  • (Boolean)


166
167
168
169
170
171
172
173
174
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 166

# Tells whether a row keyed by uuid(page) is present in the table.
# The lookup runs while holding @semaphore.
#
# @param page [Object] passed to uuid to derive the row key
# @return [Boolean] true when the query returns at least one row
def exists?(page)
  @semaphore.synchronize do
    qualified_name = [keyspace, table].compact.join('.')
    lookup = session.prepare("SELECT uuid FROM #{qualified_name} WHERE uuid = ? LIMIT 1;")
    rows = session.execute(lookup, arguments: [uuid(page)])
    !rows.first.nil?
  end
end

#get(page) ⇒ Object



176
177
178
179
180
181
182
183
184
185
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 176

# Fetches the row keyed by uuid(page) and rebuilds the page from it.
# The lookup runs while holding @semaphore.
#
# @param page [Object] passed to uuid to derive the row key
# @return [Object, nil] the result of load_page for the row, or nil when the
#   query returns no row
def get(page)
  @semaphore.synchronize do
    qualified_name = [keyspace, table].compact.join('.')
    lookup = session.prepare("SELECT * FROM #{qualified_name} WHERE uuid = ? LIMIT 1;")
    row = session.execute(lookup, arguments: [uuid(page)]).first
    row.nil? ? nil : load_page(row)
  end
end

#keyspace!(replication = nil, durable_writes = true) ⇒ Object



187
188
189
190
191
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 187

# Creates the keyspace if it does not exist yet, using a fresh connection
# obtained from the cluster.
#
# @param replication [String, nil] CQL replication map; when nil/false a
#   SimpleStrategy map with replication_factor '3' is used
# @param durable_writes [Object] interpolated into the durable_writes clause
# @return [Object] whatever the connection returns for the statement
def keyspace!(replication = nil, durable_writes = true)
  replication = "{'class': 'SimpleStrategy', 'replication_factor': '3'}" unless replication
  cql = "CREATE KEYSPACE IF NOT EXISTS #{keyspace} " \
        "WITH replication = #{replication} AND durable_writes = #{durable_writes};"
  cluster.connect.execute(cql)
end

#load_page(data) ⇒ Object



229
230
231
232
233
234
235
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 229

# Rebuilds a Page from a result row: inflates the 'page' blob, decodes the
# JSON inside it and hands the resulting hash to Page.from_hash.
#
# @param data [#[]] a row exposing the compressed document under 'page'
# @return [Page] the page, with fetched_at set to 0 when it was nil
def load_page(data)
  decoded = MultiJson.decode(Zlib::Inflate.inflate(data['page']))
  Page.from_hash(decoded).tap do |restored|
    restored.fetched_at = 0 if restored.fetched_at.nil?
  end
end

#remove(page) ⇒ Object



193
194
195
196
197
198
199
200
201
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 193

# Deletes the row keyed by uuid(page). The statement runs while holding
# @semaphore.
#
# @param page [Object] passed to uuid to derive the row key
# @return [true] always, once the DELETE has been executed
def remove(page)
  @semaphore.synchronize do
    qualified_name = [keyspace, table].compact.join('.')
    deletion = session.prepare("DELETE FROM #{qualified_name} WHERE uuid = ?;")
    session.execute(deletion, arguments: [uuid(page)])
    true
  end
end

#sessionObject



203
204
205
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 203

# Returns the session used for all statements, connecting @cluster to the
# keyspace on first use and reusing that session afterwards.
def session
  return @session if @session

  @session = @cluster.connect(keyspace)
end

#table!(properties = nil) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/polipus-cassandra/storage/cassandra_store.rb', line 207

# Creates the backing table if it does not exist yet.
#
# @param properties [#to_a, nil] table options; each entry becomes one clause
#   and the clauses are joined with ' AND ' behind WITH. nil or an empty list
#   adds no WITH clause.
# @return [Object] whatever the session returns for the statement
def table!(properties = nil)
  qualified_name = [keyspace, table].compact.join('.')
  definition = "CREATE TABLE IF NOT EXISTS #{qualified_name}
    (
      uuid TEXT PRIMARY KEY,
      url TEXT,
      code INT,
      depth INT,
      referer TEXT,
      redirect_to TEXT,
      response_time BIGINT,
      fetched BOOLEAN,
      user_data TEXT,
      fetched_at TIMESTAMP,
      error TEXT,
      page BLOB
    )"
  clauses = properties.to_a.join(' AND ')
  definition = "#{definition} WITH #{clauses}" unless clauses.empty?
  session.execute("#{definition};")
end