Skip to content

Commit

Permalink
ucanto integration + fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 30, 2024
1 parent 902d69c commit abcd719
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 216 deletions.
8 changes: 3 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"author": "Alan Shaw",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@ucanto/client": "^9.0.1",
"@ucanto/principal": "^8.1.0",
"@ucanto/transport": "^9.1.1",
"@web3-storage/blob-fetcher": "^2.2.0",
"@web3-storage/gateway-lib": "^5.1.2",
"dagula": "^8.0.0",
Expand All @@ -48,7 +51,6 @@
"@types/mocha": "^10.0.9",
"@types/node-fetch": "^2.6.11",
"@types/sinon": "^17.0.3",
"@ucanto/principal": "^8.1.0",
"@web3-storage/content-claims": "^5.0.0",
"@web3-storage/public-bucket": "^1.1.0",
"@web3-storage/upload-client": "^16.1.1",
Expand Down
37 changes: 0 additions & 37 deletions src/middleware/withAccountingService.js

This file was deleted.

14 changes: 6 additions & 8 deletions src/middleware/withEgressTracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ export function withEgressTracker (handler) {
createByteCountStream((totalBytesServed) => {
// Non-blocking call to the accounting service to record egress
if (totalBytesServed > 0) {
const { space, dataCid: resource } = ctx
ctx.waitUntil(
ctx.accountingService.record(space, resource, totalBytesServed, new Date().toISOString())
ctx.ucantoClient.record(ctx.space, ctx.dataCid, totalBytesServed, new Date())
)
}
})
Expand All @@ -44,15 +43,14 @@ export function withEgressTracker (handler) {
}

/**
* Creates a TransformStream to count bytes served to the client.
* It records egress when the stream is finalized without an error.
* Creates a TransformStream to count bytes in the response body.
*
* @param {(totalBytesServed: number) => void} onClose
* @param {(totalBytes: number) => void} onClose
* @template {Uint8Array} T
* @returns {TransformStream<T, T>} - The created TransformStream.
*/
function createByteCountStream (onClose) {
let totalBytesServed = 0
let totalBytes = 0

return new TransformStream({
/**
Expand All @@ -64,7 +62,7 @@ function createByteCountStream (onClose) {
async transform (chunk, controller) {
try {
controller.enqueue(chunk)
totalBytesServed += chunk.byteLength
totalBytes += chunk.byteLength
} catch (error) {
console.error('Error while counting bytes:', error)
controller.error(error)
Expand All @@ -79,7 +77,7 @@ function createByteCountStream (onClose) {
* NOTE: The flush function is NOT called in case of a stream error.
*/
async flush () {
onClose(totalBytesServed)
onClose(totalBytes)
}
})
}
6 changes: 3 additions & 3 deletions src/middleware/withEgressTracker.types.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { IpfsUrlContext, Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
import { AccountingService } from './withAccountingService.types.js'
import { DIDKey, UnknownLink } from '@ucanto/client'
import { UCantoClient } from './withUcantoClient.types.js'
import { DIDKey } from '@ucanto/principal/ed25519'

export interface Environment extends MiddlewareEnvironment {
FF_EGRESS_TRACKER_ENABLED: string
}

export interface Context extends IpfsUrlContext {
space: DIDKey
accountingService: AccountingService
ucantoClient: UCantoClient
}
18 changes: 4 additions & 14 deletions src/middleware/withRateLimit.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async function isRateLimited (rateLimitAPI, cid) {
* @param {Environment} env
* @param {string} authToken
* @param {RateLimiterContext} ctx
* @returns {Promise<import('./withAccountingService.types.js').TokenMetadata | null>}
* @returns {Promise<import('./withUcantoClient.types.ts').TokenMetadata | null>}
*/
async function getTokenMetadata (env, authToken, ctx) {
const cachedValue = await env.AUTH_TOKEN_METADATA.get(authToken)
Expand All @@ -109,7 +109,7 @@ async function getTokenMetadata (env, authToken, ctx) {
return decode(cachedValue)
}

const tokenMetadata = findTokenMetadata(authToken)
const tokenMetadata = await ctx.ucantoClient.getTokenMetadata(authToken)
if (tokenMetadata) {
// NOTE: non-blocking call to the auth token metadata cache
ctx.waitUntil(env.AUTH_TOKEN_METADATA.put(authToken, encode(tokenMetadata)))
Expand All @@ -119,27 +119,17 @@ async function getTokenMetadata (env, authToken, ctx) {
return null
}

/**
* @param {string} authToken
* @returns {import('./withAccountingService.types.js').TokenMetadata | null}
*/
function findTokenMetadata (authToken) {
// TODO I think this needs to check the content claims service (?) for any claims relevant to this token
// TODO do we have a plan for this? need to ask Hannah if the indexing service covers this?
return null
}

/**
* @param {string} s
* @returns {import('./withAccountingService.types.js').TokenMetadata}
* @returns {import('./withUcantoClient.types.ts').TokenMetadata}
*/
function decode (s) {
// TODO should this be dag-json?
return JSON.parse(s)
}

/**
* @param {import('./withAccountingService.types.js').TokenMetadata} m
* @param {import('./withUcantoClient.types.ts').TokenMetadata} m
* @returns {string}
*/
function encode (m) {
Expand Down
2 changes: 2 additions & 0 deletions src/middleware/withRateLimit.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { CID } from '@web3-storage/gateway-lib/handlers'
import { IpfsUrlContext, Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
import { KVNamespace, RateLimit } from '@cloudflare/workers-types'
import { RATE_LIMIT_EXCEEDED } from '../constants.js'
import { UCantoClient } from './withUcantoClient.types.js'

export interface Environment extends MiddlewareEnvironment {
RATE_LIMITER: RateLimit
Expand All @@ -11,6 +12,7 @@ export interface Environment extends MiddlewareEnvironment {

export interface Context extends IpfsUrlContext {
authToken: string | null
ucantoClient: UCantoClient
}

export type RateLimitExceeded = typeof RATE_LIMIT_EXCEEDED[keyof typeof RATE_LIMIT_EXCEEDED]
Expand Down
21 changes: 21 additions & 0 deletions src/middleware/withUcantoClient.capabilities.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { SpaceDID, equalWith } from '@web3-storage/capabilities/utils'
import { capability, Schema } from '@ucanto/validator'

export const Usage = {
/**
* Capability can be invoked by an agent to record egress data for a given resource.
*/
record: capability({
can: 'usage/record',
with: SpaceDID,
nb: Schema.struct({
/** CID of the resource that was served. */
resource: Schema.link(),
/** Amount of bytes served. */
bytes: Schema.integer().greaterThan(0),
/** Timestamp of the event in seconds after Unix epoch. */
servedAt: Schema.integer().greaterThan(-1)
}),
derives: equalWith
})
}
92 changes: 92 additions & 0 deletions src/middleware/withUcantoClient.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import * as UCantoClient from '@ucanto/client'
import * as CAR from '@ucanto/transport/car'
import { SpaceDID } from '@web3-storage/capabilities/utils'
import { ed25519 } from '@ucanto/principal'
import { HTTP } from '@ucanto/transport'
import { Usage } from './withUcantoClient.capabilities.js'

/**
* @import { Middleware } from '@web3-storage/gateway-lib'
* @typedef {import('./withUcantoClient.types.ts').UcantoClientContext} UcantoClientContext
* @typedef {import('./withUcantoClient.types.ts').Environment} Environment
*/

/**
* The UCantoClient handler exposes the methods to invoke capabilities on the Upload API.
*
* @type {Middleware<UcantoClientContext, UcantoClientContext, Environment>}
*/
export function withUcantoClient (handler) {
return async (req, env, ctx) => {
const ucantoClient = await create(env)

return handler(req, env, { ...ctx, ucantoClient })
}
}

/**
* Creates a UCantoClient instance with the given environment.
*
* @param {Environment} env
* @returns {Promise<import('./withUcantoClient.types.ts').UCantoClient>}
*/
async function create (env) {
const service = ed25519.Verifier.parse(env.SERVICE_ID)
const principal = ed25519.Signer.parse(env.SIGNER_PRINCIPAL_KEY)

const { connection } = await connectUcantoClient(env.UPLOAD_API_URL, principal)

return {
/**
* @param {import('@ucanto/principal/ed25519').DIDKey} space - The Space DID where the content was served
* @param {import('@ucanto/principal/ed25519').UnknownLink} resource - The link to the resource that was served
* @param {number} bytes - The number of bytes served
* @param {Date} servedAt - The timestamp of when the content was served
*/
record: async (space, resource, bytes, servedAt) => {
const res = await Usage.record.invoke({
issuer: principal,
audience: service,
with: SpaceDID.from(space),
nb: {
resource,
bytes,
servedAt: Math.floor(servedAt.getTime() / 1000)
}
}).execute(connection)

if (res.out.error) {
console.error('Failed to record egress', res.out.error)
}
},

/**
* TODO: implement this function
*
* @param {string} authToken
* @returns {Promise<import('./withUcantoClient.types.ts').TokenMetadata | undefined>}
*/
getTokenMetadata: async (authToken) => {
// TODO I think this needs to check the content claims service (?) for any claims relevant to this token
// TODO do we have a plan for this? need to ask Hannah if the indexing service covers this?
return undefined
}
}
}

/**
* Creates a connection with the UCanto Server at the provided server URL.
*
* @param {string} serverUrl
* @param {import('@ucanto/principal/ed25519').EdSigner} principal
*
*/
async function connectUcantoClient (serverUrl, principal) {
const connection = await UCantoClient.connect({
id: principal,
codec: CAR.outbound,
channel: HTTP.open({ url: new URL(serverUrl) })
})

return { connection }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ import { Environment as MiddlewareEnvironment, Context as MiddlewareContext } fr
import { DIDKey, UnknownLink } from '@ucanto/principal/ed25519'

export interface Environment extends MiddlewareEnvironment {
//TODO: ucanto signer principal key
SERVICE_ID: string
SIGNER_PRINCIPAL_KEY: string
UPLOAD_API_URL: string
}

export interface AccountingServiceContext extends MiddlewareContext {
accountingService?: AccountingService
export interface UcantoClientContext extends MiddlewareContext {
ucantoClient?: UCantoClient
}

export interface TokenMetadata {
locationClaim?: unknown // TODO: figure out the right type to use for this - we probably need it for the private data case to verify auth
invalid?: boolean
}

export interface AccountingService {
record: (space: DIDKey, resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
export interface UCantoClient {
record: (space: DIDKey, resource: UnknownLink, bytes: number, servedAt: Date) => Promise<void>
getTokenMetadata: (token: string) => Promise<TokenMetadata | undefined>
}


4 changes: 3 additions & 1 deletion test/helpers/builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export class Builder {
* @returns {Promise<{ root: import('multiformats').UnknownLink, shards: import('multiformats').Link[]}>}
*/
async add (input, options = {}) {
console.log('Adding ' + (Array.isArray(input) ? `${input.length} file${input.length === 1 ? '' : 's'}` : '1 blob') + '...')
if (process.env.DEBUG) {
console.log('Adding ' + (Array.isArray(input) ? `${input.length} file${input.length === 1 ? '' : 's'}` : '1 blob') + '...')
}
const unixFsEncoder = Array.isArray(input)
? UnixFS.createDirectoryEncoderStream(input)
: UnixFS.createFileEncoderStream(input)
Expand Down
Loading

0 comments on commit abcd719

Please sign in to comment.