Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: Add queries for BulkImport #202

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4040aa2
fix: remove db password from logs (#181)
anku255 Feb 6, 2024
13753dd
fix: Connection pool issue (#182)
sattvikc Feb 6, 2024
0377d03
adding dev-v5.0.7 tag to this commit to ensure building
rishabhpoddar Feb 6, 2024
53b83c9
fix: cicd tests (#185)
sattvikc Feb 8, 2024
f29d7c1
fix: logging test (#187)
anku255 Feb 8, 2024
5f2dc1d
adding dev-v5.0.7 tag to this commit to ensure building
rishabhpoddar Feb 8, 2024
f540b86
fix: flaky test (#188)
sattvikc Feb 8, 2024
abe5312
adding dev-v5.0.7 tag to this commit to ensure building
rishabhpoddar Feb 8, 2024
5e27c9e
fix: adds idle timeout and minimum idle configs (#184)
sattvikc Feb 9, 2024
29da546
adding dev-v5.0.7 tag to this commit to ensure building
rishabhpoddar Feb 9, 2024
8383fd6
fix: cicd (#189)
sattvikc Feb 9, 2024
54b4f9a
adding dev-v5.0.7 tag to this commit to ensure building
rishabhpoddar Feb 9, 2024
f482f55
fixes tests
rishabhpoddar Feb 9, 2024
ae8ce1a
adding dev-v5.0.7 tag to this commit to ensure building
rishabhpoddar Feb 9, 2024
15be5e8
fix: vulnerability fix (#192)
sattvikc Feb 21, 2024
79f6ef7
adding dev-v5.0.8 tag to this commit to ensure building
rishabhpoddar Feb 21, 2024
3c7c4a4
fix: dependencies (#195)
sattvikc Feb 23, 2024
404c905
adding dev-v5.0.8 tag to this commit to ensure building
rishabhpoddar Feb 23, 2024
aa783d4
fix: version update (#198)
sattvikc Feb 27, 2024
969b945
adding dev-v5.0.8 tag to this commit to ensure building
rishabhpoddar Feb 27, 2024
9ed97ab
Merge remote-tracking branch 'origin/feat/mfa' into feat/bulk-import-api
anku255 Feb 27, 2024
6eccbf6
feat: Add bulk import queries (#191)
anku255 Feb 27, 2024
7714471
fix: Use BulkImportUser instead of BulkImportUserInfo (#200)
anku255 Feb 28, 2024
2731b9b
fix: Use fromJson method instead of fromDBJson in BulkImportUser (#201)
anku255 Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/main/java/io/supertokens/storage/postgresql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
import io.supertokens.pluginInterface.authRecipe.LoginMethod;
import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
import io.supertokens.pluginInterface.dashboard.DashboardSearchTags;
import io.supertokens.pluginInterface.dashboard.DashboardSessionInfo;
import io.supertokens.pluginInterface.dashboard.DashboardUser;
Expand Down Expand Up @@ -109,7 +111,7 @@ public class Start
implements SessionSQLStorage, EmailPasswordSQLStorage, EmailVerificationSQLStorage, ThirdPartySQLStorage,
JWTRecipeSQLStorage, PasswordlessSQLStorage, UserMetadataSQLStorage, UserRolesSQLStorage, UserIdMappingStorage,
UserIdMappingSQLStorage, MultitenancyStorage, MultitenancySQLStorage, DashboardSQLStorage, TOTPSQLStorage,
ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage {
ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage, BulkImportSQLStorage {

// these configs are protected from being modified / viewed by the dev using the SuperTokens
// SaaS. If the core is not running in SuperTokens SaaS, this array has no effect.
Expand Down Expand Up @@ -3039,4 +3041,46 @@ public int getDbActivityCount(String dbname) throws SQLException, StorageQueryEx
return -1;
});
}

@Override
public void addBulkImportUsers(AppIdentifier appIdentifier, List<BulkImportUser> users)
throws StorageQueryException,
TenantOrAppNotFoundException,
io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException {
try {
BulkImportQueries.insertBulkImportUsers(this, appIdentifier, users);
} catch (SQLException e) {
if (e instanceof PSQLException) {
ServerErrorMessage serverErrorMessage = ((PSQLException) e).getServerErrorMessage();
if (isPrimaryKeyError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable())) {
throw new io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException();
}
if (isForeignKeyConstraintError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable(), "app_id")) {
throw new TenantOrAppNotFoundException(appIdentifier);
}
}
throw new StorageQueryException(e);
}
}

@Override
public List<BulkImportUser> getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status,
@Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException {
try {
return BulkImportQueries.getBulkImportUsers(this, appIdentifier, limit, status, bulkImportUserId, createdAt);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

@Override
public void updateBulkImportUserStatus_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status)
throws StorageQueryException {
Connection sqlCon = (Connection) con.getConnection();
try {
BulkImportQueries.updateBulkImportUserStatus_Transaction(this, sqlCon, appIdentifier, bulkImportUserIds, status);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ public String getTotpUsedCodesTable() {
return addSchemaAndPrefixToTableName("totp_used_codes");
}

public String getBulkImportUsersTable() {
return addSchemaAndPrefixToTableName("bulk_import_users");
}

private String addSchemaAndPrefixToTableName(String tableName) {
return addSchemaToTableName(postgresql_table_names_prefix + tableName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.storage.postgresql.queries;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.supertokens.pluginInterface.RowMapper;
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;

public class BulkImportQueries {
static String getQueryToCreateBulkImportUsersTable(Start start) {
String schema = Config.getConfig(start).getTableSchema();
String tableName = Config.getConfig(start).getBulkImportUsersTable();
return "CREATE TABLE IF NOT EXISTS " + tableName + " ("
+ "id CHAR(36),"
+ "app_id VARCHAR(64) NOT NULL DEFAULT 'public',"
+ "raw_data TEXT NOT NULL,"
+ "status VARCHAR(128) DEFAULT 'NEW',"
+ "error_msg TEXT,"
+ "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey")
+ " PRIMARY KEY(app_id, id),"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " "
+ "FOREIGN KEY(app_id) "
+ "REFERENCES " + Config.getConfig(start).getAppsTable() + " (app_id) ON DELETE CASCADE"
+ " );";
}

public static String getQueryToCreateStatusUpdatedAtIndex(Start start) {
return "CREATE INDEX IF NOT EXISTS bulk_import_users_status_updated_at_index ON "
+ Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, updated_at)";
}

public static String getQueryToCreateCreatedAtIndex(Start start) {
return "CREATE INDEX IF NOT EXISTS bulk_import_users_created_at_index ON "
+ Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at)";
}

public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List<BulkImportUser> users)
throws SQLException, StorageQueryException {
StringBuilder queryBuilder = new StringBuilder(
"INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES ");

int userCount = users.size();

for (int i = 0; i < userCount; i++) {
queryBuilder.append(" (?, ?, ?)");

if (i < userCount - 1) {
queryBuilder.append(",");
}
}

update(start, queryBuilder.toString(), pst -> {
int parameterIndex = 1;
for (BulkImportUser user : users) {
pst.setString(parameterIndex++, user.id);
pst.setString(parameterIndex++, appIdentifier.getAppId());
pst.setString(parameterIndex++, user.toRawDataForDbStorage());
}
});
}

public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status)
throws SQLException, StorageQueryException {
if (bulkImportUserIds.length == 0) {
return;
}

String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) WHERE app_id = ?";
StringBuilder queryBuilder = new StringBuilder(baseQuery);

List<Object> parameters = new ArrayList<>();

parameters.add(status.toString());
parameters.add(appIdentifier.getAppId());

queryBuilder.append(" AND id IN (");
for (int i = 0; i < bulkImportUserIds.length; i++) {
if (i != 0) {
queryBuilder.append(", ");
}
queryBuilder.append("?");
parameters.add(bulkImportUserIds[i]);
}
queryBuilder.append(")");

String query = queryBuilder.toString();

update(con, query, pst -> {
for (int i = 0; i < parameters.size(); i++) {
pst.setObject(i + 1, parameters.get(i));
}
});
}

public static List<BulkImportUser> getBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status,
@Nullable String bulkImportUserId, @Nullable Long createdAt)
throws SQLException, StorageQueryException {

String baseQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable();

StringBuilder queryBuilder = new StringBuilder(baseQuery);
List<Object> parameters = new ArrayList<>();

queryBuilder.append(" WHERE app_id = ?");
parameters.add(appIdentifier.getAppId());

if (status != null) {
queryBuilder.append(" AND status = ?");
parameters.add(status.toString());
}

if (bulkImportUserId != null && createdAt != null) {
queryBuilder
.append(" AND created_at < ? OR (created_at = ? AND id <= ?)");
parameters.add(createdAt);
parameters.add(createdAt);
parameters.add(bulkImportUserId);
}

queryBuilder.append(" ORDER BY created_at DESC, id DESC LIMIT ?");
parameters.add(limit);

String query = queryBuilder.toString();

return execute(start, query, pst -> {
for (int i = 0; i < parameters.size(); i++) {
pst.setObject(i + 1, parameters.get(i));
}
}, result -> {
List<BulkImportUser> bulkImportUsers = new ArrayList<>();
while (result.next()) {
bulkImportUsers.add(BulkImportUserRowMapper.getInstance().mapOrThrow(result));
}
return bulkImportUsers;
});
}

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper();

private BulkImportUserRowMapper() {
}

private static BulkImportUserRowMapper getInstance() {
return INSTANCE;
}

@Override
public BulkImportUser map(ResultSet result) throws Exception {
return BulkImportUser.fromRawDataFromDbStorage(result.getString("id"), result.getString("raw_data"),
BULK_IMPORT_USER_STATUS.valueOf(result.getString("status")),
result.getLong("created_at"), result.getLong("updated_at"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,14 @@ public static void createTablesIfNotExists(Start start) throws SQLException, Sto
update(start, TOTPQueries.getQueryToCreateTenantIdIndexForUsedCodesTable(start), NO_OP_SETTER);
}

if (!doesTableExists(start, Config.getConfig(start).getBulkImportUsersTable())) {
getInstance(start).addState(CREATING_NEW_TABLE, null);
update(start, BulkImportQueries.getQueryToCreateBulkImportUsersTable(start), NO_OP_SETTER);
// index:
update(start, BulkImportQueries.getQueryToCreateStatusUpdatedAtIndex(start), NO_OP_SETTER);
update(start, BulkImportQueries.getQueryToCreateCreatedAtIndex(start), NO_OP_SETTER);
}

} catch (Exception e) {
if (e.getMessage().contains("schema") && e.getMessage().contains("does not exist")
&& numberOfRetries < 1) {
Expand Down Expand Up @@ -576,7 +584,14 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer
String DROP_QUERY = "DROP INDEX IF EXISTS all_auth_recipe_users_pagination_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}

{
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_status_updated_at_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_created_at_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
String DROP_QUERY = "DROP TABLE IF EXISTS "
+ getConfig(start).getAppsTable() + ","
Expand Down Expand Up @@ -613,7 +628,8 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer
+ getConfig(start).getDashboardSessionsTable() + ","
+ getConfig(start).getTotpUsedCodesTable() + ","
+ getConfig(start).getTotpUserDevicesTable() + ","
+ getConfig(start).getTotpUsersTable();
+ getConfig(start).getTotpUsersTable() + ","
+ getConfig(start).getBulkImportUsersTable();
update(start, DROP_QUERY, NO_OP_SETTER);
}
}
Expand Down
Loading