Class: LenzBase
- Inherits:
-
Object
- Object
- LenzBase
- Defined in:
- lib/lenz_base.rb
Instance Method Summary collapse
- #clear ⇒ Object
- #createCqlClient(db_info) ⇒ Object
- #createSqlClient(db_info) ⇒ Object
- #getCqlClient(db_info) ⇒ Object
- #getMQChannel(mq_info) ⇒ Object
- #getSqlClient(db_info) ⇒ Object
-
#initialize(show_query = false) ⇒ LenzBase
constructor
A new instance of LenzBase.
- #initLogger ⇒ Object
- #log(msg) ⇒ Object
- #makeProcessResult(user_id, mq_info, data = nil, flag = '') ⇒ Object
- #openMQChannel(mq_info) ⇒ Object
- #processMQMessage(mq_info, delivery_info, properties, body) ⇒ Object
- #publishToMQ(mq_info, message, args = {}) ⇒ Object
- #queryCql(db_info, query) ⇒ Object
- #querySql(db_info, query) ⇒ Object
- #removeCqlClient(db_info) ⇒ Object
- #removeSqlClient(db_info) ⇒ Object
- #sleep(duration) ⇒ Object
- #subscribeMQ(mq_info) ⇒ Object
- #toJson(val) ⇒ Object
Constructor Details
#initialize(show_query = false) ⇒ LenzBase
Returns a new instance of LenzBase.
10 11 12 13 14 15 16 17 |
# File 'lib/lenz_base.rb', line 10 def initialize(show_query = false) initLogger @sql_clients = {} @cql_clients = {} @mq_sessions = {} @mq_channels = {} @show_query = show_query end |
Instance Method Details
#clear ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/lenz_base.rb', line 189 def clear @mq_channels.each do |mq_info, channel| log("close channel in #{mq_info}") channel.close end @mq_sessions.each do |mq_info, session| log("close session on #{mq_info}") session.close end @sql_clients.each do |db_info, client| log("close connection on #{db_info}") client.close end @cql_clients.each do |db_info, client| log("close connection on #{db_info}") client.close end end |
#createCqlClient(db_info) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/lenz_base.rb', line 87 def createCqlClient(db_info) begin client = Cassandra.cluster(db_info).connect(db_info[:keyspace]) log("Cassandra connected #{db_info}") return client rescue Exception => e log e sleep(5) retry end end |
#createSqlClient(db_info) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/lenz_base.rb', line 47 def createSqlClient(db_info) begin client = Mysql2::Client.new(db_info) log("MySql connected #{db_info}") return client rescue Exception => e log e sleep(5) retry end end |
#getCqlClient(db_info) ⇒ Object
73 74 75 76 77 78 |
# File 'lib/lenz_base.rb', line 73 def getCqlClient(db_info) if(!@cql_clients[db_info]) @cql_clients[db_info] = createCqlClient(db_info) end return @cql_clients[db_info] end |
#getMQChannel(mq_info) ⇒ Object
113 114 115 116 117 118 |
# File 'lib/lenz_base.rb', line 113 def getMQChannel(mq_info) if(!@mq_channels[mq_info] || !@mq_channels[mq_info].active || !@mq_sessions[mq_info].connected?) @mq_channels[mq_info] = openMQChannel(mq_info) end return @mq_channels[mq_info] end |
#getSqlClient(db_info) ⇒ Object
33 34 35 36 37 38 |
# File 'lib/lenz_base.rb', line 33 def getSqlClient(db_info) if(!@sql_clients[db_info]) @sql_clients[db_info] = createSqlClient(db_info) end return @sql_clients[db_info] end |
#initLogger ⇒ Object
19 20 21 22 |
# File 'lib/lenz_base.rb', line 19 def initLogger @logger = Logger.new(STDOUT) @logger.datetime_format="%Y-%m-%d %H:%M:%S" end |
#log(msg) ⇒ Object
24 25 26 |
# File 'lib/lenz_base.rb', line 24 def log(msg) @logger.info(msg) end |
#makeProcessResult(user_id, mq_info, data = nil, flag = '') ⇒ Object
153 154 155 |
# File 'lib/lenz_base.rb', line 153 def makeProcessResult(user_id, mq_info, data = nil, flag = '') return {:user_id=>user_id, :queue=>mq_info[:name], :data=>data, :flag=>flag} end |
#openMQChannel(mq_info) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/lenz_base.rb', line 120 def openMQChannel(mq_info) begin if(!@mq_sessions[mq_info] || !@mq_sessions[mq_info].connected?) @mq_sessions[mq_info] = Bunny.new(mq_info) @mq_sessions[mq_info].start log("Message Queue connected #{mq_info}") end if(@mq_sessions[mq_info].respond_to? :default_channel) return @mq_sessions[mq_info].default_channel else return @mq_sessions[mq_info].create_channel end rescue Exception => e log e sleep(5) retry end end |
#processMQMessage(mq_info, delivery_info, properties, body) ⇒ Object
186 187 |
# File 'lib/lenz_base.rb', line 186 def processMQMessage(mq_info, delivery_info, properties, body) end |
#publishToMQ(mq_info, message, args = {}) ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/lenz_base.rb', line 140 def publishToMQ(mq_info, , args = {}) begin ch = getMQChannel(mq_info) q = ch.queue(mq_info[:name], mq_info[:options] == nil ? {} : mq_info[:options]) q.publish(toJson(), args) log("message published to #{mq_info}") rescue Exception => e log e sleep(5) retry end end |
#queryCql(db_info, query) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/lenz_base.rb', line 99 def queryCql(db_info, query) begin result = getCqlClient(db_info).execute(query) log("CQL query launched for #{db_info}") log(query) if @show_query return result rescue Exception => e log e removeCqlClient(db_info) sleep(5) retry end end |
#querySql(db_info, query) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/lenz_base.rb', line 59 def querySql(db_info, query) begin result = getSqlClient(db_info).query(query) log("SQL query launched for #{db_info}") log(query) if @show_query return result ? result.entries : nil rescue Exception => e log e removeSqlClient(db_info) sleep(5) retry end end |
#removeCqlClient(db_info) ⇒ Object
80 81 82 83 84 85 |
# File 'lib/lenz_base.rb', line 80 def removeCqlClient(db_info) if(@cql_clients[db_info]) @cql_clients[db_info].close end @cql_clients[db_info] = nil end |
#removeSqlClient(db_info) ⇒ Object
40 41 42 43 44 45 |
# File 'lib/lenz_base.rb', line 40 def removeSqlClient(db_info) if(@sql_clients[db_info]) @sql_clients[db_info].close end @sql_clients[db_info] = nil end |
#sleep(duration) ⇒ Object
28 29 30 31 |
# File 'lib/lenz_base.rb', line 28 def sleep(duration) log("sleep #{duration}s") Kernel.sleep(duration) end |
#subscribeMQ(mq_info) ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/lenz_base.rb', line 157 def subscribeMQ(mq_info) log("start subscribe #{mq_info}") begin ch = getMQChannel(mq_info) q = ch.queue(mq_info[:name], mq_info[:options]) q.subscribe(:block => true) do |delivery_info, properties, body| begin log(delivery_info) processMQMessage(mq_info, delivery_info, properties, body) log('message consumed') rescue Exception => e log(e) end end rescue Exception=>e log(e) sleep(5) retry end end |
#toJson(val) ⇒ Object
178 179 180 181 182 183 184 |
# File 'lib/lenz_base.rb', line 178 def toJson(val) if(val.respond_to? :to_json) return val.to_json else return val.to_s end end |