Class: Fluent::CassandraUpsertor
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::CassandraUpsertor
- Includes:
- CassandraConnection
- Defined in:
- lib/fluent/plugin/out_cassandra_upsert.rb
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
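Validates the configuration (raises ConfigError when where_condition_upd is nil) and caches the insert, update and where-condition settings.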
-
#format(tag, time, record) ⇒ Object
Serializes a record with to_msgpack.
-
#shutdown ⇒ Object
Calls super, then closes the Cassandra session if one is open.
- #start ⇒ Object
- #write(chunk) ⇒ Object
Methods included from CassandraConnection
Instance Method Details
#configure(conf) ⇒ Object
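Validates the configuration (raises ConfigError when where_condition_upd is nil) and caches the insert, update and where-condition settings.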
35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/plugin/out_cassandra_upsert.rb', line 35
#
# Validates the plugin configuration and caches the insert, update and
# where-condition settings in instance variables.
#
# @param conf the configuration object, forwarded to +super+
# @raise [ConfigError] when +where_condition_upd+ is nil
def configure(conf)
  super

  # perform validations
  if where_condition_upd.nil?
    raise ConfigError, "params 'where_condition_upd' is require condition or primarykey for case update"
  end

  @whereCondUpd = where_condition_upd
  @caseUpdateValue = case_update_value
  @caseInsertValue = case_insert_value
end
#format(tag, time, record) ⇒ Object
Serializes a record with to_msgpack.
46 47 48 |
# File 'lib/fluent/plugin/out_cassandra_upsert.rb', line 46
#
# Serializes a single record by calling +to_msgpack+ on it.
# The +tag+ and +time+ arguments are accepted but not used.
#
# @param tag unused
# @param time unused
# @param record the object to serialize; must respond to +to_msgpack+
# @return the result of +record.to_msgpack+
def format(tag, time, record)
  record.to_msgpack
end
#shutdown ⇒ Object
Calls super, then closes the Cassandra session if one is open.
30 31 32 33 |
# File 'lib/fluent/plugin/out_cassandra_upsert.rb', line 30
#
# Runs the parent shutdown logic, then closes the session held in
# +@session+ when one is present.
def shutdown
  super

  if @session
    @session.close
  end
end
#start ⇒ Object
25 26 27 28 |
# File 'lib/fluent/plugin/out_cassandra_upsert.rb', line 25
#
# Runs the parent start logic, then makes sure +@session+ is populated.
# An already-present session is reused; otherwise one is obtained from
# +get_session+ using the configured connection settings.
#
# @return the session stored in +@session+
def start
  super
  return @session if @session

  @session = get_session(
    host,
    port,
    keyspace,
    connect_timeout,
    username,
    password
  )
end
#write(chunk) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fluent/plugin/out_cassandra_upsert.rb', line 50
#
# Writes every record of a buffered chunk to Cassandra. For each record a
# +select count(*)+ is issued with the prepared where-condition; when the
# count is positive the row is updated, otherwise a new row is inserted.
#
# The configured templates (+@whereCondUpd+, +@caseUpdateValue+,
# +@caseInsertValue+) are only read here. The prepared, per-record values
# are kept in locals so that one record's values can never leak into the
# next record or the next chunk.
#
# @param chunk an object responding to +msgpack_each+, yielding one record
#   per iteration
# @raise [StandardError] re-raises any error from the count query after
#   logging it
def write(chunk)
  chunk.msgpack_each do |record|
    where_clause = prepareParameter(@whereCondUpd, record)

    # NOTE(review): the statement is assembled by string concatenation from
    # record-derived text. Whether prepareParameter escapes values is not
    # visible from this method -- confirm before feeding untrusted input.
    cql = "select count(*) from #{keyspace}.#{tablename} where #{where_clause};"

    begin
      result = @session.execute(cql)
    rescue StandardError => e
      # StandardError only: signals and SystemExit must not be trapped here.
      $log.error "Cannot select Cassandra: #{e.message}\nTrace: #{e.backtrace.to_s}"
      raise
    end

    if getRowCount(result) > 0
      update_values = prepareParameter(@caseUpdateValue, record)
      updateCassandra(update_values, where_clause)
    else
      insert_values = prepareParameter(@caseInsertValue, record)
      insertCassandra(insert_values)
    end
  end
end