Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added per-user quota admin api #576

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
Expand All @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

* New datastore option to ignore Redis cache when downloading media served by a `publicBaseUrl`. This can help ensure more requests get redirected to the CDN.
* `HEAD /download` is now supported, as per [MSC4120](https://github.com/matrix-org/matrix-spec-proposals/pull/4120).
* Added a user quota API where server administrators can programmatically get/set quotas for individual users.

### Fixed

Expand Down Expand Up @@ -109,13 +110,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

* IPFS support has been removed due to maintenance burden.
* Exports initiated through the admin API no longer support `?include_data=false`. Exports will always contain data.
* Server-side blurhash calculation has been removed. Clients and bridges already calculate blurhashes locally where applicable.
* Server-side blurhash calculation has been removed. Clients and bridges already calculate blurhashes locally where applicable.

### Changed

* **Mandatory configuration change**: You must add datastore IDs to your datastore configuration, as matrix-media-repo will no longer manage datastores for you.
* If compiling `matrix-media-repo`, note that new external dependencies are required. See [the docs](https://docs.t2bot.io/matrix-media-repo/v1.3.3/installing/method/compilation.html).
* Docker images already contain these dependencies.
* Docker images already contain these dependencies.
* Datastores no longer use the `enabled` flag set on them. Use `forKinds: []` instead to disable a datastore's usage.
* Per-user upload quotas now do not allow users to exceed the maximum values, even by 1 byte. Previously, users could exceed the limits by a little bit.
* Updated to Go 1.19, then Go 1.20 in the same release cycle.
Expand Down Expand Up @@ -355,7 +356,7 @@ a large database (more than about 100k uploaded files), run the following steps
user is `media`, then run:
```sql
ALTER TABLE user_stats OWNER TO media;
ALTER FUNCTION track_update_user_media() OWNER TO media;
ALTER FUNCTION track_update_user_media() OWNER TO media;
```

### Added
Expand Down
74 changes: 74 additions & 0 deletions api/custom/users.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package custom

import (
"encoding/json"
"net/http"

"github.com/getsentry/sentry-go"
"github.com/t2bot/matrix-media-repo/api/_apimeta"
"github.com/t2bot/matrix-media-repo/api/_responses"
"github.com/t2bot/matrix-media-repo/database"

"github.com/t2bot/matrix-media-repo/common/rcontext"
)

type UserQuotaEntry struct {
MaxBytes int64 `json:"max_bytes"`
MaxPending int64 `json:"max_pending"`
MaxFiles int64 `json:"max_files"`
}

func GetUserQuota(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} {
userIds := r.URL.Query()["user_id"]

db := database.GetInstance().UserStats.Prepare(rctx)

records, err := db.GetUserQuota(userIds)
if err != nil {
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("Failed to get quota for users")
}

parsed := make(map[string]*UserQuotaEntry)

for _, quota := range records {
entry := &UserQuotaEntry{
MaxBytes: quota.UserQuota.MaxBytes,
MaxPending: quota.UserQuota.MaxPending,
MaxFiles: quota.UserQuota.MaxFiles,
}
parsed[quota.UserId] = entry
}

return &_responses.DoNotCacheResponse{Payload: parsed}
}

func SetUserQuota(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} {
decoder := json.NewDecoder(r.Body)
params := make(map[string]*UserQuotaEntry)
err := decoder.Decode(&params)
if err != nil {
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("Failed to read SetUserQuota parameters")
}

db := database.GetInstance().UserStats.Prepare(rctx)

for userId, quota := range params {
if quota.MaxBytes < -1 || quota.MaxFiles < -1 || quota.MaxPending < -1 {
rctx.Log.Warn("SetUserQuota parameters for user " + userId + " must be >= -1. Skipping...")
continue
}

err = db.SetUserQuota(userId, quota.MaxBytes, quota.MaxFiles, quota.MaxPending)
if err != nil {
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("Failed to set quota for user " + userId)
}
}

return &_responses.DoNotCacheResponse{Payload: &_responses.EmptyResponse{}}
}
2 changes: 2 additions & 0 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func buildRoutes() http.Handler {
{":taskId", makeRoute(_routers.RequireRepoAdmin(custom.GetTask), "get_background_task", counter)},
})
register([]string{"GET"}, PrefixMedia, "admin/tasks/*branch", mxUnstable, router, tasksBranch)
register([]string{"GET"}, PrefixMedia, "admin/users/quota", mxUnstable, router, makeRoute(_routers.RequireRepoAdmin(custom.GetUserQuota), "get_user_quota", counter))
register([]string{"PUT"}, PrefixMedia, "admin/users/quota", mxUnstable, router, makeRoute(_routers.RequireRepoAdmin(custom.SetUserQuota), "set_user_quota", counter))
register([]string{"POST"}, PrefixMedia, "admin/user/:userId/export", mxUnstable, router, makeRoute(_routers.RequireAccessToken(custom.ExportUserData), "export_user_data", counter))
register([]string{"POST"}, PrefixMedia, "admin/server/:serverName/export", mxUnstable, router, makeRoute(_routers.RequireAccessToken(custom.ExportServerData), "export_server_data", counter))
register([]string{"GET"}, PrefixMedia, "admin/export/:exportId/view", mxUnstable, router, makeRoute(_routers.OptionalAccessToken(custom.ViewExport), "view_export", counter))
Expand Down
3 changes: 2 additions & 1 deletion config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ uploads:
# The upload quota rules which affect users. The first rule to match the user ID will take
# effect. If a user does not match a rule, the defaults implied by the above config will
# take effect instead. The user will not be permitted to upload anything above these quota
# values, but can match them exactly.
# values, but can match them exactly. Note that quotas can also be set per-user via the
# admin API and will take precedence over any matches listed in the config file.
users:
- glob: "@*:*" # Affect all users. Use asterisks (*) to match any character.
# The maximum number of TOTAL bytes a user can upload. Defaults to zero (no limit).
Expand Down
62 changes: 62 additions & 0 deletions database/table_user_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,35 @@ import (
"database/sql"
"errors"

"github.com/lib/pq"
"github.com/t2bot/matrix-media-repo/common/rcontext"
)

type UserQuota struct {
MaxBytes int64
MaxPending int64
MaxFiles int64
}

type DbUserStats struct {
UserId string
UploadedBytes int64
UserQuota *UserQuota
// UserQuotaMaxBytes int64
// UserQuotaMaxPending int64
// UserQuotaMaxFiles int64
}

const selectUserStatsUploadedBytes = "SELECT uploaded_bytes FROM user_stats WHERE user_id = $1;"
const selectUserQuota = "SELECT user_id, uploaded_bytes, quota_max_bytes, quota_max_pending, quota_max_files FROM user_stats WHERE user_id = ANY($1);"
const updateUserQuota = "UPDATE user_stats SET quota_max_bytes = $2, quota_max_pending = $3, quota_max_files = $4 WHERE user_id = $1;"
const insertUserQuota = "INSERT INTO user_stats (user_id, uploaded_bytes, quota_max_bytes, quota_max_pending, quota_max_files) VALUES ($1, $2, $3, $4, $5);"

type userStatsTableStatements struct {
selectUserStatsUploadedBytes *sql.Stmt
selectUserQuota *sql.Stmt
updateUserQuota *sql.Stmt
insertUserQuota *sql.Stmt
}

type userStatsTableWithContext struct {
Expand All @@ -30,6 +47,15 @@ func prepareUserStatsTables(db *sql.DB) (*userStatsTableStatements, error) {
if stmts.selectUserStatsUploadedBytes, err = db.Prepare(selectUserStatsUploadedBytes); err != nil {
return nil, errors.New("error preparing selectUserStatsUploadedBytes: " + err.Error())
}
if stmts.selectUserQuota, err = db.Prepare(selectUserQuota); err != nil {
return nil, errors.New("error preparing selectUserQuota: " + err.Error())
}
if stmts.updateUserQuota, err = db.Prepare(updateUserQuota); err != nil {
return nil, errors.New("error preparing updateUserQuota: " + err.Error())
}
if stmts.insertUserQuota, err = db.Prepare(insertUserQuota); err != nil {
return nil, errors.New("error preparing insertUserQuota: " + err.Error())
}

return stmts, nil
}
Expand All @@ -51,3 +77,39 @@ func (s *userStatsTableWithContext) UserUploadedBytes(userId string) (int64, err
}
return val, err
}

func (s *userStatsTableWithContext) GetUserQuota(userIds []string) ([]*DbUserStats, error) {
rows, err := s.statements.selectUserQuota.QueryContext(s.ctx, pq.Array(userIds))

results := make([]*DbUserStats, 0)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return results, nil
}
return nil, err
}
for rows.Next() {
val := &DbUserStats{UserQuota: &UserQuota{}}
if err = rows.Scan(&val.UserId, &val.UploadedBytes, &val.UserQuota.MaxBytes, &val.UserQuota.MaxPending, &val.UserQuota.MaxFiles); err != nil {
return nil, err
}
results = append(results, val)
}

