Module: Timescaledb::MigrationHelpers

Defined in:
lib/timescaledb/migration_helpers.rb

Overview

Migration helpers can help you set up hypertables by default.

Instance Method Summary collapse

Instance Method Details

#add_compression_policy(table_name, orderby:, segmentby:, compress_after: nil, compression_chunk_time_interval: nil) ⇒ Object

Enable compression policy.

Parameters:

  • table_name (String)

    The name of the table.

  • orderby (String)

    The column to order by.

  • segmentby (String)

    The column to segment by.

  • compress_after (String) (defaults to: nil)

    The interval to compress after.

  • compression_chunk_time_interval (String) (defaults to: nil)

    The interval to use when merging chunks during compression. The setting is only applied when a value is given.

See Also:



164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/timescaledb/migration_helpers.rb', line 164

# Turns on compression for a table and, optionally, schedules a compression policy.
#
# First issues an `ALTER TABLE ... SET (...)` statement carrying the compression
# settings. A second statement calling `add_compression_policy` is only issued
# when `compress_after` is given.
#
# @param table_name [String] The name of the table.
# @param orderby [String, nil] The column to order by; the setting is skipped when nil.
# @param segmentby [String, nil] The column to segment by; the setting is skipped when nil.
# @param compress_after [String, nil] The interval to compress after.
# @param compression_chunk_time_interval [String, nil] Interval used when merging chunks.
# @return [Object, nil] result of the policy statement, or nil when no policy is scheduled.
def add_compression_policy(table_name, orderby:, segmentby:, compress_after: nil, compression_chunk_time_interval: nil)
  settings = ['timescaledb.compress']
  settings.push("timescaledb.compress_orderby = '#{orderby}'") if orderby
  settings.push("timescaledb.compress_segmentby = '#{segmentby}'") if segmentby
  if compression_chunk_time_interval
    settings.push("timescaledb.compression_chunk_time_interval = INTERVAL '#{compression_chunk_time_interval}'")
  end

  joined_settings = settings.join(',')
  execute <<~SQL
    ALTER TABLE #{table_name} SET (
      #{joined_settings}
    )
  SQL

  # Scheduling the policy is optional; without an interval only the settings are applied.
  return unless compress_after

  execute "SELECT add_compression_policy('#{table_name}', compress_after => INTERVAL '#{compress_after}')"
end

#create_continuous_aggregate(table_name, query, **options) ⇒ Object Also known as: create_continuous_aggregates

Create a new continuous aggregate

Examples:

create_continuous_aggregate(:activity_counts, <<-SQL, refresh_policies: { schedule_interval: "INTERVAL '1 hour'" })
  SELECT
    time_bucket(INTERVAL '1 day', activity.created_at) AS bucket,
    count(*)
  FROM activity
  GROUP BY bucket
SQL

Parameters:

  • table_name (String, Symbol)

    The name of the continuous aggregate.

  • query (String)

    The SQL query for the aggregate view definition.

  • with_data (Boolean)

    Set to true to create the aggregate WITH DATA

  • refresh_policies (Hash)

    Set to create a refresh policy

  • materialized_only (Hash)

    a customizable set of options

  • create_group_indexes (Hash)

    a customizable set of options

  • finalized (Hash)

    a customizable set of options

See Also:



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/timescaledb/migration_helpers.rb', line 101

# Creates a continuous aggregate as a materialized view, then hands any
# `:refresh_policies` over to `create_continuous_aggregate_policy`.
#
# @param table_name [String, Symbol] The name of the continuous aggregate.
# @param query [String, #to_sql] The view definition; `to_sql` is used when the
#   object responds to it, otherwise the value is interpolated as-is.
# @param options [Hash] `:with_data` picks `WITH DATA` over `WITH NO DATA`;
#   `:refresh_policies` is forwarded as keyword arguments to the policy helper;
#   `:materialized_only`, `:create_group_indexes` and `:finalized` are rendered
#   through `build_with_clause_option_string`.
# @return [Object, nil] whatever `create_continuous_aggregate_policy` returns.
def create_continuous_aggregate(table_name, query, **options)
  materialized_only = build_with_clause_option_string(:materialized_only, options)
  create_group_indexes = build_with_clause_option_string(:create_group_indexes, options)
  finalized = build_with_clause_option_string(:finalized, options)
  view_definition = query.respond_to?(:to_sql) ? query.to_sql : query
  # nil interpolates as an empty string, leaving "WITH  DATA" when data is requested.
  data_modifier = options[:with_data] ? nil : 'NO'

  execute <<~SQL
    CREATE MATERIALIZED VIEW #{table_name}
    WITH (
      timescaledb.continuous
      #{materialized_only}
      #{create_group_indexes}
      #{finalized}
    ) AS
    #{view_definition}
    WITH #{data_modifier} DATA;
  SQL

  refresh_policies = options[:refresh_policies] || {}
  create_continuous_aggregate_policy(table_name, **refresh_policies)
