Module: EzNemo::StorageAdapter

Included in:
DataStore
Defined in:
lib/eznemo/mysql.rb

Overview

Defines DataStore class for MySQL

Constant Summary collapse

DEFAULT_QUEUE_SIZE =

Number of records it queues up before writing

20

Instance Method Summary collapse

Instance Method Details

#check_ids_with_tags(tags = nil) ⇒ Array

Returns the check_ids matching all tags; nil when no tags are supplied.

Parameters:

  • tags (Array<String, ...>) (defaults to: nil)

    list of tag text

Returns:

  • (Array)

    check_ids matching all tags; nil when no tags are supplied



51
52
53
54
55
56
57
58
# File 'lib/eznemo/mysql.rb', line 51

# Looks up, for every tag text, the check_ids recorded under that tag and
# keeps only the ids that appear under all of them.
#
# @param tags [Array<String>, nil] list of tag text
# @return [Array, nil] check_ids matching all tags; nil when no tags supplied
def check_ids_with_tags(tags = nil)
  return nil if tags.nil? || tags.empty?

  ids_per_tag = tags.map { |text| Tag.where(text: text).map(:check_id) }
  # Seeding the fold with the first list means that list is intersected with
  # itself first, so duplicate ids are dropped even when only one tag is given.
  ids_per_tag.inject(ids_per_tag.first) { |common, ids| common & ids }
end

#checks ⇒ Array<EzNemo::Check, ...>

Returns all active checks

Returns:



36
37
38
# File 'lib/eznemo/mysql.rb', line 36

# Fetches the checks selected by the tag list configured under
# EzNemo.config[:checks][:tags].
#
# @return the result of #checks_with_tags for the configured tags
def checks
  configured_tags = EzNemo.config[:checks][:tags]
  checks_with_tags(configured_tags)
end

#checks_with_tags(tags = nil) ⇒ Array<EzNemo::Check, ...>

Returns all active checks matching all tags

Parameters:

  • tags (Array<String, ...>) (defaults to: nil)

    list of tag text

Returns:

  • (Array<EzNemo::Check, ...>)


43
44
45
46
47
# File 'lib/eznemo/mysql.rb', line 43

# Returns all checks whose state is true, optionally narrowed to the checks
# that carry every one of the given tags.
#
# @param tags [Array<String>, nil] list of tag text
# @return [Array] rows returned by the Check query
def checks_with_tags(tags = nil)
  conditions = { state: true }
  tagged_ids = check_ids_with_tags(tags)
  # A nil id list means no tag filter was requested, so only state is matched.
  conditions[:id] = tagged_ids unless tagged_ids.nil?
  Check.where(conditions).all
end

#emdatabase ⇒ Mysql2::EM::Client

Creates and returns new instance of Mysql2::EM::Client

Returns:

  • (Mysql2::EM::Client)


30
31
32
# File 'lib/eznemo/mysql.rb', line 30

# Builds a fresh Mysql2::EM::Client from the stored connection options.
#
# @return [Mysql2::EM::Client] a new client instance on every call
def emdatabase
  connection_options = @opts
  Mysql2::EM::Client.new(connection_options)
end

#flush ⇒ Object

Flush queue to storage



112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/eznemo/mysql.rb', line 112

# Flushes the queue to storage with a synchronous write and logs the outcome.
#
# @return [Thread] the already-joined worker thread
def flush
  logger = EzNemo.logger
  # This does not work when run directly after a trap, so the write happens
  # in a separate thread that is joined before returning.
  worker = Thread.new do
    logger.debug 'Spawned flushing thread.'
    outcome = write_results(true) ? "Flushed." : "Nothing to flush."
    logger.info outcome
  end
  worker.join
end

#initialize ⇒ Object



19
20
21
22
23
24
25
26
# File 'lib/eznemo/mysql.rb', line 19

# Sets up an empty result queue and reads the queueing and connection
# settings from EzNemo.config[:datastore].
#
# The queue size falls back to DEFAULT_QUEUE_SIZE when it is not configured.
# The connection options always carry the MULTI_STATEMENTS flag.
def initialize
  datastore = EzNemo.config[:datastore]
  @results = []
  @queue_size = datastore[:queue_size] || DEFAULT_QUEUE_SIZE
  @queue_interval = datastore[:queue_interval]
  # merge builds a new hash, so the flag is added to this instance's options
  # without writing into the shared configuration hash.
  @opts = datastore[:options].merge(flags: Mysql2::Client::MULTI_STATEMENTS)
end

#start_loop ⇒ Object

Register EventMachine blocks



61
62
63
64
65
66
67
68
69
# File 'lib/eznemo/mysql.rb', line 61

# Registers a periodic EventMachine timer that writes queued results every
# @queue_interval. Does nothing when no queue interval is set.
#
# @return the value of EM.add_periodic_timer, or nil when no interval is set
def start_loop
  interval = @queue_interval
  return unless interval

  EzNemo.logger.info 'Registering MySQL EM block...'
  EM.add_periodic_timer(interval) do
    EzNemo.logger.debug 'Queue interval time arrived.'
    write_results
  end
end

#store_result(result) ⇒ Object

Stores a result; into queue first

Parameters:



73
74
75
76
77
78
79
80
# File 'lib/eznemo/mysql.rb', line 73

# Appends a result to the in-memory queue. When no queue interval is set,
# the queue is written out as soon as it holds @queue_size results.
#
# @param result the result to enqueue
# @return the value of #write_results when a write is triggered, otherwise nil
def store_result(result)
  @results.push(result)
  # With an interval configured, writing is left to the periodic timer.
  return if @queue_interval
  return if @results.size < @queue_size

  EzNemo.logger.debug 'Queue is full.'
  write_results
end

#write_results(sync = false) ⇒ Object

Write the results to storage from queue

Parameters:

  • sync (Boolean) (defaults to: false)

    use EM (async) if false

Returns:

  • (Object)

    Mysql2 client instance after an asynchronous write; true after a synchronous write; nil when the queue is empty



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/eznemo/mysql.rb', line 85

# Writes the queued results to storage and empties the queue.
#
# In sync mode every queued result is saved inside one Result.db transaction.
# Otherwise the INSERT statements for all queued results are sent as a single
# multi-statement query through a new client from #emdatabase.
#
# @param sync [Boolean] use EM (async) if false
# @return [Object, true, nil] the Mysql2 client after an async write; true
#   after a sync write; nil when the queue is empty
def write_results(sync = false)
  logger = EzNemo.logger
  return nil if @results.empty?

  if sync
    logger.debug 'Writing to DB...'
    Result.db.transaction do
      @results.each(&:save)
    end
    # Empty the queue here too, so a later write does not save the same
    # results again. This line is only reached if the transaction block
    # did not raise.
    @results.clear
    return true
  end

  db = emdatabase
  stmt = @results.map { |r| "#{Result.dataset.insert_sql(r)};" }.join
  defer = db.query(stmt)
  defer.callback { logger.debug 'Wrote to DB async.' }
  defer.errback do |err|
    logger.error err.message
    # Close the client only if it still answers a ping.
    db.close if db.ping
  end
  @results.clear
  db
end