Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add BulkImport APIs and cron #966

Open
wants to merge 22 commits into
base: feat/bulk-import-base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [9.1.0] - 2024-04-10

### Added

- Adds APIs to bulk import users
- Adds `ProcessBulkImportUsers` cron job to process bulk import users


## [9.0.0] - 2024-03-13

### Added
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ compileTestJava { options.encoding = "UTF-8" }
// }
//}

version = "9.0.0"
version = "9.1.0"

anku255 marked this conversation as resolved.
Show resolved Hide resolved

repositories {
Expand Down
3 changes: 2 additions & 1 deletion coreDriverInterfaceSupported.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"2.21",
"3.0",
"4.0",
"5.0"
"5.0",
"5.1"
]
}
2 changes: 2 additions & 0 deletions src/main/java/io/supertokens/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class Main {

// this is a special variable that will be set to true by TestingProcessManager
public static boolean isTesting = false;
// this flag is used in ProcessBulkImportUsersCronJobTest to skip the user validation
public static boolean isTesting_skipBulkImportUserValidationInCronJob = false;

// this is a special variable that will be set to true by TestingProcessManager
public static boolean makeConsolePrintSilent = false;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/supertokens/bulkimport/BulkImport.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class BulkImport {
public static final int GET_USERS_DEFAULT_LIMIT = 100;
public static final int DELETE_USERS_LIMIT = 500;
public static final int PROCESS_USERS_BATCH_SIZE = 1000;
public static final int PROCESS_USERS_INTERVAL = 60;
public static final int PROCESS_USERS_INTERVAL_SECONDS = 60;

public static void addUsers(AppIdentifier appIdentifier, Storage storage, List<BulkImportUser> users)
throws StorageQueryException, TenantOrAppNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,8 @@ private void validateTenantIdsForRoleAndLoginMethods(Main main, AppIdentifier ap
if (commonTenantUserPoolId == null) {
commonTenantUserPoolId = tenantUserPoolId;
} else if (!commonTenantUserPoolId.equals(tenantUserPoolId)) {
errors.add("All tenants for a user must share the same storage.");
errors.add("All tenants for a user must share the same storage for " + loginMethod.recipeId
+ " recipe.");
rishabhpoddar marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.supertokens.authRecipe.exception.RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException;
import io.supertokens.authRecipe.exception.RecipeUserIdAlreadyLinkedWithPrimaryUserIdException;
import io.supertokens.bulkimport.BulkImport;
import io.supertokens.bulkimport.BulkImportUserUtils;
import io.supertokens.bulkimport.exceptions.InvalidBulkImportDataException;
import io.supertokens.config.Config;
import io.supertokens.cronjobs.CronTask;
import io.supertokens.cronjobs.CronTaskTest;
Expand Down Expand Up @@ -113,8 +115,11 @@ protected void doTaskPerApp(AppIdentifier app)
List<BulkImportUser> users = bulkImportSQLStorage.getBulkImportUsersAndChangeStatusToProcessing(app,
BulkImport.PROCESS_USERS_BATCH_SIZE);

String[] allUserRoles = StorageUtils.getUserRolesStorage(bulkImportSQLStorage).getRoles(app);
BulkImportUserUtils bulkImportUserUtils = new BulkImportUserUtils(allUserRoles);

for (BulkImportUser user : users) {
processUser(app, user, bulkImportSQLStorage);
processUser(app, user, bulkImportUserUtils, bulkImportSQLStorage);
}
}

Expand All @@ -126,7 +131,7 @@ public int getIntervalTimeSeconds() {
return interval;
}
}
return BulkImport.PROCESS_USERS_INTERVAL;
return BulkImport.PROCESS_USERS_INTERVAL_SECONDS;
}

@Override
Expand All @@ -141,7 +146,7 @@ public int getInitialWaitTimeSeconds() {
}

private Storage getProxyStorage(TenantIdentifier tenantIdentifier)
rishabhpoddar marked this conversation as resolved.
Show resolved Hide resolved
throws InvalidConfigException, IOException, TenantOrAppNotFoundException, DbInitException {
throws InvalidConfigException, IOException, TenantOrAppNotFoundException, DbInitException, StorageQueryException {
String userPoolId = StorageLayer.getStorage(tenantIdentifier, main).getUserPoolId();
if (userPoolToStorageMap.containsKey(userPoolId)) {
return userPoolToStorageMap.get(userPoolId);
Expand All @@ -160,14 +165,15 @@ private Storage getProxyStorage(TenantIdentifier tenantIdentifier)

userPoolToStorageMap.put(userPoolId, bulkImportProxyStorage);
bulkImportProxyStorage.initStorage(true);
bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage();
return bulkImportProxyStorage;
}
}
throw new TenantOrAppNotFoundException(tenantIdentifier);
}

public Storage[] getAllProxyStoragesForApp(Main main, AppIdentifier appIdentifier)
throws TenantOrAppNotFoundException, InvalidConfigException, IOException, DbInitException {
throws TenantOrAppNotFoundException, InvalidConfigException, IOException, DbInitException, StorageQueryException {
List<Storage> allProxyStorages = new ArrayList<>();

Map<ResourceDistributor.KeyClass, ResourceDistributor.SingletonResource> resources = main
Expand All @@ -189,21 +195,30 @@ private void closeAllProxyStorages() throws StorageQueryException {
userPoolToStorageMap.clear();
anku255 marked this conversation as resolved.
Show resolved Hide resolved
}

private void processUser(AppIdentifier appIdentifier, BulkImportUser user, BulkImportSQLStorage baseTenantStorage)
private void processUser(AppIdentifier appIdentifier, BulkImportUser user, BulkImportUserUtils bulkImportUserUtils,
BulkImportSQLStorage baseTenantStorage)
throws TenantOrAppNotFoundException, StorageQueryException, InvalidConfigException, IOException,
DbInitException {
// Since all the tenants of a user must share the storage, we will just use the
// storage of the first tenantId of the first loginMethod

TenantIdentifier firstTenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(),
appIdentifier.getAppId(), user.loginMethods.get(0).tenantIds.get(0));
try {
if (Main.isTesting && Main.isTesting_skipBulkImportUserValidationInCronJob) {
// Skip validation when the flag is enabled during testing
} else {
// Validate the user
bulkImportUserUtils.createBulkImportUserFromJSON(main, appIdentifier, user.toJsonObject(), user.id);
}

SQLStorage bulkImportProxyStorage = (SQLStorage) getProxyStorage(firstTenantIdentifier);
// Since all the tenants of a user must share the storage, we will just use the
// storage of the first tenantId of the first loginMethod

LoginMethod primaryLM = getPrimaryLoginMethod(user);
TenantIdentifier firstTenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(),
appIdentifier.getAppId(), user.loginMethods.get(0).tenantIds.get(0));

AuthRecipeSQLStorage authRecipeSQLStorage = (AuthRecipeSQLStorage) getProxyStorage(firstTenantIdentifier);
try {
SQLStorage bulkImportProxyStorage = (SQLStorage) getProxyStorage(firstTenantIdentifier);

LoginMethod primaryLM = getPrimaryLoginMethod(user);

AuthRecipeSQLStorage authRecipeSQLStorage = (AuthRecipeSQLStorage) getProxyStorage(firstTenantIdentifier);
// If primaryUserId is not null, it means we may have already processed this user but failed to delete the entry
// If the primaryUserId exists in the database, we'll delete the corresponding entry from the bulkImportUser table and proceed to skip this user.
if (user.primaryUserId != null) {
Expand Down Expand Up @@ -259,7 +274,7 @@ private void processUser(AppIdentifier appIdentifier, BulkImportUser user, BulkI
closeAllProxyStorages();
}
});
} catch (StorageTransactionLogicException e) {
} catch (StorageTransactionLogicException | InvalidBulkImportDataException e) {
handleProcessUserExceptions(appIdentifier, user, e, baseTenantStorage);
}
}
Expand All @@ -275,6 +290,8 @@ private void handleProcessUserExceptions(AppIdentifier appIdentifier, BulkImport
if (e instanceof StorageTransactionLogicException) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also add the error stack in the message? So the saved error can look like:
{
errorMsg: "...",
errorStack: "..."
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a utils function somewhere in the code to get the error stack from an exception as a string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should provide error stack. We don't expect our users to debug the issue by looking at the code. The error message should be clear enough to understand which field has the issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yea, but users may ask on support that they are getting this error and then for us, it will be way easier to know from where. This can be skipped only if the error messages are globally unique.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We decided to add error codes in the error message to make them globally unique.

StorageTransactionLogicException exception = (StorageTransactionLogicException) e;
errorMessage[0] = exception.actualException.getMessage();
} else if (e instanceof InvalidBulkImportDataException) {
errorMessage[0] = ((InvalidBulkImportDataException) e).errors.toString();
}

try {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/supertokens/utils/SemVer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class SemVer implements Comparable<SemVer> {
public static final SemVer v3_0 = new SemVer("3.0");
public static final SemVer v4_0 = new SemVer("4.0");
public static final SemVer v5_0 = new SemVer("5.0");
public static final SemVer v5_1 = new SemVer("5.1");

final private String version;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/supertokens/test/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public static String getCdiVersionStringLatestForTests() {

public static void reset() {
Main.isTesting = true;
Main.isTesting_skipBulkImportUserValidationInCronJob = false;
PluginInterfaceTesting.isTesting = true;
Main.makeConsolePrintSilent = true;
String installDir = "../";
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/io/supertokens/test/bulkimport/BulkImportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void shouldAddUsersInBulkImportUsersTable() throws Exception {
BulkImportStorage storage = (BulkImportStorage) StorageLayer.getStorage(process.main);
BulkImport.addUsers(new AppIdentifier(null, null), storage, users);

List<BulkImportUser> addedUsers = storage.getBulkImportUsers(new AppIdentifier(null, null), null, BULK_IMPORT_USER_STATUS.NEW, null, null);
List<BulkImportUser> addedUsers = storage.getBulkImportUsers(new AppIdentifier(null, null), 100, BULK_IMPORT_USER_STATUS.NEW, null, null);

// Verify that all users are present in addedUsers
for (BulkImportUser user : users) {
Expand Down Expand Up @@ -116,7 +116,7 @@ public void shouldCreatedNewIdsIfDuplicateIdIsFound() throws Exception {
AppIdentifier appIdentifier = new AppIdentifier(null, null);
BulkImport.addUsers(appIdentifier, storage, users);

List<BulkImportUser> addedUsers = storage.getBulkImportUsers(appIdentifier, null, BULK_IMPORT_USER_STATUS.NEW, null, null);
List<BulkImportUser> addedUsers = storage.getBulkImportUsers(appIdentifier, 1000, BULK_IMPORT_USER_STATUS.NEW, null, null);

// Verify that the other properties are same but ids changed
for (BulkImportUser user : users) {
Expand Down Expand Up @@ -153,7 +153,7 @@ public void testGetUsersStatusFilter() throws Exception {
List<BulkImportUser> users = generateBulkImportUser(10);
BulkImport.addUsers(appIdentifier, storage, users);

List<BulkImportUser> addedUsers = storage.getBulkImportUsers(appIdentifier, null, BULK_IMPORT_USER_STATUS.NEW, null, null);
List<BulkImportUser> addedUsers = storage.getBulkImportUsers(appIdentifier, 100, BULK_IMPORT_USER_STATUS.NEW, null, null);
assertEquals(10, addedUsers.size());
}

Expand All @@ -171,7 +171,7 @@ public void testGetUsersStatusFilter() throws Exception {
return null;
});

List<BulkImportUser> addedUsers = storage.getBulkImportUsers(appIdentifier, null, BULK_IMPORT_USER_STATUS.PROCESSING, null, null);
List<BulkImportUser> addedUsers = storage.getBulkImportUsers(appIdentifier, 100, BULK_IMPORT_USER_STATUS.PROCESSING, null, null);
assertEquals(10, addedUsers.size());
}

Expand All @@ -189,7 +189,7 @@ public void testGetUsersStatusFilter() throws Exception {
return null;
});

List<BulkImportUser> addedUsers = storage.getBulkImportUsers(appIdentifier, null, BULK_IMPORT_USER_STATUS.FAILED, null, null);
List<BulkImportUser> addedUsers = storage.getBulkImportUsers(appIdentifier, 100, BULK_IMPORT_USER_STATUS.FAILED, null, null);
assertEquals(10, addedUsers.size());
}

Expand Down Expand Up @@ -229,7 +229,7 @@ public void randomPaginationTest() throws Exception {
}

// Get all inserted users
List<BulkImportUser> addedUsers = storage.getBulkImportUsers(new AppIdentifier(null, null), null, null, null, null);
List<BulkImportUser> addedUsers = storage.getBulkImportUsers(new AppIdentifier(null, null), 1000, null, null, null);
assertEquals(numberOfUsers, addedUsers.size());

// We are sorting the users based on createdAt and id like we do in the storage layer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void shouldProcessBulkImportUsersInTheSameTenant() throws Exception {

Thread.sleep(6000);

List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, null, null,
List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null,
null, null);

assertEquals(0, usersAfterProcessing.size());
Expand Down Expand Up @@ -161,7 +161,7 @@ public void shouldProcessBulkImportUsersInMultipleTenantsWithDifferentStorages()

Thread.sleep(6000);

List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, null, null,
List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null,
null, null);

assertEquals(0, usersAfterProcessing.size());
Expand Down Expand Up @@ -191,6 +191,9 @@ public void shouldDeleteEverythingFromtheDBIfAnythingFails() throws Exception {
// Creating a non-existing user role will result in an error.
// Since, user role creation happens at the last step of the bulk import process, everything should be deleted from the DB.

// NOTE: We will also need to disable the bulk import user validation in the cron job for this test to work.
Main.isTesting_skipBulkImportUserValidationInCronJob = true;

TestingProcess process = startCronProcess();
Main main = process.getProcess();

Expand All @@ -204,7 +207,7 @@ public void shouldDeleteEverythingFromtheDBIfAnythingFails() throws Exception {

Thread.sleep(6000);

List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, null, null,
List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null,
null, null);

assertEquals(1, usersAfterProcessing.size());
Expand All @@ -225,18 +228,55 @@ public void shouldThrowTenantDoesNotExistError() throws Exception {
BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main);
AppIdentifier appIdentifier = new AppIdentifier(null, null);

// Create user roles before inserting bulk users
{
UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null);
UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null);
}

List<BulkImportUser> users = generateBulkImportUser(1);
BulkImport.addUsers(appIdentifier, storage, users);

Thread.sleep(6000);

List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, null, null,
List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null,
null, null);

assertEquals(1, usersAfterProcessing.size());
assertEquals(BULK_IMPORT_USER_STATUS.FAILED, usersAfterProcessing.get(0).status);
assertEquals(
"[Invalid tenantId: t1 for a user role., Invalid tenantId: t1 for a user role., Invalid tenantId: t1 for emailpassword recipe., Invalid tenantId: t1 for thirdparty recipe., Invalid tenantId: t1 for passwordless recipe.]",
usersAfterProcessing.get(0).errorMessage);
}

@Test
public void shouldThrowTenantHaveDifferentStoragesError() throws Exception {
TestingProcess process = startCronProcess();
Main main = process.getProcess();

BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main);
AppIdentifier appIdentifier = new AppIdentifier(null, null);

// Create user roles before inserting bulk users
{
UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null);
UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null);
}
createTenants(main);


List<BulkImportUser> users = generateBulkImportUser(1, List.of("t1", "t2"), 0);
BulkImport.addUsers(appIdentifier, storage, users);

Thread.sleep(6000);

List<BulkImportUser> usersAfterProcessing = storage.getBulkImportUsers(appIdentifier, 100, null,
null, null);

assertEquals(1, usersAfterProcessing.size());
assertEquals(BULK_IMPORT_USER_STATUS.FAILED, usersAfterProcessing.get(0).status);
assertEquals(
"Tenant with the following connectionURIDomain, appId and tenantId combination not found: (, public, t1)",
"[All tenants for a user must share the same storage for emailpassword recipe., All tenants for a user must share the same storage for thirdparty recipe., All tenants for a user must share the same storage for passwordless recipe.]",
sattvikc marked this conversation as resolved.
Show resolved Hide resolved
usersAfterProcessing.get(0).errorMessage);
}

Expand Down
Loading
Loading