return results, nil
}

func (s *userStatsTableWithContext) SetUserQuota(userId string, maxBytes int64, maxPending int64, maxFiles int64) error {
// Need to insert default record if user has not uploaded any media beforehand
row := s.statements.selectUserQuota.QueryRowContext(s.ctx, pq.Array([]string{userId}))
val := &DbUserStats{UserQuota: &UserQuota{}}
err := row.Scan(&val.UserId, &val.UploadedBytes, &val.UserQuota.MaxBytes, &val.UserQuota.MaxPending, &val.UserQuota.MaxFiles)

if errors.Is(err, sql.ErrNoRows) {
_, err = s.statements.insertUserQuota.ExecContext(s.ctx, userId, 0, maxBytes, maxFiles, maxPending)
} else {
_, err = s.statements.updateUserQuota.ExecContext(s.ctx, userId, maxBytes, maxFiles, maxPending)
}

return err
}
52 changes: 48 additions & 4 deletions docs/admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ Note that this will only quarantine what is currently known to the repo. It will

## Datastore management

Datastores are used by the media repository to put files. Typically these match what is configured in the config file, such as s3 and directories.
Datastores are used by the media repository to put files. Typically these match what is configured in the config file, such as s3 and directories.

#### Listing available datastores

URL: `GET /_matrix/media/unstable/admin/datastores?access_token=your_access_token`

