Class: LenzBase

Inherits:
Object
  • Object
show all
Defined in:
lib/lenz_base.rb

Instance Method Summary collapse

Constructor Details

#initialize(show_query = false) ⇒ LenzBase

Returns a new instance of LenzBase.



10
11
12
13
14
15
16
17
# File 'lib/lenz_base.rb', line 10

def initialize(show_query = false)
  initLogger
  @sql_clients = {}
  @cql_clients = {}
  @mq_sessions = {}
  @mq_channels = {}
  @show_query = show_query
end

Instance Method Details

#clearObject



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/lenz_base.rb', line 189

def clear
  @mq_channels.each do |mq_info, channel|
    log("close channel in #{mq_info}")
    channel.close
  end

  @mq_sessions.each do |mq_info, session|
    log("close session on #{mq_info}")
    session.close
  end

  @sql_clients.each do |db_info, client|
    log("close connection on #{db_info}")
    client.close
  end

  @cql_clients.each do |db_info, client|
    log("close connection on #{db_info}")
    client.close
  end
end

#createCqlClient(db_info) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
# File 'lib/lenz_base.rb', line 87

def createCqlClient(db_info)
  begin
    client = Cassandra.cluster(db_info).connect(db_info[:keyspace])
    log("Cassandra connected #{db_info}")
    return client
  rescue Exception => e
    log e
    sleep(5)
    retry
  end
end

#createSqlClient(db_info) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/lenz_base.rb', line 47

def createSqlClient(db_info)
  begin
    client = Mysql2::Client.new(db_info)
    log("MySql connected #{db_info}")
    return client
  rescue Exception => e
    log e
    sleep(5)
    retry
  end
end

#getCqlClient(db_info) ⇒ Object



73
74
75
76
77
78
# File 'lib/lenz_base.rb', line 73

def getCqlClient(db_info)
  if(!@cql_clients[db_info])
    @cql_clients[db_info] = createCqlClient(db_info)
  end
  return @cql_clients[db_info]
end

#getMQChannel(mq_info) ⇒ Object



113
114
115
116
117
118
# File 'lib/lenz_base.rb', line 113

def getMQChannel(mq_info)
  if(!@mq_channels[mq_info] || !@mq_channels[mq_info].active || !@mq_sessions[mq_info].connected?)
    @mq_channels[mq_info] = openMQChannel(mq_info)
  end
  return @mq_channels[mq_info]
end

#getSqlClient(db_info) ⇒ Object



33
34
35
36
37
38
# File 'lib/lenz_base.rb', line 33

def getSqlClient(db_info)
  if(!@sql_clients[db_info])
    @sql_clients[db_info] = createSqlClient(db_info)
  end
  return @sql_clients[db_info]
end

#initLoggerObject



19
20
21
22
# File 'lib/lenz_base.rb', line 19

def initLogger
  @logger = Logger.new(STDOUT)
  @logger.datetime_format="%Y-%m-%d %H:%M:%S"
end

#log(msg) ⇒ Object



24
25
26
# File 'lib/lenz_base.rb', line 24

def log(msg)
  @logger.info(msg)
end

#makeProcessResult(user_id, mq_info, data = nil, flag = '') ⇒ Object



153
154
155
# File 'lib/lenz_base.rb', line 153

def makeProcessResult(user_id, mq_info, data = nil, flag = '')
  return {:user_id=>user_id, :queue=>mq_info[:name], :data=>data, :flag=>flag}
end

#openMQChannel(mq_info) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/lenz_base.rb', line 120

def openMQChannel(mq_info)
  begin
    if(!@mq_sessions[mq_info] || !@mq_sessions[mq_info].connected?)
      @mq_sessions[mq_info] = Bunny.new(mq_info)
      @mq_sessions[mq_info].start
      log("Message Queue connected #{mq_info}")
    end

    if(@mq_sessions[mq_info].respond_to? :default_channel)
      return @mq_sessions[mq_info].default_channel
    else
      return @mq_sessions[mq_info].create_channel
    end
  rescue Exception => e
    log e
    sleep(5)
    retry
  end
end

#processMQMessage(mq_info, delivery_info, properties, body) ⇒ Object



186
187
# File 'lib/lenz_base.rb', line 186

def processMQMessage(mq_info, delivery_info, properties, body)
end

#publishToMQ(mq_info, message, args = {}) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/lenz_base.rb', line 140

def publishToMQ(mq_info, message, args = {})
  begin
    ch = getMQChannel(mq_info)
    q  = ch.queue(mq_info[:name], mq_info[:options] == nil ? {} : mq_info[:options])
    q.publish(toJson(message), args)
    log("message published to #{mq_info}")
  rescue Exception => e
    log e
    sleep(5)
    retry
  end
end

#queryCql(db_info, query) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/lenz_base.rb', line 99

def queryCql(db_info, query)
  begin
    result = getCqlClient(db_info).execute(query)
    log("CQL query launched for #{db_info}")
    log(query) if @show_query
    return result
  rescue Exception => e
    log e
    removeCqlClient(db_info)
    sleep(5)
    retry
  end
end

#querySql(db_info, query) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/lenz_base.rb', line 59

def querySql(db_info, query)
  begin
    result = getSqlClient(db_info).query(query)
    log("SQL query launched for #{db_info}")
    log(query) if @show_query
    return result ? result.entries : nil
  rescue Exception => e
    log e
    removeSqlClient(db_info)
    sleep(5)
    retry
  end
end

#removeCqlClient(db_info) ⇒ Object



80
81
82
83
84
85
# File 'lib/lenz_base.rb', line 80

def removeCqlClient(db_info)
  if(@cql_clients[db_info])
    @cql_clients[db_info].close
  end
  @cql_clients[db_info] = nil
end

#removeSqlClient(db_info) ⇒ Object



40
41
42
43
44
45
# File 'lib/lenz_base.rb', line 40

def removeSqlClient(db_info)
  if(@sql_clients[db_info])
    @sql_clients[db_info].close
  end
  @sql_clients[db_info] = nil
end

#sleep(duration) ⇒ Object



28
29
30
31
# File 'lib/lenz_base.rb', line 28

def sleep(duration)
  log("sleep #{duration}s")
  Kernel.sleep(duration)
end

#subscribeMQ(mq_info) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/lenz_base.rb', line 157

def subscribeMQ(mq_info)
  log("start subscribe #{mq_info}")
  begin
    ch = getMQChannel(mq_info)
    q = ch.queue(mq_info[:name], mq_info[:options])
    q.subscribe(:block => true) do |delivery_info, properties, body|
      begin
        log(delivery_info)
        processMQMessage(mq_info, delivery_info, properties, body)
        log('message consumed')
      rescue Exception => e
        log(e)
      end
    end
  rescue Exception=>e
    log(e)
    sleep(5)
    retry
  end
end

#toJson(val) ⇒ Object



178
179
180
181
182
183
184
# File 'lib/lenz_base.rb', line 178

def toJson(val)
  if(val.respond_to? :to_json)
    return val.to_json
  else
    return val.to_s
  end
end