end

#create_continuous_aggregate_policy(table_name, **options) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
# File 'lib/timescaledb/migration_helpers.rb', line 128

# Adds a refresh policy to a continuous aggregate.
#
# Does nothing when no options are given. Option values are interpolated into
# the SQL verbatim, so they must already be SQL expressions
# (e.g. "INTERVAL '1 hour'"). Any of the three options that is omitted is
# rendered as NULL so the generated statement stays syntactically valid.
#
# @param table_name [String, Symbol] The name of the continuous aggregate.
# @param options [Hash] `:start_offset`, `:end_offset` and `:schedule_interval`.
# @return [Object, nil] result of `execute`, or nil when no options are given.
def create_continuous_aggregate_policy(table_name, **options)
  return if options.empty?

  # TODO: assert valid keys
  # A missing key used to interpolate as an empty string ("start_offset => ,"),
  # which is a SQL syntax error; emit NULL instead.
  start_offset, end_offset, schedule_interval =
    options.values_at(:start_offset, :end_offset, :schedule_interval).map { |value| value.nil? ? 'NULL' : value }

  execute <<~SQL
    SELECT add_continuous_aggregate_policy('#{table_name}',
      start_offset => #{start_offset},
      end_offset => #{end_offset},
      schedule_interval => #{schedule_interval});
  SQL
end

#create_hypertable(table_name, time_column: 'created_at', chunk_time_interval: '1 week', compress_segmentby: nil, compress_orderby: 'created_at', compress_after: nil, drop_after: nil, partition_column: nil, number_partitions: nil, **hypertable_options) ⇒ Object

Set up a hypertable from options.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/timescaledb/migration_helpers.rb', line 39

# Converts a table into a hypertable and applies the optional extras.
#
# While it runs, ActiveRecord logging is redirected to a STDOUT logger; the
# previous logger (including nil) is always put back afterwards, even when a
# statement raises.
#
# @param table_name [String, Symbol] The table to convert.
# @param time_column [String] Column used for the `by_range` dimension.
# @param chunk_time_interval [String] Passed through `parse_interval` for the range dimension.
# @param compress_segmentby [String, nil] Forwarded to `add_compression_policy` as `segmentby`.
# @param compress_orderby [String] Forwarded to `add_compression_policy` as `orderby`.
# @param compress_after [String, nil] Forwarded to `add_compression_policy`.
# @param drop_after [String, nil] When given, a retention policy is added.
# @param partition_column [String, nil] Column for an extra `by_hash` dimension.
# @param number_partitions [Integer, nil] Partition count for the `by_hash` dimension.
# @param hypertable_options [Hash] Extra named arguments appended to `create_hypertable(...)`.
# @return [Object, nil] result of the retention policy call, or nil when `drop_after` is not given.
def create_hypertable(table_name,
                      time_column: 'created_at',
                      chunk_time_interval: '1 week',
                      compress_segmentby: nil,
                      compress_orderby: 'created_at',
                      compress_after: nil,
                      drop_after: nil,
                      partition_column: nil,
                      number_partitions: nil,
                      **hypertable_options)

  # Captured outside the protected region so the ensure below never restores
  # a value that was not actually read.
  original_logger = ActiveRecord::Base.logger

  begin
    ActiveRecord::Base.logger = Logger.new(STDOUT)

    dimension = "by_range(#{quote(time_column)}, #{parse_interval(chunk_time_interval)})"

    arguments = [ quote(table_name), dimension,
      *hypertable_options.map { |k, v| "#{k} => #{quote(v)}" }
    ]

    execute "SELECT create_hypertable(#{arguments.compact.join(', ')})"

    # The hash dimension is only added when both pieces are supplied.
    if partition_column && number_partitions
      execute "SELECT add_dimension('#{table_name}', by_hash(#{quote(partition_column)}, #{number_partitions}))"
    end

    if compress_segmentby || compress_after
      add_compression_policy(table_name, orderby: compress_orderby, segmentby: compress_segmentby, compress_after: compress_after)
    end

    if drop_after
      add_retention_policy(table_name, drop_after: drop_after)
    end
  ensure
    # Restore unconditionally: a nil logger is a legitimate prior state and
    # must not be left replaced by the temporary STDOUT logger.
    ActiveRecord::Base.logger = original_logger
  end
end

#create_retention_policy(table_name, drop_after:) ⇒ Object Also known as: add_retention_policy



144
145
146
# File 'lib/timescaledb/migration_helpers.rb', line 144

# Adds a data retention policy to a table.
#
# @param table_name [String, Symbol] The table the policy applies to.
# @param drop_after [Object] Passed through `parse_interval` and interpolated
#   as the `drop_after` argument.
# @return [Object] result of `execute`.
def create_retention_policy(table_name, drop_after:)
  interval_sql = parse_interval(drop_after)
  execute("SELECT add_retention_policy('#{table_name}', drop_after => #{interval_sql})")
end

#create_table(table_name, id: :primary_key, primary_key: nil, force: nil, **options) ⇒ Object

`create_table` accepts a `hypertable` argument with options for creating a TimescaleDB hypertable.

See docs.timescale.com/api/latest/hypertable/create_hypertable/#optional-arguments for additional options supported by the plugin.

Examples:

options = {
  time_column: 'created_at',
  chunk_time_interval: '1 min',
  compress_segmentby: 'identifier',
  compress_orderby: 'created_at',
  compress_after: '7 days'
}

create_table(:events, id: false, hypertable: options) do |t|
  t.string :identifier, null: false
  t.jsonb :payload
  t.timestamps
end


27
28
29
30
# File 'lib/timescaledb/migration_helpers.rb', line 27

# Creates the table via the inherited implementation, then turns it into a
# hypertable when a `:hypertable` option is present.
#
# @param table_name [String, Symbol] The table to create.
# @param options [Hash] Table options; the value under `:hypertable` is
#   splatted into `create_hypertable` as keyword arguments.
# @return [Object, nil] result of `create_hypertable`, or nil when no
#   `:hypertable` option is given.
def create_table(table_name, id: :primary_key, primary_key: nil, force: nil, **options)
  # Bare super forwards every argument (and any block) unchanged.
  super
  return unless options.key?(:hypertable)

  create_hypertable(table_name, **options[:hypertable])
end

#drop_continuous_aggregates(view_name) ⇒ Object

Drop a continuous aggregate.

It runs DROP MATERIALIZED VIEW for the given view name.

Parameters:

  • view_name (String, Symbol)

    The name of the continuous aggregate view.



122
123
124
# File 'lib/timescaledb/migration_helpers.rb', line 122

# Drops a continuous aggregate by issuing DROP MATERIALIZED VIEW.
#
# @param view_name [String, Symbol] The name of the continuous aggregate view.
# @return [Object] result of `execute`.
def drop_continuous_aggregates(view_name)
  statement = "DROP MATERIALIZED VIEW #{view_name}"
  execute(statement)
end

#remove_continuous_aggregate_policy(table_name) ⇒ Object



140
141
142
# File 'lib/timescaledb/migration_helpers.rb', line 140

# Removes the refresh policy from a continuous aggregate.
#
# @param table_name [String, Symbol] The name of the continuous aggregate.
# @return [Object] result of `execute`.
def remove_continuous_aggregate_policy(table_name)
  statement = "SELECT remove_continuous_aggregate_policy('#{table_name}')"
  execute(statement)
end

#remove_retention_policy(table_name) ⇒ Object



150
151
152
# File 'lib/timescaledb/migration_helpers.rb', line 150

# Removes the retention policy from a table.
#
# @param table_name [String, Symbol] The table whose policy is removed.
# @return [Object] result of `execute`.
def remove_retention_policy(table_name)
  statement = "SELECT remove_retention_policy('#{table_name}')"
  execute(statement)
end

#valid_table_definition_options ⇒ Object

Override the valid_table_definition_options to include hypertable.



33
34
35
# File 'lib/timescaledb/migration_helpers.rb', line 33

# Extends the inherited list of accepted table definition options with
# :hypertable so `create_table` may receive it.
def valid_table_definition_options # :nodoc:
  inherited_options = super
  inherited_options + [:hypertable]
end