The result will be something like:
The result will be something like:
```json
{
"00be9363007feb66de554a79e16b7b49": {
Expand Down Expand Up @@ -301,6 +301,50 @@ Use the same endpoint as above, but specifying one or more `?mxc=mxc://example.o

Only repository administrators can use these endpoints.

## User quotas

In addition to specifying quotas in the config file, you may also set per-user quota entries via the admin API. Any value set via the API will take precedence over any matches to the user specified in the config file. To unset any user's quota values, you must set the entry to '-1'. To set a user's quota values using the default limits, set the entries to '0'.

Note that quotas must be enabled in the config in order for any quota values set via the admin API to take effect.

Only repository administrators can use these endpoints.

#### Get user quotas

URL: `GET /_matrix/media/unstable/admin/users/quota?access_token=your_access_token`

This endpoint queries the database for per-user quota entries set via the admin API. It will NOT retrieve the user's quota values if there is a glob entry in the config file that matches the user id. To query the quota values that currently take effect for the given user, you must use the media limits `/config` endpoint as specified in https://github.com/matrix-org/matrix-spec-proposals/pull/4034

You may specify one or more `?user_id=@alice:example.org` query parameters. Note that encoding the values may be required (not shown here). Users that are unknown to the media repo will not be returned.

The response for querying a user's quota:
```json
{
"@alice:example.org": {
"max_bytes": 53687063712,
"max_pending": -1,
"max_files": 100
}
}
```

#### Set user quotas

URL: `PUT /_matrix/media/unstable/admin/users/quota?access_token=your_access_token`

You may specify one or more user ids in the json body. Note that encoding the values may be required (not shown here). Also, you may set the quota value for a user even if the user entry does not already exist.

The example json body for setting a user's quota:
```json
{
"@alice:example.org": {
"max_bytes": 53687063712,
"max_pending": -1,
"max_files": 100
}
}
```

## Background Tasks API

The media repo keeps track of tasks that were started and did not block the request. For example, transferring media or quarantining large amounts of media may result in a background task. A `task_id` will be returned by those endpoints which can then be used here to get the status of a task.
Expand Down Expand Up @@ -494,9 +538,9 @@ URL: `POST /_matrix/media/unstable/admin/import`

The request body is the bytes of the first archive (eg: `TravisR-part-1.tgz` in the above examples).

The response body will be something like the following:
The response body will be something like the following:
```json
{
{
"import_id": "abcdef",
"task_id": 13
}
Expand Down
3 changes: 3 additions & 0 deletions migrations/29_add_user_stats_quotas_down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE user_stats DROP COLUMN quota_max_bytes;
ALTER TABLE user_stats DROP COLUMN quota_max_pending;
ALTER TABLE user_stats DROP COLUMN quota_max_files;
3 changes: 3 additions & 0 deletions migrations/29_add_user_stats_quotas_up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE user_stats ADD COLUMN quota_max_bytes BIGINT NOT NULL DEFAULT '-1';
ALTER TABLE user_stats ADD COLUMN quota_max_pending BIGINT NOT NULL DEFAULT '-1';
ALTER TABLE user_stats ADD COLUMN quota_max_files BIGINT NOT NULL DEFAULT '-1';
33 changes: 33 additions & 0 deletions pipelines/_steps/quota/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,39 @@ func Limit(ctx rcontext.RequestContext, userId string, quotaType Type) (int64, e
return defaultLimit(ctx, quotaType)
}

db := database.GetInstance().UserStats.Prepare(ctx)
record, err := db.GetUserQuota([]string{userId})
if err != nil {
ctx.Log.Warn("Error querying DB quota for user " + userId + ": " + err.Error())
} else if len(record) == 0 {
ctx.Log.Warn("User " + userId + " does not exist in DB. Skipping DB quota check...")
} else {
// DB quotas takes precedence over config quotas if value is not -1
quota := record[0].UserQuota
switch quotaType {
case MaxBytes:
if quota.MaxBytes > 0 {
return quota.MaxBytes, nil
} else if quota.MaxBytes == 0 {
return defaultLimit(ctx, quotaType)
}
case MaxPending:
if quota.MaxPending > 0 {
return quota.MaxPending, nil
} else if quota.MaxPending == 0 {
return defaultLimit(ctx, quotaType)
}
case MaxCount:
if quota.MaxFiles > 0 {
return quota.MaxFiles, nil
} else if quota.MaxFiles == 0 {
return defaultLimit(ctx, quotaType)
}
default:
return 0, errors.New("missing db switch for quota type - contact developer")
}
}

for _, q := range ctx.Config.Uploads.Quota.UserQuotas {
if glob.Glob(q.Glob, userId) {
if quotaType == MaxBytes {
Expand Down