Skip to content

Commit

Permalink
Update the create_hypertable interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatas committed Nov 29, 2024
1 parent 17970b0 commit ec1ac9b
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 105 deletions.
4 changes: 2 additions & 2 deletions docs/migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ hypertable_options = {
time_column: 'created_at',
chunk_time_interval: '1 min',
compress_segmentby: 'identifier',
compression_interval: '7 days'
compress_after: '7 days'
}

create_table(:events, id: false, hypertable: hypertable_options) do |t|
Expand Down Expand Up @@ -43,7 +43,7 @@ hypertable_options = {
chunk_time_interval: '1 min',
compress_segmentby: 'symbol',
compress_orderby: 'created_at',
compression_interval: '7 days'
compress_after: '7 days'
}
create_table :ticks, hypertable: hypertable_options, id: false do |t|
t.string :symbol
Expand Down
2 changes: 1 addition & 1 deletion docs/toolkit_candlestick.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ db do
chunk_time_interval: "1 day",
compress_segmentby: "symbol",
compress_orderby: "time",
compression_interval: "1 month"
compress_after: "1 month"
}
create_table :ticks, hypertable: hypertable_options, id: false do |t|
t.timestamp :time
Expand Down
2 changes: 1 addition & 1 deletion docs/toolkit_ohlc.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ hypertable_options = {
chunk_time_interval: '1 week',
compress_segmentby: 'symbol',
compress_orderby: 'time',
compression_interval: '1 month'
compress_after: '1 month'
}
create_table :ticks, hypertable: hypertable_options, id: false do |t|
t.timestampt :time
Expand Down
3 changes: 3 additions & 0 deletions examples/ranking/config/initializers/timescale.rb
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
require 'timescaledb'
require 'scenic'

ActiveSupport.on_load(:active_record) { extend Timescaledb::ActsAsHypertable }

70 changes: 43 additions & 27 deletions lib/timescaledb/migration_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module MigrationHelpers
# chunk_time_interval: '1 min',
# compress_segmentby: 'identifier',
# compress_orderby: 'created_at',
# compression_interval: '7 days'
# compress_after: '7 days'
# }
#
# create_table(:events, id: false, hypertable: options) do |t|
Expand All @@ -38,41 +38,31 @@ def valid_table_definition_options # :nodoc:
# @see create_table with the hypertable options.
def create_hypertable(table_name,
time_column: 'created_at',
by_range: :created_at,
chunk_time_interval: '1 week',
compress_segmentby: nil,
compress_orderby: 'created_at',
compression_interval: nil,
partition_column: nil,
number_partitions: nil,
compress_after: nil,
drop_after: nil,
**hypertable_options)

original_logger = ActiveRecord::Base.logger
ActiveRecord::Base.logger = Logger.new(STDOUT)

options = ["chunk_time_interval => #{chunk_time_interval_clause(chunk_time_interval)}"]
options += hypertable_options.map { |k, v| "#{k} => #{quote(v)}" }

arguments = [
quote(table_name),
quote(time_column),
(quote(partition_column) if partition_column),
(number_partitions if partition_column),
*options
"by_range(#{quote(time_column)}, #{parse_interval(chunk_time_interval)})",
*hypertable_options.map { |k, v| "#{k} => #{quote(v)}" }
]

execute "SELECT create_hypertable(#{arguments.compact.join(', ')})"

if compress_segmentby
execute <<~SQL
ALTER TABLE #{table_name} SET (
timescaledb.compress,
timescaledb.compress_orderby = '#{compress_orderby}',
timescaledb.compress_segmentby = '#{compress_segmentby}'
)
SQL
if compress_segmentby || compress_after
add_compression_policy(table_name, orderby: compress_orderby, segmentby: compress_segmentby, compress_after: compress_after)
end
if compression_interval
execute "SELECT add_compression_policy('#{table_name}', INTERVAL '#{compression_interval}')"

if drop_after
add_retention_policy(table_name, drop_after: drop_after)
end
ensure
ActiveRecord::Base.logger = original_logger if original_logger
Expand Down Expand Up @@ -146,14 +136,40 @@ def remove_continuous_aggregate_policy(table_name)
execute "SELECT remove_continuous_aggregate_policy('#{table_name}')"
end

