Skip to content

Commit

Permalink
Make inheritance use the hypertable as a parent class
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatas committed Sep 17, 2024
1 parent 93b4465 commit 817caa8
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 162 deletions.
220 changes: 72 additions & 148 deletions examples/toolkit-demo/candlestick.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,168 +14,97 @@

ActiveRecord::Base.establish_connection ARGV.first

class Tick < ActiveRecord::Base
acts_as_hypertable time_column: "time"
acts_as_time_vector segment_by: "symbol", value_column: "price"
end

require "active_support/concern"

module Candlestick
extend ActiveSupport::Concern

included do
acts_as_hypertable time_column: "time_bucket"

%w[open high low close].each do |name|
attribute name, :decimal
attribute "#{name}_time", :time
end
attribute :volume, :decimal
attribute :vwap, :decimal

scope :attributes, -> do
select("symbol, time_bucket,
toolkit_experimental.open(candlestick),
toolkit_experimental.high(candlestick),
toolkit_experimental.low(candlestick),
toolkit_experimental.close(candlestick),
toolkit_experimental.open_time(candlestick),
toolkit_experimental.high_time(candlestick),
toolkit_experimental.low_time(candlestick),
toolkit_experimental.close_time(candlestick),
toolkit_experimental.volume(candlestick),
toolkit_experimental.vwap(candlestick)")
end

scope :rollup, -> (timeframe: '1h') do
bucket = %|time_bucket('#{timeframe}', "time_bucket")|
select(bucket,"symbol",
"toolkit_experimental.rollup(candlestick) as candlestick")
.group(1,2)
.order(1)
end


scope :time_vector_from_candlestick, -> ( attribute: "close") do
select("timevector(time_bucket, toolkit_experimental.#{attribute}(candlestick))")
end

scope :plotly_attribute,
-> (attribute: "close",
type: "scatter",
from: nil,
template: %\'{"x": {{ TIMES | json_encode() | safe }}, "y": {{ VALUES | json_encode() | safe }}, "type": "#{type}"}'\) do
from ||= time_vector_from_candlestick(attribute: attribute)

select("toolkit_experimental.to_text(tv.timevector, #{template})::json")
.from("( #{from.to_sql} ) as tv")
.first["to_text"]
end

scope :plotly_candlestick, -> (from: nil) do
data = attributes

{
type: 'candlestick',
xaxis: 'x',
yaxis: 'y',
x: data.map(&:time_bucket),
open: data.map(&:open),
high: data.map(&:high),
low: data.map(&:low),
close: data.map(&:close),
vwap: data.map(&:vwap),
volume: data.map(&:volume)
}
end


def readonly?
true
end
end

class_methods do
end
end

class Candlestick1m < ActiveRecord::Base
self.table_name = 'candlestick_1m'
include Candlestick
end

class Candlestick1h < ActiveRecord::Base
self.table_name = 'candlestick_1h'
include Candlestick
end

class Candlestick1d < ActiveRecord::Base
self.table_name = 'candlestick_1d'
include Candlestick
end

ActiveRecord::Base.connection.add_toolkit_to_search_path!

def db(&block)
ActiveRecord::Base.logger = Logger.new(STDOUT)
ActiveRecord::Base.connection.instance_exec(&block)
ActiveRecord::Base.logger = nil
end

db do
ActiveRecord::Base.logger = Logger.new(STDOUT)
override = true

if !Tick.table_exists? || override
drop_table(:ticks, if_exists: true, force: :cascade)
if true
drop_table :ticks, if_exists: true, force: :cascade

hypertable_options = {
time_column: 'time',
chunk_time_interval: '1 day',
compress_segmentby: 'symbol',
compress_orderby: 'time',
compression_interval: '1 hour'
time_column: "time",
chunk_time_interval: "1 day",
compress_segmentby: "symbol",
compress_orderby: "time",
compression_interval: "1 week"
}
create_table :ticks, hypertable: hypertable_options, id: false do |t|
t.column :time, 'timestamp with time zone'
t.text :symbol
create_table :ticks, id: false, hypertable: hypertable_options, if_not_exists: true do |t|
t.timestamp :time, null: false
t.string :symbol, null: false
t.decimal :price
t.float :volume
t.integer :volume
end

add_index :ticks, [:time, :symbol]

options = -> (timeframe) do
{
with_data: false,
refresh_policies: {
start_offset: "INTERVAL '1 month'",
end_offset: "INTERVAL '#{timeframe}'",
schedule_interval: "INTERVAL '#{timeframe}'"
}
}
end
add_index :ticks, [:time, :symbol], if_not_exists: true
end
end

create_cagg = -> (timeframe: , query: ) do
view_name = "candlestick_#{timeframe}"
create_continuous_aggregate(view_name, query, **options[timeframe])
end

create_cagg[timeframe: '1m', query: Tick._candlestick(timeframe: '1m')]
create_cagg[timeframe: '1h', query: Candlestick1m.rollup(timeframe: '1h')]
create_cagg[timeframe: '1d', query: Candlestick1h.rollup(timeframe: '1d')]
class Tick < ActiveRecord::Base
extend Timescaledb::ActsAsHypertable
extend Timescaledb::ActsAsTimeVector
include Timescaledb:: ContinuousAggregatesHelper

acts_as_hypertable time_column: "time"
acts_as_time_vector segment_by: "symbol", value_column: "price"

scope :ohlcv, -> do
select("symbol,
first(price, time) as open,
max(price) as high,
min(price) as low,
last(price, time) as close,
sum(volume) as volume").group("symbol")
end
# scope :plotly_candlestick, -> (from: nil) do
# data = attributes
# {
# type: 'candlestick',
# xaxis: 'x',
# yaxis: 'y',
# x: data.map(&:time),
# open: data.map(&:open),
# high: data.map(&:high),
# low: data.map(&:low),
# close: data.map(&:close),
# volume: data.map(&:volume)
# }
# end

continuous_aggregates(
timeframes: [:minute, :hour, :day, :month],
scopes: [:ohlcv],
refresh_policy: {
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 day" },
month: { start_offset: "3 month", end_offset: "1 day", schedule_interval: "1 day" }
})

descendants.each{|e|e.time_vector_options[:value_column] = :close}
end

if Tick.count.zero?
db do
execute(ActiveRecord::Base.sanitize_sql_for_conditions( [<<~SQL, {from: 1.week.ago.to_date, to: 1.day.from_now.to_date}]))

db do
execute(ActiveRecord::Base.sanitize_sql_for_conditions( [<<~SQL, {from: 1.week.ago.to_date, to: 1.day.from_now.to_date}]))
INSERT INTO ticks
SELECT time, 'SYMBOL', 1 + (random()*30)::int, 100*(random()*10)::int
FROM generate_series(TIMESTAMP :from,
TIMESTAMP :to,
INTERVAL '10 second') AS time;
SQL
end

Tick.create_continuous_aggregates
Tick.refresh_aggregates
end


if ARGV.include?("--pry")
Pry.start
return
end

require 'sinatra/base'
Expand All @@ -190,7 +119,7 @@ class App < Sinatra::Base
get '/daily_close_price' do
json({
title: "Daily",
data: Candlestick1h.plotly_attribute(attribute: :close)
data: Tick::OhlcvPerMinute.last_week.plotly_candlestick
})
end
get '/candlestick_1m' do
Expand All @@ -203,7 +132,7 @@ class App < Sinatra::Base
get '/candlestick_1h' do
json({
title: "Candlestick yesterday hourly",
data: Candlestick1h.yesterday.plotly_candlestick
data:Tick::OhlcvPerHour.yesterday.plotly_candlestick
})

end
Expand All @@ -228,9 +157,4 @@ class App < Sinatra::Base
HTML
end
end

if ARGV.include?("--pry")
Pry.start
else
App.run!
end
App.run!
25 changes: 11 additions & 14 deletions lib/timescaledb/continuous_aggregates_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def create_continuous_aggregates(with_data: false)
connection.execute <<~SQL
CREATE MATERIALIZED VIEW IF NOT EXISTS #{klass.table_name}
WITH (timescaledb.continuous) AS
#{klass.base_query.to_sql}
#{klass.base_query}
#{with_data ? 'WITH DATA' : 'WITH NO DATA'};
SQL

Expand Down Expand Up @@ -129,19 +129,16 @@ class << self

interval = "'1 #{timeframe.to_s}'"
self.base_model = base_model
self.base_query =
if previous_timeframe
prev_klass = base_model.const_get("#{aggregate_name}_per_#{previous_timeframe}".classify)
select_clause = base_model.apply_rollup_rules("#{config[:select]}")
prev_klass
.select("time_bucket(#{interval}, #{time_column}) as #{time_column}, #{select_clause}")
.group(1, *config[:group_by])
else
scope = base_model.public_send(config[:scope_name])
config[:select] = scope.select_values.join(', ')
config[:group_by] = scope.group_values
scope.rollup(interval)
end
if previous_timeframe
prev_klass = base_model.const_get("#{aggregate_name}_per_#{previous_timeframe}".classify)
select_clause = base_model.apply_rollup_rules("#{config[:select]}")
self.base_query = "SELECT time_bucket(#{interval}, #{time_column}) as #{time_column}, #{select_clause} FROM #{prev_klass.table_name} GROUP BY #{[1, *config[:group_by]].join(', ')}"
else
scope = base_model.public_send(config[:scope_name])
config[:select] = scope.select_values.join(', ')
config[:group_by] = scope.group_values
self.base_query = scope.rollup(interval).to_sql
end

def self.refresh!(start_time = nil, end_time = nil)
if start_time && end_time
Expand Down

0 comments on commit 817caa8

Please sign in to comment.