Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add BulkImport APIs and cron #966

Open
wants to merge 22 commits into
base: feat/bulk-import-base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ highlighting the necessary changes)
latest branch (`git branch --all`) whose `X.Y` is greater than the latest released tag.
- If no such branch exists, then create one from the latest released branch.
- [ ] If added a foreign key constraint on `app_id_to_user_id` table, make sure to delete from this table when deleting the user as well if `deleteUserIdMappingToo` is false.
- [ ] If added a new recipe, then make sure to update the bulk import API to include the new recipe.

## Remaining TODOs for this PR

- [ ] Item1
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [9.1.0] - 2024-04-25

### Added

- Adds APIs to bulk import users
- Adds `ProcessBulkImportUsers` cron job to process bulk import users

## [9.0.2] - 2024-04-17

- Fixes issue with core startup when creation of CUD/app/tenant has partial failure
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ compileTestJava { options.encoding = "UTF-8" }
// }
//}

version = "9.0.2"
version = "9.1.0"

anku255 marked this conversation as resolved.
Show resolved Hide resolved

repositories {
Expand Down
3 changes: 2 additions & 1 deletion coreDriverInterfaceSupported.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"2.21",
"3.0",
"4.0",
"5.0"
"5.0",
"5.1"
]
}
6 changes: 6 additions & 0 deletions src/main/java/io/supertokens/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.supertokens.config.Config;
import io.supertokens.config.CoreConfig;
import io.supertokens.cronjobs.Cronjobs;
import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers;
import io.supertokens.cronjobs.deleteExpiredAccessTokenSigningKeys.DeleteExpiredAccessTokenSigningKeys;
import io.supertokens.cronjobs.deleteExpiredDashboardSessions.DeleteExpiredDashboardSessions;
import io.supertokens.cronjobs.deleteExpiredEmailVerificationTokens.DeleteExpiredEmailVerificationTokens;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class Main {

// this is a special variable that will be set to true by TestingProcessManager
public static boolean isTesting = false;
// this flag is used in ProcessBulkImportUsersCronJobTest to skip the user validation
public static boolean isTesting_skipBulkImportUserValidationInCronJob = false;

// this is a special variable that will be set to true by TestingProcessManager
public static boolean makeConsolePrintSilent = false;
Expand Down Expand Up @@ -255,6 +258,9 @@ private void init() throws IOException, StorageQueryException {
// starts DeleteExpiredAccessTokenSigningKeys cronjob if the access token signing keys can change
Cronjobs.addCronjob(this, DeleteExpiredAccessTokenSigningKeys.init(this, uniqueUserPoolIdsTenants));

// starts ProcessBulkImportUsers cronjob to process bulk import users
Cronjobs.addCronjob(this, ProcessBulkImportUsers.init(this, uniqueUserPoolIdsTenants));

// this is to ensure tenantInfos are in sync for the new cron job as well
MultitenancyHelper.getInstance(this).refreshCronjobs();

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/supertokens/authRecipe/AuthRecipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import io.supertokens.session.Session;
import io.supertokens.storageLayer.StorageLayer;
import io.supertokens.useridmapping.UserIdType;
import io.supertokens.utils.Utils;

import org.jetbrains.annotations.TestOnly;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -327,8 +329,7 @@ public static LinkAccountsResult linkAccounts(Main main, AppIdentifier appIdenti
RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException, InputUserIdIsNotAPrimaryUserException,
UnknownUserIdException, TenantOrAppNotFoundException, FeatureNotEnabledException {

if (Arrays.stream(FeatureFlag.getInstance(main, appIdentifier).getEnabledFeatures())
.noneMatch(t -> t == EE_FEATURES.ACCOUNT_LINKING || t == EE_FEATURES.MFA)) {
if (!Utils.isAccountLinkingEnabled(main, appIdentifier)) {
throw new FeatureNotEnabledException(
"Account linking feature is not enabled for this app. Please contact support to enable it.");
}
Expand Down Expand Up @@ -532,8 +533,7 @@ public static CreatePrimaryUserResult createPrimaryUser(Main main,
RecipeUserIdAlreadyLinkedWithPrimaryUserIdException, UnknownUserIdException, TenantOrAppNotFoundException,
FeatureNotEnabledException {

if (Arrays.stream(FeatureFlag.getInstance(main, appIdentifier).getEnabledFeatures())
.noneMatch(t -> t == EE_FEATURES.ACCOUNT_LINKING || t == EE_FEATURES.MFA)) {
if (!Utils.isAccountLinkingEnabled(main, appIdentifier)) {
throw new FeatureNotEnabledException(
"Account linking feature is not enabled for this app. Please contact support to enable it.");
}
Expand Down
90 changes: 90 additions & 0 deletions src/main/java/io/supertokens/bulkimport/BulkImport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.bulkimport;

import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;
import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
import io.supertokens.pluginInterface.Storage;
import io.supertokens.pluginInterface.StorageUtils;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
import io.supertokens.utils.Utils;


import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class BulkImport {

public static final int MAX_USERS_TO_ADD = 10000;
public static final int GET_USERS_PAGINATION_LIMIT = 500;
public static final int GET_USERS_DEFAULT_LIMIT = 100;
public static final int DELETE_USERS_LIMIT = 500;
public static final int PROCESS_USERS_BATCH_SIZE = 1000;
public static final int PROCESS_USERS_INTERVAL_SECONDS = 60;
rishabhpoddar marked this conversation as resolved.
Show resolved Hide resolved

public static void addUsers(AppIdentifier appIdentifier, Storage storage, List<BulkImportUser> users)
throws StorageQueryException, TenantOrAppNotFoundException {
while (true) {
try {
StorageUtils.getBulkImportStorage(storage).addBulkImportUsers(appIdentifier, users);
break;
} catch (io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException ignored) {
// We re-generate the user id for every user and retry
for (BulkImportUser user : users) {
user.id = Utils.getUUID();
}
}
}
}

public static BulkImportUserPaginationContainer getUsers(AppIdentifier appIdentifier, Storage storage,
@Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, @Nullable String paginationToken)
rishabhpoddar marked this conversation as resolved.
Show resolved Hide resolved
throws StorageQueryException, BulkImportUserPaginationToken.InvalidTokenException {
List<BulkImportUser> users;

BulkImportSQLStorage bulkImportStorage = StorageUtils.getBulkImportStorage(storage);

if (paginationToken == null) {
users = bulkImportStorage
.getBulkImportUsers(appIdentifier, limit + 1, status, null, null);
} else {
BulkImportUserPaginationToken tokenInfo = BulkImportUserPaginationToken.extractTokenInfo(paginationToken);
users = bulkImportStorage
.getBulkImportUsers(appIdentifier, limit + 1, status, tokenInfo.bulkImportUserId, tokenInfo.createdAt);
}

String nextPaginationToken = null;
int maxLoop = users.size();
if (users.size() == limit + 1) {
maxLoop = limit;
BulkImportUser user = users.get(limit);
nextPaginationToken = new BulkImportUserPaginationToken(user.id, user.createdAt).generateToken();
}

List<BulkImportUser> resultUsers = users.subList(0, maxLoop);
return new BulkImportUserPaginationContainer(resultUsers, nextPaginationToken);
}

public static List<String> deleteUsers(AppIdentifier appIdentifier, Storage storage, String[] userIds) throws StorageQueryException {
return StorageUtils.getBulkImportStorage(storage).deleteBulkImportUsers(appIdentifier, userIds);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.bulkimport;

import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.supertokens.pluginInterface.bulkimport.BulkImportUser;

public class BulkImportUserPaginationContainer {
public final List<BulkImportUser> users;
public final String nextPaginationToken;

public BulkImportUserPaginationContainer(@Nonnull List<BulkImportUser> users, @Nullable String nextPaginationToken) {
this.users = users;
this.nextPaginationToken = nextPaginationToken;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.bulkimport;

import java.util.Base64;

public class BulkImportUserPaginationToken {
public final String bulkImportUserId;
public final long createdAt;

public BulkImportUserPaginationToken(String bulkImportUserId, long createdAt) {
this.bulkImportUserId = bulkImportUserId;
this.createdAt = createdAt;
}

public static BulkImportUserPaginationToken extractTokenInfo(String token) throws InvalidTokenException {
try {
String decodedPaginationToken = new String(Base64.getDecoder().decode(token));
String[] splitDecodedToken = decodedPaginationToken.split(";");
if (splitDecodedToken.length != 2) {
throw new InvalidTokenException();
}
String bulkImportUserId = splitDecodedToken[0];
long createdAt = Long.parseLong(splitDecodedToken[1]);
return new BulkImportUserPaginationToken(bulkImportUserId, createdAt);
} catch (Exception e) {
throw new InvalidTokenException();
}
}

public String generateToken() {
return new String(Base64.getEncoder().encode((this.bulkImportUserId + ";" + this.createdAt).getBytes()));
}

public static class InvalidTokenException extends Exception {

private static final long serialVersionUID = 6289026174830695478L;
}
}
Loading
Loading