def create_retention_policy(table_name, interval:)
execute "SELECT add_retention_policy('#{table_name}', INTERVAL '#{interval}')"
def create_retention_policy(table_name, drop_after:)
execute "SELECT add_retention_policy('#{table_name}', drop_after => #{parse_interval(drop_after)})"
end

alias_method :add_retention_policy, :create_retention_policy

def remove_retention_policy(table_name)
execute "SELECT remove_retention_policy('#{table_name}')"
end


# Enable compression policy.
#
# @param table_name [String] The name of the table.
# @param orderby [String] The column to order by.
# @param segmentby [String] The column to segment by.
# @param compress_after [String] The interval to compress after.
# @param compression_chunk_time_interval [String] In case to merge chunks.
#
# @see https://docs.timescale.com/api/latest/compression/add_compression_policy/
def add_compression_policy(table_name, orderby:, segmentby:, compress_after: nil, compression_chunk_time_interval: nil)
options = []
options << 'timescaledb.compress'
options << "timescaledb.compress_orderby = '#{orderby}'" if orderby
options << "timescaledb.compress_segmentby = '#{segmentby}'" if segmentby
options << "timescaledb.compression_chunk_time_interval = INTERVAL '#{compression_chunk_time_interval}'" if compression_chunk_time_interval
execute <<~SQL
ALTER TABLE #{table_name} SET (
#{options.join(',')}
)
SQL
execute "SELECT add_compression_policy('#{table_name}', compress_after => INTERVAL '#{compress_after}')" if compress_after
end

private

# Build a string for the WITH clause of the CREATE MATERIALIZED VIEW statement.
Expand All @@ -166,11 +182,11 @@ def build_with_clause_option_string(option_key, options)
",timescaledb.#{option_key}=#{value}"
end

def chunk_time_interval_clause(chunk_time_interval)
if chunk_time_interval.is_a?(Numeric)
chunk_time_interval
def parse_interval(interval)
if interval.is_a?(Numeric)
interval
else
"INTERVAL '#{chunk_time_interval}'"
"INTERVAL '#{interval}'"
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion spec/support/active_record/models.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ActiveRecord::Base.extend Timescaledb::ActsAsHypertable
ActiveSupport.on_load(:active_record) { extend Timescaledb::ActsAsHypertable }


class Event < ActiveRecord::Base
self.primary_key = "identifier"
Expand Down
55 changes: 20 additions & 35 deletions spec/support/active_record/schema.rb
Original file line number Diff line number Diff line change
@@ -1,43 +1,28 @@
def create_hypertable(table_name:, time_column_name: :created_at, options: {})
create_table(table_name.to_sym, id: false, hypertable: options) do |t|
t.string :identifier, null: false
t.jsonb :payload
t.timestamp time_column_name.to_sym
end
end

def setup_tables
ActiveRecord::Schema.define(version: 1) do
create_hypertable(
table_name: :events,
time_column_name: :created_at,
options: {
time_column: 'created_at',
chunk_time_interval: '1 min',
compress_segmentby: 'identifier',
compression_interval: '7 days'
}
)

create_hypertable(table_name: :hypertable_with_no_options)
hypertable_options = { chunk_time_interval: '1 min', compress_segmentby: 'identifier', compress_after: '7 days' }

create_hypertable(
table_name: :hypertable_with_options,
time_column_name: :timestamp,
options: {
time_column: 'timestamp',
chunk_time_interval: '1 min',
compress_segmentby: 'identifier',
compress_orderby: 'timestamp',
compression_interval: '7 days'
}
)
create_table(:events, id: false, hypertable: hypertable_options) do |t|
t.string :identifier, null: false
t.jsonb :payload
t.timestamptz :created_at
end

create_hypertable(
table_name: :hypertable_with_custom_time_column,
time_column_name: :timestamp,
options: { time_column: 'timestamp' }
)
create_table(:hypertable_with_options, id: false, hypertable: {
time_column: :ts,
chunk_time_interval: '5 min',
compress_segmentby: 'identifier',
compress_orderby: 'ts',
compress_after: '15 min',
drop_after: '1 hour',
if_not_exists: true
}) do |t|
t.serial :id, primary_key: false
t.timestamptz :ts
t.string :identifier
t.index [:id, :ts], name: "index_hypertable_with_options_on_id_and_ts"
end

create_table(:hypertable_with_id_partitioning, hypertable: {
time_column: 'id',
Expand Down
Loading

0 comments on commit ec1ac9b

Please sign in to comment.