Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add BulkImport APIs and cron #966

Open
wants to merge 22 commits into
base: feat/bulk-import-base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ highlighting the necessary changes)
latest branch (`git branch --all`) whose `X.Y` is greater than the latest released tag.
- If no such branch exists, then create one from the latest released branch.
- [ ] If added a foreign key constraint on `app_id_to_user_id` table, make sure to delete from this table when deleting the user as well if `deleteUserIdMappingToo` is false.
- [ ] If added a new recipe, then make sure to update the bulk import API to include the new recipe.

## Remaining TODOs for this PR

- [ ] Item1
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/supertokens/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.supertokens.config.Config;
import io.supertokens.config.CoreConfig;
import io.supertokens.cronjobs.Cronjobs;
import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers;
import io.supertokens.cronjobs.deleteExpiredAccessTokenSigningKeys.DeleteExpiredAccessTokenSigningKeys;
import io.supertokens.cronjobs.deleteExpiredDashboardSessions.DeleteExpiredDashboardSessions;
import io.supertokens.cronjobs.deleteExpiredEmailVerificationTokens.DeleteExpiredEmailVerificationTokens;
Expand Down Expand Up @@ -254,6 +255,9 @@ private void init() throws IOException, StorageQueryException {
// starts DeleteExpiredAccessTokenSigningKeys cronjob if the access token signing keys can change
Cronjobs.addCronjob(this, DeleteExpiredAccessTokenSigningKeys.init(this, uniqueUserPoolIdsTenants));

// starts ProcessBulkImportUsers cronjob to process bulk import users
Cronjobs.addCronjob(this, ProcessBulkImportUsers.init(this, uniqueUserPoolIdsTenants));

// this is to ensure tenantInfos are in sync for the new cron job as well
MultitenancyHelper.getInstance(this).refreshCronjobs();

Expand Down
90 changes: 90 additions & 0 deletions src/main/java/io/supertokens/bulkimport/BulkImport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.bulkimport;

import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;
import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
import io.supertokens.pluginInterface.Storage;
import io.supertokens.pluginInterface.StorageUtils;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
import io.supertokens.utils.Utils;


import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class BulkImport {

public static final int MAX_USERS_TO_ADD = 10000;
public static final int GET_USERS_PAGINATION_LIMIT = 500;
public static final int GET_USERS_DEFAULT_LIMIT = 100;
public static final int DELETE_USERS_LIMIT = 500;
public static final int PROCESS_USERS_BATCH_SIZE = 1000;
public static final int PROCESS_USERS_INTERVAL = 60;
sattvikc marked this conversation as resolved.
Show resolved Hide resolved

public static void addUsers(AppIdentifier appIdentifier, Storage storage, List<BulkImportUser> users)
throws StorageQueryException, TenantOrAppNotFoundException {
while (true) {
try {
StorageUtils.getBulkImportStorage(storage).addBulkImportUsers(appIdentifier, users);
break;
} catch (io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException ignored) {
// We re-generate the user id for every user and retry
for (BulkImportUser user : users) {
user.id = Utils.getUUID();
}
}
}
}

public static BulkImportUserPaginationContainer getUsers(AppIdentifier appIdentifier, Storage storage,
@Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, @Nullable String paginationToken)
rishabhpoddar marked this conversation as resolved.
Show resolved Hide resolved
throws StorageQueryException, BulkImportUserPaginationToken.InvalidTokenException {
List<BulkImportUser> users;

BulkImportSQLStorage bulkImportStorage = StorageUtils.getBulkImportStorage(storage);

if (paginationToken == null) {
users = bulkImportStorage
.getBulkImportUsers(appIdentifier, limit + 1, status, null, null);
} else {
BulkImportUserPaginationToken tokenInfo = BulkImportUserPaginationToken.extractTokenInfo(paginationToken);
users = bulkImportStorage
.getBulkImportUsers(appIdentifier, limit + 1, status, tokenInfo.bulkImportUserId, tokenInfo.createdAt);
}

String nextPaginationToken = null;
int maxLoop = users.size();
if (users.size() == limit + 1) {
maxLoop = limit;
BulkImportUser user = users.get(limit);
nextPaginationToken = new BulkImportUserPaginationToken(user.id, user.createdAt).generateToken();
}

List<BulkImportUser> resultUsers = users.subList(0, maxLoop);
return new BulkImportUserPaginationContainer(resultUsers, nextPaginationToken);
}

public static List<String> deleteUsers(AppIdentifier appIdentifier, Storage storage, String[] userIds) throws StorageQueryException {
return StorageUtils.getBulkImportStorage(storage).deleteBulkImportUsers(appIdentifier, userIds);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.bulkimport;

import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.supertokens.pluginInterface.bulkimport.BulkImportUser;

public class BulkImportUserPaginationContainer {
public final List<BulkImportUser> users;
public final String nextPaginationToken;

public BulkImportUserPaginationContainer(@Nonnull List<BulkImportUser> users, @Nullable String nextPaginationToken) {
this.users = users;
this.nextPaginationToken = nextPaginationToken;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.bulkimport;

import java.util.Base64;

public class BulkImportUserPaginationToken {
public final String bulkImportUserId;
public final long createdAt;

public BulkImportUserPaginationToken(String bulkImportUserId, long createdAt) {
this.bulkImportUserId = bulkImportUserId;
this.createdAt = createdAt;
}

public static BulkImportUserPaginationToken extractTokenInfo(String token) throws InvalidTokenException {
try {
String decodedPaginationToken = new String(Base64.getDecoder().decode(token));
String[] splitDecodedToken = decodedPaginationToken.split(";");
if (splitDecodedToken.length != 2) {
throw new InvalidTokenException();
}
String bulkImportUserId = splitDecodedToken[0];
long createdAt = Long.parseLong(splitDecodedToken[1]);
return new BulkImportUserPaginationToken(bulkImportUserId, createdAt);
} catch (Exception e) {
throw new InvalidTokenException();
}
}

public String generateToken() {
return new String(Base64.getEncoder().encode((this.bulkImportUserId + ";" + this.createdAt).getBytes()));
}

public static class InvalidTokenException extends Exception {

private static final long serialVersionUID = 6289026174830695478L;
}
}
Loading
Loading