Skip to content

Commit

Permalink
Seamless integration between continuous_aggregates and acts_as_time_v…
Browse files Browse the repository at this point in the history
…ector
  • Loading branch information
jonatas committed Sep 19, 2024
1 parent 3000452 commit 63c6951
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 433 deletions.
543 changes: 160 additions & 383 deletions docs/toolkit_candlestick.md

Large diffs are not rendered by default.

70 changes: 47 additions & 23 deletions examples/toolkit-demo/candlestick.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,9 @@ class Tick < ActiveRecord::Base
acts_as_hypertable time_column: "time"
acts_as_time_vector segment_by: "symbol", value_column: "price"

scope :ohlcv, -> do
select("symbol,
first(price, time) as open,
max(price) as high,
min(price) as low,
last(price, time) as close,
sum(volume) as volume").group("symbol")
end

scope :plotly_candlestick, -> (from: nil) do
data = all.to_a
data = ohlcv.to_a
{
type: 'candlestick',
xaxis: 'x',
Expand All @@ -50,18 +43,49 @@ class Tick < ActiveRecord::Base
volume: data.map(&:volume)
}
end
scope :ohlcv, -> do
select("symbol,
first(price, time) as open,
max(price) as high,
min(price) as low,
last(price, time) as close,
sum(volume) as volume")
end

continuous_aggregates(
timeframes: [:minute, :hour, :day, :month],
scopes: [:ohlcv],
refresh_policy: {
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 day" },
month: { start_offset: "3 month", end_offset: "1 day", schedule_interval: "1 day" }
})

descendants.each{|e|e.time_vector_options = time_vector_options.merge(value_column: :close)}
scopes: [:_candlestick]
)

descendants.each do |cagg|
cagg.class_eval do
self.time_vector_options = time_vector_options.merge(value_column: :close)
[:open, :high, :low, :close].each do |attr|
attribute attr, :decimal, precision: 10, scale: 2
end
[:volume, :vwap].each do |attr|
attribute attr, :integer
end
[:open_time, :high_time, :low_time, :close_time].each do |attr|
attribute attr, :time
end
scope :ohlcv, -> do
unscoped
.from("(#{to_sql}) AS candlestick")
.select(time_column, *segment_by_column,
"open(candlestick),
high(candlestick),
low(candlestick),
close(candlestick),
open_time(candlestick),
high_time(candlestick),
low_time(candlestick),
close_time(candlestick),
volume(candlestick),
vwap(candlestick)")
end
end
end
end


Expand All @@ -78,7 +102,7 @@ class Tick < ActiveRecord::Base
compression_interval: "1 week"
}
create_table :ticks, id: false, hypertable: hypertable_options, if_not_exists: true do |t|
t.timestamp :time, null: false
t.timestamptz :time, null: false
t.string :symbol, null: false
t.decimal :price
t.integer :volume
Expand Down Expand Up @@ -116,28 +140,28 @@ class App < Sinatra::Base
get '/daily_close_price' do
json({
title: "Daily",
data: Tick::OhlcvPerDay.previous_week.plotly_candlestick
data: Tick::CandlestickPerDay.previous_week.plotly_candlestick
})
end
get '/candlestick_1m' do
json({
title: "Candlestick 1 minute last hour",
data: Tick::OhlcvPerMinute.last_hour.plotly_candlestick
data: Tick::CandlestickPerMinute.last_hour.plotly_candlestick
})
end

get '/candlestick_1h' do
json({
title: "Candlestick yesterday hourly",
data:Tick::OhlcvPerHour.yesterday.plotly_candlestick
data:Tick::CandlestickPerHour.yesterday.plotly_candlestick
})

end

get '/candlestick_1d' do
json({
title: "Candlestick daily this month",
data: Tick::OhlcvPerDay.previous_week.plotly_candlestick
data: Tick::CandlestickPerDay.previous_week.plotly_candlestick
})
end

Expand Down
1 change: 0 additions & 1 deletion lib/timescaledb/acts_as_time_vector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,3 @@ def acts_as_time_vector?
end
end
end
ActiveRecord::Base.extend Timescaledb::ActsAsTimeVector
38 changes: 27 additions & 11 deletions lib/timescaledb/continuous_aggregates_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module ContinuousAggregatesHelper
/high\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'max(\1) as \1',
/low\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'min(\1) as \1',
/last\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'last(\3, \2) as \3',
/candlestick_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
/candlestick_agg\((\w+),\s*(\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\4) as \4',
/stats_agg\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\3) as \3',
/stats_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
/state_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
Expand All @@ -21,16 +21,27 @@ module ContinuousAggregatesHelper
}

