Initial commit of Kajiya
Kajiya is an RBE-compatible REAPI backend implementation used as a
testing server during development of Chromium's new build tooling.
It is not meant for production use, but can be very useful for local
testing of any remote execution related code.

Tested (see README.md for details):
- Bazel building itself
- autoninja + reclient building Chromium

I could not test it with Siso yet, because Kajiya doesn't support
authentication and Siso currently insists on using auth.

Change-Id: Ic3b21e85a6f012bfe83c4fd07643f4100dea7a94
Reviewed-on: https://chromium-review.googlesource.com/c/infra/infra/+/4668284
Reviewed-by: Fumitoshi Ukai <[email protected]>
Auto-Submit: Philipp Wollermann <[email protected]>
Commit-Queue: Philipp Wollermann <[email protected]>
Cr-Commit-Position: refs/heads/main@{#58044}
GitOrigin-RevId: d3672b2b2d7b50a305fb464fafb5449670368f35
philwo committed Jul 9, 2023
0 parents commit 4a6bd26
Showing 14 changed files with 2,138 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
# Binary generated by `go build`.
/kajiya
27 changes: 27 additions & 0 deletions LICENSE
@@ -0,0 +1,27 @@
// Copyright 2023 The Chromium Authors. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 changes: 39 additions & 0 deletions README.md
@@ -0,0 +1,39 @@
# 🔥 鍛冶屋 (Kajiya)

Kajiya is an RBE-compatible REAPI backend implementation used as a testing
server during development of Chromium's new build tooling. It is not meant
for production use, but can be very useful for local testing of any remote
execution related code.

## How to use

```shell
$ go build && ./kajiya

# Build Bazel using kajiya as the backend.
$ bazel build --remote_executor=grpc://localhost:50051 //src:bazel

# Build Chromium with autoninja + reclient using kajiya as the backend.
$ gn gen out/default --args="use_remoteexec=true"
$ env \
RBE_automatic_auth=false \
RBE_service="localhost:50051" \
RBE_service_no_security=true \
RBE_service_no_auth=true \
RBE_compression_threshold=-1 \
autoninja -C out/default -j $(nproc) chrome
```

## Features

Kajiya can act as an REAPI remote cache and/or remote executor. By default, both
services are provided, but you can also run an executor without a cache, or a
cache without an executor:

```shell
# Remote execution without caching
$ ./kajiya -cache=false

# Remote caching without execution (clients must upload action results)
$ ./kajiya -execution=false
```
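
How these switches are wired up inside Kajiya is not shown in this README. As a rough sketch only, two boolean flags with the names used above could be parsed with Go's standard `flag` package as follows. The `config` type, the `parseFlags` helper, the help strings, and the rule that at least one service must stay enabled are assumptions made for illustration.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"os"
)

// config holds the two service toggles. The type and its fields are
// hypothetical; only the flag names come from the commands above.
type config struct {
	cache     bool
	execution bool
}

// parseFlags parses args into a config. It uses a private FlagSet so that it
// can be called repeatedly without touching the global flag state.
func parseFlags(args []string) (config, error) {
	var c config
	fs := flag.NewFlagSet("kajiya", flag.ContinueOnError)
	fs.BoolVar(&c.cache, "cache", true, "enable the remote cache")
	fs.BoolVar(&c.execution, "execution", true, "enable remote execution")
	if err := fs.Parse(args); err != nil {
		return config{}, err
	}
	// Assumed rule: a server with neither service would have nothing to do.
	if !c.cache && !c.execution {
		return config{}, errors.New("at least one of -cache and -execution must be enabled")
	}
	return c, nil
}

func main() {
	c, err := parseFlags(os.Args[1:])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	fmt.Printf("cache=%v execution=%v\n", c.cache, c.execution)
}
```

Because both flags default to true, running the binary without arguments enables both services, which matches the default described above.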
104 changes: 104 additions & 0 deletions actioncache/local.go
@@ -0,0 +1,104 @@
// Copyright 2023 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package actioncache

import (
"fmt"
"os"
"path/filepath"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
remote "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"google.golang.org/protobuf/proto"
)

// ActionCache is a simple action cache implementation that stores ActionResults on the local disk.
type ActionCache struct {
dataDir string
}

// New creates a new local ActionCache. The data directory is created if it does not exist.
func New(dataDir string) (*ActionCache, error) {
if dataDir == "" {
return nil, fmt.Errorf("data directory must be specified")
}

if err := os.MkdirAll(dataDir, 0755); err != nil {
return nil, err
}

// Create subdirectories {00, 01, ..., ff} for sharding by hash prefix.
for i := 0; i <= 255; i++ {
err := os.Mkdir(filepath.Join(dataDir, fmt.Sprintf("%02x", i)), 0755)
if err != nil {
if os.IsExist(err) {
continue
}
return nil, err
}
}

return &ActionCache{
dataDir: dataDir,
}, nil
}

// path returns the path to the file with digest d in the action cache.
func (c *ActionCache) path(d digest.Digest) string {
return filepath.Join(c.dataDir, d.Hash[:2], d.Hash)
}

// Get returns the cached ActionResult for the given digest.
func (c *ActionCache) Get(actionDigest digest.Digest) (*remote.ActionResult, error) {
p := c.path(actionDigest)

// Read the action result for the requested action into a byte slice.
buf, err := os.ReadFile(p)
if err != nil {
return nil, err
}

// Unmarshal it into an ActionResult message and return it to the client.
actionResult := &remote.ActionResult{}
if err := proto.Unmarshal(buf, actionResult); err != nil {
return nil, err
}

return actionResult, nil
}

// Put stores the given ActionResult for the given digest.
func (c *ActionCache) Put(actionDigest digest.Digest, ar *remote.ActionResult) error {
// Marshal the action result.
actionResultRaw, err := proto.Marshal(ar)
if err != nil {
return err
}

// Store the action result in our action cache. Write it to a temporary file
// first and rename it into place, so that readers never see a partial file.
// On any failure, remove the temporary file so that it does not leak.
f, err := os.CreateTemp(c.dataDir, "tmp_")
if err != nil {
return err
}
if _, err := f.Write(actionResultRaw); err != nil {
f.Close()
os.Remove(f.Name())
return err
}
if err := f.Close(); err != nil {
os.Remove(f.Name())
return err
}
if err := os.Rename(f.Name(), c.path(actionDigest)); err != nil {
// TODO: It's possible that on Windows we cannot rename the file to the destination because it already exists.
// In that case, we should check if the file is identical to the one we're trying to write, and if so, ignore the error.
os.Remove(f.Name())
return err
}

return nil
}

// Remove deletes the cached ActionResult for the given digest.
func (c *ActionCache) Remove(d digest.Digest) error {
return fmt.Errorf("not implemented yet")
}
164 changes: 164 additions & 0 deletions actioncache/service.go
@@ -0,0 +1,164 @@
// Copyright 2023 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Package actioncache implements the REAPI ActionCache service.
package actioncache

import (
"context"
"fmt"
"log"
"os"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
remote "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/philwo/kajiya/blobstore"
)

// Service implements the REAPI ActionCache service.
type Service struct {
remote.UnimplementedActionCacheServer

// The ActionCache to use for storing ActionResults.
ac *ActionCache

// The blobstore.ContentAddressableStorage to use for reading blobs.
cas *blobstore.ContentAddressableStorage
}

// Register creates and registers a new Service with the given gRPC server.
func Register(s *grpc.Server, ac *ActionCache, cas *blobstore.ContentAddressableStorage) error {
service, err := NewService(ac, cas)
if err != nil {
return err
}
remote.RegisterActionCacheServer(s, service)
return nil
}

// NewService creates a new Service.
func NewService(ac *ActionCache, cas *blobstore.ContentAddressableStorage) (Service, error) {
if ac == nil {
return Service{}, fmt.Errorf("ac must be set")
}

if cas == nil {
return Service{}, fmt.Errorf("cas must be set")
}

return Service{
ac: ac,
cas: cas,
}, nil
}

// GetActionResult returns the ActionResult for a given action digest.
func (s Service) GetActionResult(ctx context.Context, request *remote.GetActionResultRequest) (*remote.ActionResult, error) {
response, err := s.getActionResult(request)
if err != nil {
if status.Code(err) == codes.NotFound {
log.Printf("⚠️ GetActionResult(%v) => Cache miss", request.ActionDigest)
} else {
log.Printf("🚨 GetActionResult(%v) => Error: %v", request.ActionDigest, err)
}
} else {
log.Printf("🎉 GetActionResult(%v) => Cache hit", request.ActionDigest)
}
return response, err
}

func (s Service) getActionResult(request *remote.GetActionResultRequest) (*remote.ActionResult, error) {
// If the client explicitly specifies a DigestFunction, ensure that it's SHA256.
if request.DigestFunction != remote.DigestFunction_UNKNOWN && request.DigestFunction != remote.DigestFunction_SHA256 {
return nil, status.Errorf(codes.InvalidArgument, "hash function %q is not supported", request.DigestFunction.String())
}

actionDigest, err := digest.NewFromProto(request.ActionDigest)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

actionResult, err := s.ac.Get(actionDigest)
if err != nil {
if os.IsNotExist(err) {
return nil, status.Errorf(codes.NotFound, "action digest %s not found in cache", actionDigest)
}
return nil, status.Error(codes.Internal, err.Error())
}

return actionResult, nil
}

// UpdateActionResult stores an ActionResult for a given action digest on disk.
func (s Service) UpdateActionResult(ctx context.Context, request *remote.UpdateActionResultRequest) (*remote.ActionResult, error) {
response, err := s.updateActionResult(request)
if err != nil {
log.Printf("🚨 UpdateActionResult(%v) => Error: %v", request.ActionDigest, err)
} else {
log.Printf("✅ UpdateActionResult(%v) => OK", request.ActionDigest)
}
return response, err
}

func (s Service) updateActionResult(request *remote.UpdateActionResultRequest) (*remote.ActionResult, error) {
// If the client explicitly specifies a DigestFunction, ensure that it's SHA256.
if request.DigestFunction != remote.DigestFunction_UNKNOWN && request.DigestFunction != remote.DigestFunction_SHA256 {
return nil, status.Errorf(codes.InvalidArgument, "hash function %q is not supported", request.DigestFunction.String())
}

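// Check that the client actually sent an action result, otherwise the field
// accesses below would dereference a nil pointer.
if request.ActionResult == nil {
return nil, status.Error(codes.InvalidArgument, "action_result must be set")
}

// Check that the client didn't send inline stdout / stderr data.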
if request.ActionResult.StdoutRaw != nil {
return nil, status.Error(codes.InvalidArgument, "client should not populate stdout_raw during upload")
}
if request.ActionResult.StderrRaw != nil {
return nil, status.Error(codes.InvalidArgument, "client should not populate stderr_raw during upload")
}

// Check that the action digest is valid.
actionDigest, err := digest.NewFromProto(request.ActionDigest)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

// Check that the action is present in our CAS.
if _, err := s.cas.Stat(actionDigest); err != nil {
return nil, status.Errorf(codes.NotFound, "action digest %s not found in CAS", actionDigest)
}

// If the action result contains a stdout digest, check that it is present in our CAS.
if request.ActionResult.StdoutDigest != nil {
stdoutDigest, err := digest.NewFromProto(request.ActionResult.StdoutDigest)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if _, err := s.cas.Stat(stdoutDigest); err != nil {
return nil, status.Errorf(codes.NotFound, "stdout digest %s not found in CAS", stdoutDigest)
}
}

// Same for stderr.
if request.ActionResult.StderrDigest != nil {
stderrDigest, err := digest.NewFromProto(request.ActionResult.StderrDigest)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if _, err := s.cas.Stat(stderrDigest); err != nil {
return nil, status.Errorf(codes.NotFound, "stderr digest %s not found in CAS", stderrDigest)
}
}

// TODO: Check that all the output files are present in our CAS.

// Store the action result.
if err := s.ac.Put(actionDigest, request.ActionResult); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// Return the action result.
return request.ActionResult, nil
}
16 changes: 16 additions & 0 deletions blobstore/fastcopy.go
@@ -0,0 +1,16 @@
// Copyright 2023 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//go:build !darwin

package blobstore

import "os"

// fastCopy copies a file from source to destination using a hard link.
// This is usually the best we can do, unless the operating system supports
// copy-on-write semantics for files (e.g. macOS with APFS).
func fastCopy(source, destination string) error {
return os.Link(source, destination)
}
18 changes: 18 additions & 0 deletions blobstore/fastcopy_darwin.go
@@ -0,0 +1,18 @@
// Copyright 2023 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//go:build darwin

package blobstore

import (
"golang.org/x/sys/unix"
)

// fastCopy copies a file from source to destination using a clonefile syscall.
// This is nicer than using a hard link, because it means that even if the file
// is accidentally modified, the copy will still have the original contents.
func fastCopy(source, destination string) error {
return unix.Clonefile(source, destination, unix.CLONE_NOFOLLOW)
}