Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Re-ingest local files on updates #5656

Merged
merged 18 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion runtime/compilers/rillv1/parse_model.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package rillv1

import (
"context"
"errors"
"fmt"
"slices"
"strings"
"time"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/pkg/duckdbsql"
"github.com/rilldata/rill/runtime/pkg/fileutil"
"google.golang.org/protobuf/types/known/structpb"
)

Expand All @@ -34,7 +37,7 @@ type ModelYAML struct {
}

// parseModel parses a model definition and adds the resulting resource to p.Resources.
func (p *Parser) parseModel(node *Node) error {
func (p *Parser) parseModel(ctx context.Context, node *Node) error {
// Parse YAML
tmp := &ModelYAML{}
err := p.decodeNodeYAML(node, false, tmp)
Expand Down Expand Up @@ -106,6 +109,14 @@ func (p *Parser) parseModel(node *Node) error {
inputProps["sql"] = sql
}

// special handling to mark model as updated when local file changes
if inputConnector == "local_file" {
err = p.trackResourceNamesForDataPaths(ctx, ResourceName{Name: node.Name, Kind: ResourceKindModel}.Normalized(), inputProps)
if err != nil {
return err
}
}

inputPropsPB, err := structpb.NewStruct(inputProps)
if err != nil {
return fmt.Errorf(`found invalid input property type: %w`, err)
Expand Down Expand Up @@ -229,6 +240,50 @@ func (p *Parser) inferSQLRefs(node *Node) ([]ResourceName, error) {
return refs, nil
}

func (p *Parser) trackResourceNamesForDataPaths(ctx context.Context, name ResourceName, inputProps map[string]any) error {
c, ok := inputProps["invalidate_on_change"].(bool)
if ok && !c {
return nil
}
path, ok := inputProps["path"].(string)
if !ok {
return nil
}

var localPaths []string
if fileutil.IsGlob(path) {
entries, err := p.Repo.ListRecursive(ctx, path, true)
if err != nil || len(entries) == 0 {
// The actual error will be returned by the model reconciler
return nil
}

for _, entry := range entries {
localPaths = append(localPaths, entry.Path)
}
} else {
localPaths = []string{normalizePath(path)}
}

// Update parser's resourceNamesForDataPaths map to track which resources depend on the local file
for _, path := range localPaths {
resources := p.resourceNamesForDataPaths[path]
if !slices.Contains(resources, name) {
resources = append(resources, name)
p.resourceNamesForDataPaths[path] = resources
}
}

// Calculate hash of local files
hash, err := p.Repo.FileHash(ctx, localPaths)
if err != nil {
return err
}
// Add hash to input properties so that the model spec is considered updated when the local file changes
inputProps["local_files_hash"] = hash
return nil
}

// findLineNumber returns the line number of the pos in the given text.
// Lines are counted starting from 1, and positions start from 0.
func findLineNumber(text string, pos int) int {
Expand Down
7 changes: 4 additions & 3 deletions runtime/compilers/rillv1/parse_node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rillv1

import (
"context"
"errors"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -31,12 +32,12 @@ type Node struct {
}

// parseNode multiplexes to the appropriate parse function based on the node kind.
func (p *Parser) parseNode(node *Node) error {
func (p *Parser) parseNode(ctx context.Context, node *Node) error {
switch node.Kind {
case ResourceKindSource:
return p.parseSource(node)
return p.parseSource(ctx, node)
case ResourceKindModel:
return p.parseModel(node)
return p.parseModel(ctx, node)
case ResourceKindMetricsView:
return p.parseMetricsView(node)
case ResourceKindExplore:
Expand Down
5 changes: 3 additions & 2 deletions runtime/compilers/rillv1/parse_source.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rillv1

import (
"context"
"fmt"
"strconv"
"strings"
Expand All @@ -24,7 +25,7 @@ type SourceYAML struct {
}

// parseSource parses a source definition and adds the resulting resource to p.Resources.
func (p *Parser) parseSource(node *Node) error {
func (p *Parser) parseSource(ctx context.Context, node *Node) error {
// Parse YAML
tmp := &SourceYAML{}
err := p.decodeNodeYAML(node, false, tmp)
Expand All @@ -43,7 +44,7 @@ func (p *Parser) parseSource(node *Node) error {

// If the source has SQL and hasn't specified a connector, we treat it as a model
if node.SQL != "" && node.ConnectorInferred {
return p.parseModel(node)
return p.parseModel(ctx, node)
}

// Add SQL as a property
Expand Down
41 changes: 37 additions & 4 deletions runtime/compilers/rillv1/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,9 @@ type Parser struct {
Errors []*runtimev1.ParseError

// Internal state
resourcesForPath map[string][]*Resource // Reverse index of Resource.Paths
resourcesForUnspecifiedRef map[string][]*Resource // Reverse index of Resource.rawRefs where kind=ResourceKindUnspecified
resourcesForPath map[string][]*Resource // Reverse index of Resource.Paths
resourcesForUnspecifiedRef map[string][]*Resource // Reverse index of Resource.rawRefs where kind=ResourceKindUnspecified
resourceNamesForDataPaths map[string][]ResourceName // Index of local data files to resources that depend on them
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
insertedResources []*Resource
updatedResources []*Resource
deletedResources []*Resource
Expand Down Expand Up @@ -275,7 +276,14 @@ func (p *Parser) Reparse(ctx context.Context, paths []string) (*Diff, error) {
// IsSkippable returns true if the path will be skipped by Reparse.
// It's useful for callers to avoid triggering a reparse when they know the path is not relevant.
func (p *Parser) IsSkippable(path string) bool {
return pathIsIgnored(path) || !pathIsYAML(path) && !pathIsSQL(path) && !pathIsDotEnv(path)
if pathIsIgnored(path) {
return true
}
_, ok := p.resourceNamesForDataPaths[path]
if ok {
return false
}
return !pathIsYAML(path) && !pathIsSQL(path) && !pathIsDotEnv(path)
}

// TrackedPathsInDir returns the paths under the given directory that the parser currently has cached results for.
Expand All @@ -301,6 +309,7 @@ func (p *Parser) reload(ctx context.Context) error {
p.DotEnv = nil
p.Resources = make(map[ResourceName]*Resource)
p.Errors = nil
p.resourceNamesForDataPaths = make(map[string][]ResourceName)
p.resourcesForPath = make(map[string][]*Resource)
p.resourcesForUnspecifiedRef = make(map[string][]*Resource)
p.insertedResources = nil
Expand Down Expand Up @@ -380,6 +389,17 @@ func (p *Parser) reparseExceptRillYAML(ctx context.Context, paths []string) (*Di
continue
}

// add resources corresponding to local data files
resources, ok := p.resourceNamesForDataPaths[path]
if ok {
for _, resource := range resources {
if res, ok := p.Resources[resource]; ok {
checkPaths = append(checkPaths, res.Paths...)
}
}
continue
}

isSQL := pathIsSQL(path)
isYAML := pathIsYAML(path)
isDotEnv := pathIsDotEnv(path)
Expand Down Expand Up @@ -673,7 +693,7 @@ func (p *Parser) parseStemPaths(ctx context.Context, paths []string) error {
// Parse the SQL/YAML file pair to a Node, then parse the Node to p.Resources.
node, err := p.parseStem(paths, yamlPath, yaml, sqlPath, sql)
if err == nil {
err = p.parseNode(node)
err = p.parseNode(ctx, node)
}

// Spread error across the node's paths (YAML and/or SQL files)
Expand Down Expand Up @@ -909,6 +929,19 @@ func (p *Parser) deleteResource(r *Resource) {
}
}

// Remove from p.resourceNamesForDataPaths
for path, resources := range p.resourceNamesForDataPaths {
idx := slices.Index(resources, r.Name.Normalized())
if idx < 0 {
continue
}
if len(resources) == 1 {
delete(p.resourceNamesForDataPaths, path)
} else {
p.resourceNamesForDataPaths[path] = slices.Delete(resources, idx, idx+1)
}
}

// Track in deleted resources (unless it was in insertedResources, in which case it's not a real deletion)
if !foundInInserted {
p.deletedResources = append(p.deletedResources, r)
Expand Down
29 changes: 29 additions & 0 deletions runtime/drivers/admin/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package admin

import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"io/fs"
Expand Down Expand Up @@ -136,6 +138,33 @@ func (h *Handle) Stat(ctx context.Context, filePath string) (*drivers.RepoObject
}, nil
}

func (h *Handle) FileHash(ctx context.Context, paths []string) (string, error) {
err := h.rlockEnsureCloned(ctx)
if err != nil {
return "", err
}
defer h.repoMu.RUnlock()

hasher := md5.New()
for _, path := range paths {
path = filepath.Join(h.projPath, path)
file, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
continue
}
return "", err
}

if _, err := io.Copy(hasher, file); err != nil {
file.Close()
return "", err
}
file.Close()
}
return hex.EncodeToString(hasher.Sum(nil)), nil
}

func (h *Handle) Put(ctx context.Context, filePath string, reader io.Reader) error {
return fmt.Errorf("put operation is unsupported")
}
Expand Down
3 changes: 3 additions & 0 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,9 @@ func (c *connection) AsModelExecutor(instanceID string, opts *drivers.ModelExecu
if w, ok := opts.InputHandle.AsWarehouse(); ok {
return &warehouseToSelfExecutor{c, w}, true
}
if f, ok := opts.InputHandle.AsFileStore(); ok && opts.InputConnector == "local_file" {
return &localFileToSelfExecutor{c, f}, true
}
}
if opts.InputHandle == c {
if opts.OutputHandle.Driver() == "file" {
Expand Down
Loading
Loading