scope :rollup, ->(interval) do
select_values = self.select_values.join(', ')
if select_values.include?('time_bucket(')
select_values = (self.select_values - ["time"]).select{|e|!e.downcase.start_with?("time_bucket")}
if self.select_values.any?{|e|e.downcase.start_with?('time_bucket(')} || self.select_values.include?('time')
select_values = apply_rollup_rules(select_values)
select_values.gsub!(/time_bucket\((.+), (.+)\)/, "time_bucket(#{interval}, \2)")
select_values.gsub!(/\btime\b/, "time_bucket(#{interval}, time) as time")
end
group_values = self.group_values.dup

if self.segment_by_column
if !group_values.include?(self.segment_by_column)
group_values << self.segment_by_column
end
if !select_values.include?(self.segment_by_column.to_s)
select_values.insert(0, self.segment_by_column.to_s)
end
end
group_values = self.group_values
where_values = self.where_values_hash
self.unscoped.select("time_bucket(#{interval}, #{time_column}) as #{time_column}, #{select_values}")
tb = "time_bucket(#{interval}, #{time_column})"
self.unscoped.select("#{tb} as #{time_column}, #{select_values.join(', ')}")
.where(where_values)
.group(1, *group_values)
.group(tb, *group_values)
end
end

Expand Down Expand Up @@ -94,9 +105,13 @@ def create_continuous_aggregates(with_data: false)
end

def apply_rollup_rules(select_values)
rollup_rules.reduce(select_values) do |result, (pattern, replacement)|
result.gsub(pattern, replacement)
result = select_values.dup
rollup_rules.each do |pattern, replacement|
result.gsub!(pattern, replacement)
end
# Remove any remaining time_bucket
result.gsub!(/time_bucket\(.+?\)( as \w+)?/, '')
result
end

def drop_continuous_aggregates
Expand Down Expand Up @@ -128,15 +143,16 @@ class << self

interval = "'1 #{timeframe.to_s}'"
self.base_model = base_model
tb = "time_bucket(#{interval}, #{time_column})"
if previous_timeframe
prev_klass = base_model.const_get("#{aggregate_name}_per_#{previous_timeframe}".classify)
select_clause = base_model.apply_rollup_rules("#{config[:select]}")
self.base_query = "SELECT time_bucket(#{interval}, #{time_column}) as #{time_column}, #{select_clause} FROM \"#{prev_klass.table_name}\" GROUP BY #{[1, *config[:group_by]].join(', ')}"
self.base_query = "SELECT #{tb} as #{time_column}, #{select_clause} FROM \"#{prev_klass.table_name}\" GROUP BY #{[tb, *config[:group_by]].join(', ')}"
else
scope = base_model.public_send(config[:scope_name])
config[:select] = scope.select_values.join(', ')
config[:select] = scope.select_values.select{|e|!e.downcase.start_with?("time_bucket")}.join(', ')
config[:group_by] = scope.group_values
self.base_query = "SELECT time_bucket(#{interval}, #{time_column}) as #{time_column}, #{config[:select]} FROM \"#{scope.table_name}\" GROUP BY #{[1, *config[:group_by]].join(', ')}"
self.base_query = "SELECT #{tb} as #{time_column}, #{config[:select]} FROM \"#{scope.table_name}\" GROUP BY #{[tb, *config[:group_by]].join(', ')}"
end

def self.refresh!(start_time = nil, end_time = nil)
Expand Down
5 changes: 5 additions & 0 deletions spec/support/active_record/models.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ class HypertableSkipAllScopes < ActiveRecord::Base
class HypertableWithContinuousAggregates < ActiveRecord::Base
extend Timescaledb::ActsAsHypertable
include Timescaledb::ContinuousAggregatesHelper
extend Timescaledb::ActsAsTimeVector

acts_as_hypertable time_column: 'ts'
acts_as_time_vector segment_by: :identifier

scope :total, -> { select("count(*) as total") }
scope :by_identifier, -> { select("identifier, count(*) as total").group(:identifier) }
Expand All @@ -51,6 +53,9 @@ class HypertableWithContinuousAggregates < ActiveRecord::Base
month: { start_offset: "3 month", end_offset: "1 hour", schedule_interval: "1 hour" }
}
)
descendants.each do |cagg|
cagg.time_vector_options = time_vector_options.merge(value_column: :total)
end
end

