Class: MoSQL::Tailer

Inherits:
Mongoriver::AbstractPersistentTailer
  • Object
show all
Defined in:
lib/mosql/tailer.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(backends, type, table, opts) ⇒ Tailer

Returns a new instance of Tailer.



26
27
28
29
30
# File 'lib/mosql/tailer.rb', line 26

# Sets up a tailer whose state lives in the given table.
#
# @param backends forwarded unchanged to the superclass constructor
# @param type     forwarded unchanged to the superclass constructor
# @param table    kept in @table for use by the state read/write methods
# @param opts     forwarded to the superclass constructor; opts[:service]
#   names the row this tailer owns and falls back to "mosql" when it is
#   nil or false
def initialize(backends, type, table, opts)
  super(backends, type, opts)
  @service = opts[:service] || "mosql"
  @table   = table
end

Class Method Details

.create_table(db, tablename) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/mosql/tailer.rb', line 3

# Makes sure the tailer state table exists with the expected columns and
# returns a handle to it.
#
# When the table is missing it is created with +service+ (primary key),
# +timestamp+ and +position+ columns. When it already exists it may have
# been created by a version that predates TokuMX support and therefore
# lack +position+; since no clean "does this column exist?" check was
# found, the column is simply added and the duplicate-column error that
# results in the common case is ignored.
#
# @param db        database handle responding to table_exists?,
#   create_table, add_column and []
# @param tablename name of the state table
# @return whatever db[tablename.to_sym] yields
# @raise [Sequel::DatabaseError] when adding the column fails for any
#   reason other than the column already existing
def self.create_table(db, tablename)
  if db.table_exists?(tablename)
    # Seamless upgrade path for pre-existing tables.
    begin
      db.add_column(tablename, :position, 'BYTEA')
    rescue Sequel::DatabaseError => error
      # Only "column already there" is expected; anything else is real.
      raise unless MoSQL::SQLAdapter.duplicate_column_error?(error)
    end
  else
    db.create_table(tablename) do
      column :service,   'TEXT'
      column :timestamp, 'INTEGER'
      column :position,  'BYTEA'
      primary_key [:service]
    end
  end

  db[tablename.to_sym]
end

Instance Method Details

#read_state ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/mosql/tailer.rb', line 32

# Loads the saved tailing state for this service from the state table.
#
# @return [Hash, nil] nil when no row exists for @service; otherwise a
#   hash with 'time' (a Time built from the stored timestamp) and
#   'position'
def read_state
  stored = @table.where(:service => @service).first
  return unless stored

  time = Time.at(stored.fetch(:timestamp))
  blob = stored[:position]
  return { 'time' => time, 'position' => from_blob(blob) } if blob

  # Seamless upgrade: the row has no position yet, so derive one from
  # the stored timestamp and persist it right away.
  log.warn("Trying to seamlessly update from old version!")
  state = { 'time' => time, 'position' => most_recent_position(time) }
  save_state(state)
  state
end

#write_state(state) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/mosql/tailer.rb', line 50

# Persists the given tailing state in the row belonging to @service.
#
# Until one attempt has gone through on this instance, a row is inserted
# first; a duplicate-key error from that insert just means the row is
# already there and is ignored. The row is then updated on every call.
#
# @param state [Hash] must provide 'time' (converted with to_i) and
#   'position' (encoded with to_blob)
# @return the result of the dataset update call
# @raise [Sequel::DatabaseError] when the insert fails for any reason
#   other than a duplicate key
def write_state(state)
  timestamp = state['time'].to_i
  position  = to_blob(state['position'])
  record = { :service => @service, :timestamp => timestamp, :position => position }

  if !@did_insert
    begin
      @table.insert(record)
    rescue Sequel::DatabaseError => error
      raise unless MoSQL::SQLAdapter.duplicate_key_error?(error)
    end
    # Reached after a successful insert or a tolerated duplicate.
    @did_insert = true
  end

  @table.where(:service => @service).update(record)
end