class NonHypertable < ActiveRecord::Base
Expand Down
30 changes: 15 additions & 15 deletions spec/timescaledb/continuous_aggregates_helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,20 @@
test_class.create_continuous_aggregates
aggregate_classes = [test_class::TotalPerMinute, test_class::TotalPerHour, test_class::TotalPerDay, test_class::TotalPerMonth]

expect(test_class::TotalPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY 1")
expect(test_class::TotalPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, sum(total) as total FROM \"total_per_day\" GROUP BY 1")
expect(test_class::TotalPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, sum(total) as total FROM \"total_per_hour\" GROUP BY 1")
expect(test_class::TotalPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM \"total_per_minute\" GROUP BY 1")

expect(test_class::ByVersionPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, identifier, version, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY 1, identifier, version")
expect(test_class::ByVersionPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_day\" GROUP BY 1, identifier, version")
expect(test_class::ByVersionPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_hour\" GROUP BY 1, identifier, version")
expect(test_class::ByVersionPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_minute\" GROUP BY 1, identifier, version")

expect(test_class::ByIdentifierPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, identifier, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY 1, identifier")
expect(test_class::ByIdentifierPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_day\" GROUP BY 1, identifier")
expect(test_class::ByIdentifierPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_hour\" GROUP BY 1, identifier")
expect(test_class::ByIdentifierPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_minute\" GROUP BY 1, identifier")
expect(test_class::TotalPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY time_bucket('1 minute', ts)")
expect(test_class::TotalPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, sum(total) as total FROM \"total_per_day\" GROUP BY time_bucket('1 month', ts)")
expect(test_class::TotalPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, sum(total) as total FROM \"total_per_hour\" GROUP BY time_bucket('1 day', ts)")
expect(test_class::TotalPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM \"total_per_minute\" GROUP BY time_bucket('1 hour', ts)")

expect(test_class::ByVersionPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, identifier, version, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY time_bucket('1 minute', ts), identifier, version")
expect(test_class::ByVersionPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_day\" GROUP BY time_bucket('1 month', ts), identifier, version")
expect(test_class::ByVersionPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_hour\" GROUP BY time_bucket('1 day', ts), identifier, version")
expect(test_class::ByVersionPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_minute\" GROUP BY time_bucket('1 hour', ts), identifier, version")

expect(test_class::ByIdentifierPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, identifier, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY time_bucket('1 minute', ts), identifier")
expect(test_class::ByIdentifierPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_day\" GROUP BY time_bucket('1 month', ts), identifier")
expect(test_class::ByIdentifierPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_hour\" GROUP BY time_bucket('1 day', ts), identifier")
expect(test_class::ByIdentifierPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_minute\" GROUP BY time_bucket('1 hour', ts), identifier")
end
end

Expand All @@ -114,7 +114,7 @@
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_version_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_identifier_per_month/i)

expect(test_class::TotalPerMinute.select("sum(total) as total").rollup("'1 hour'").to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM \"total_per_minute\" GROUP BY 1")
expect(test_class::TotalPerMinute.select("sum(total) as total").rollup("'1 hour'").to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"total_per_minute\" GROUP BY time_bucket('1 hour', ts), \"identifier\"")
end

it 'sets up refresh policies for each aggregate' do
Expand Down
6 changes: 6 additions & 0 deletions spec/timescaledb/toolkit_helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

let(:model) do
Measurement = Class.new(ActiveRecord::Base) do
extend Timescaledb::ActsAsHypertable
extend Timescaledb::ActsAsTimeVector
self.table_name = 'measurements'
self.primary_key = nil

Expand Down Expand Up @@ -207,6 +209,8 @@

let(:model) do
Measurement = Class.new(ActiveRecord::Base) do
extend Timescaledb::ActsAsHypertable
extend Timescaledb::ActsAsTimeVector
self.table_name = 'measurements'
self.primary_key = nil

Expand Down Expand Up @@ -297,6 +301,8 @@

let(:model) do
Tick = Class.new(ActiveRecord::Base) do
extend Timescaledb::ActsAsTimeVector
extend Timescaledb::ActsAsHypertable
self.table_name = 'ticks'
self.primary_key = nil

Expand Down

0 comments on commit 63c6951

Please sign in to comment.