Skip to content

Commit

Permalink
migration: backfill actor.default_version_id and mark column as non-n…
Browse files Browse the repository at this point in the history
…ull (#8502)
  • Loading branch information
pedroslopez committed Aug 24, 2023
1 parent 4a71129 commit 57a6d4a
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.20.001";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.21.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.50.4.001";
private static final String CDK_VERSION = "1.2.3";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION_VERSION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
Expand All @@ -29,6 +30,8 @@ class WorkspaceFilterTest extends BaseConfigDatabaseTest {

private static final UUID SRC_DEF_ID = UUID.randomUUID();
private static final UUID DST_DEF_ID = UUID.randomUUID();
private static final UUID SRC_DEF_VER_ID = UUID.randomUUID();
private static final UUID DST_DEF_VER_ID = UUID.randomUUID();
private static final UUID ACTOR_ID_0 = UUID.randomUUID();
private static final UUID ACTOR_ID_1 = UUID.randomUUID();
private static final UUID ACTOR_ID_2 = UUID.randomUUID();
Expand All @@ -53,6 +56,12 @@ static void setUpAll() throws SQLException {
.values(DST_DEF_ID, "dstDef", ActorType.destination)
.values(UUID.randomUUID(), "dstDef", ActorType.destination)
.execute());
// create actor_definition_version
database.transaction(ctx -> ctx.insertInto(ACTOR_DEFINITION_VERSION, ACTOR_DEFINITION_VERSION.ID, ACTOR_DEFINITION_VERSION.ACTOR_DEFINITION_ID,
ACTOR_DEFINITION_VERSION.DOCKER_REPOSITORY, ACTOR_DEFINITION_VERSION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION_VERSION.SPEC)
.values(SRC_DEF_VER_ID, SRC_DEF_ID, "airbyte/source", "tag", JSONB.valueOf("{}"))
.values(DST_DEF_VER_ID, DST_DEF_ID, "airbyte/destination", "tag", JSONB.valueOf("{}"))
.execute());

// create workspace
database.transaction(ctx -> ctx.insertInto(WORKSPACE, WORKSPACE.ID, WORKSPACE.NAME, WORKSPACE.SLUG, WORKSPACE.INITIAL_SETUP_COMPLETE)
Expand All @@ -63,11 +72,13 @@ static void setUpAll() throws SQLException {
.execute());
// create actors
database.transaction(
ctx -> ctx.insertInto(ACTOR, ACTOR.WORKSPACE_ID, ACTOR.ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE)
.values(WORKSPACE_ID_0, ACTOR_ID_0, SRC_DEF_ID, "ACTOR-0", JSONB.valueOf("{}"), ActorType.source)
.values(WORKSPACE_ID_1, ACTOR_ID_1, SRC_DEF_ID, "ACTOR-1", JSONB.valueOf("{}"), ActorType.source)
.values(WORKSPACE_ID_2, ACTOR_ID_2, DST_DEF_ID, "ACTOR-2", JSONB.valueOf("{}"), ActorType.source)
.values(WORKSPACE_ID_3, ACTOR_ID_3, DST_DEF_ID, "ACTOR-3", JSONB.valueOf("{}"), ActorType.source)
ctx -> ctx
.insertInto(ACTOR, ACTOR.WORKSPACE_ID, ACTOR.ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.DEFAULT_VERSION_ID, ACTOR.NAME, ACTOR.CONFIGURATION,
ACTOR.ACTOR_TYPE)
.values(WORKSPACE_ID_0, ACTOR_ID_0, SRC_DEF_ID, SRC_DEF_VER_ID, "ACTOR-0", JSONB.valueOf("{}"), ActorType.source)
.values(WORKSPACE_ID_1, ACTOR_ID_1, SRC_DEF_ID, SRC_DEF_VER_ID, "ACTOR-1", JSONB.valueOf("{}"), ActorType.source)
.values(WORKSPACE_ID_2, ACTOR_ID_2, DST_DEF_ID, DST_DEF_VER_ID, "ACTOR-2", JSONB.valueOf("{}"), ActorType.source)
.values(WORKSPACE_ID_3, ACTOR_ID_3, DST_DEF_ID, DST_DEF_VER_ID, "ACTOR-3", JSONB.valueOf("{}"), ActorType.source)
.execute());
// create connections
database.transaction(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import com.google.common.annotations.VisibleForTesting;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Sets all actor's default_version_id to its actor_definition's default_version_id, and sets the
* column to be non-null.
*/
public class V0_50_21_001__BackfillActorDefaultVersionAndSetNonNull extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_50_21_001__BackfillActorDefaultVersionAndSetNonNull.class);

private static final Table<Record> ACTOR_DEFINITION = DSL.table("actor_definition");
private static final Table<Record> ACTOR = DSL.table("actor");

private static final Field<UUID> ID = DSL.field("id", SQLDataType.UUID);
private static final Field<UUID> DEFAULT_VERSION_ID = DSL.field("default_version_id", SQLDataType.UUID);
private static final Field<UUID> ACTOR_DEFINITION_ID = DSL.field("actor_definition_id", SQLDataType.UUID);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
backfillActorDefaultVersionId(ctx);
setNonNull(ctx);
}

@VisibleForTesting
static void backfillActorDefaultVersionId(final DSLContext ctx) {
final var actorDefinitions = ctx.select(ID, DEFAULT_VERSION_ID)
.from(ACTOR_DEFINITION)
.fetch();

for (final var actorDefinition : actorDefinitions) {
final UUID actorDefinitionId = actorDefinition.get(ID);
final UUID defaultVersionId = actorDefinition.get(DEFAULT_VERSION_ID);

ctx.update(ACTOR)
.set(DEFAULT_VERSION_ID, defaultVersionId)
.where(ACTOR_DEFINITION_ID.eq(actorDefinitionId))
.execute();
}
}

@VisibleForTesting
static void setNonNull(final DSLContext ctx) {
ctx.alterTable(ACTOR)
.alterColumn(DEFAULT_VERSION_ID)
.setNotNull()
.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ create table "public"."actor" (
"tombstone" boolean not null default false,
"created_at" timestamp(6) with time zone not null default current_timestamp,
"updated_at" timestamp(6) with time zone not null default current_timestamp,
"default_version_id" uuid,
"default_version_id" uuid not null,
constraint "actor_pkey"
primary key ("id")
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import java.util.Objects;
import java.util.UUID;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Table;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class V0_50_21_001__BackfillActorDefaultVersionAndSetNonNullTest extends AbstractConfigsDatabaseTest {

private static final Table<Record> ACTOR = DSL.table("actor");
private static final Table<Record> ACTOR_DEFINITION = DSL.table("actor_definition");
private static final Table<Record> ACTOR_DEFINITION_VERSION = DSL.table("actor_definition_version");
private static final Table<Record> WORKSPACE = DSL.table("workspace");

private static final Field<UUID> ID_COL = DSL.field("id", SQLDataType.UUID);
private static final Field<UUID> DEFAULT_VERSION_ID_COL = DSL.field("default_version_id", SQLDataType.UUID);
private static final Field<UUID> ACTOR_DEFINITION_ID_COL = DSL.field("actor_definition_id", SQLDataType.UUID);

private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final UUID ACTOR_DEFINITION_ID = UUID.randomUUID();
private static final UUID ACTOR_ID = UUID.randomUUID();
private static final UUID VERSION_ID = UUID.randomUUID();

@BeforeEach
void beforeEach() {
final Flyway flyway =
FlywayFactory.create(dataSource, "V0_50_21_001__BackfillActorDefaultVersionAndSetNonNullTest.java", ConfigsDatabaseMigrator.DB_IDENTIFIER,
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
final ConfigsDatabaseMigrator configsDbMigrator = new ConfigsDatabaseMigrator(database, flyway);

final BaseJavaMigration previousMigration = new V0_50_20_001__MakeManualNullableForRemoval();
final DevDatabaseMigrator devConfigsDbMigrator = new DevDatabaseMigrator(configsDbMigrator, previousMigration.getVersion());
devConfigsDbMigrator.createBaseline();
}

private UUID getDefaultVersionIdForActorId(final DSLContext ctx, final UUID actorId) {
final var actor = ctx.select(DEFAULT_VERSION_ID_COL)
.from(ACTOR)
.where(ID_COL.eq(actorId))
.fetchOne();

if (Objects.isNull(actor)) {
return null;
}

return actor.get(DEFAULT_VERSION_ID_COL);
}

static void insertDependencies(final DSLContext ctx) {
ctx.insertInto(WORKSPACE)
.columns(
ID_COL,
DSL.field("name"),
DSL.field("slug"),
DSL.field("initial_setup_complete"))
.values(
WORKSPACE_ID,
"name1",
"default",
true)
.execute();

ctx.insertInto(ACTOR_DEFINITION)
.columns(
ID_COL,
DSL.field("name"),
DSL.field("actor_type"))
.values(
ACTOR_DEFINITION_ID,
"source def name",
ActorType.source)
.execute();

ctx.insertInto(ACTOR_DEFINITION_VERSION)
.columns(ID_COL, ACTOR_DEFINITION_ID_COL, DSL.field("docker_repository"), DSL.field("docker_image_tag"), DSL.field("spec"))
.values(VERSION_ID, ACTOR_DEFINITION_ID, "airbyte/some-source", "1.0.0", JSONB.valueOf("{}"))
.execute();

ctx.update(ACTOR_DEFINITION)
.set(DEFAULT_VERSION_ID_COL, VERSION_ID)
.where(ID_COL.eq(ACTOR_DEFINITION_ID))
.execute();
}

@Test
void testBackFillActorDefaultVersionId() {
final DSLContext ctx = getDslContext();
insertDependencies(ctx);

ctx.insertInto(ACTOR)
.columns(
ID_COL,
ACTOR_DEFINITION_ID_COL,
DSL.field("workspace_id"),
DSL.field("name"),
DSL.field("configuration"),
DSL.field("actor_type"))
.values(
ACTOR_ID,
ACTOR_DEFINITION_ID,
WORKSPACE_ID,
"My Source",
JSONB.valueOf("{}"),
ActorType.source)
.execute();

assertNull(getDefaultVersionIdForActorId(ctx, ACTOR_ID));

V0_50_21_001__BackfillActorDefaultVersionAndSetNonNull.backfillActorDefaultVersionId(ctx);

assertEquals(VERSION_ID, getDefaultVersionIdForActorId(ctx, ACTOR_ID));
}

@Test
void testActorDefaultVersionIdIsNotNull() {
final DSLContext context = getDslContext();

V0_50_21_001__BackfillActorDefaultVersionAndSetNonNull.setNonNull(context);

final Exception e = Assertions.assertThrows(DataAccessException.class, () -> {
context.insertInto(ACTOR)
.columns(
ID_COL,
ACTOR_DEFINITION_ID_COL,
DSL.field("workspace_id"),
DSL.field("name"),
DSL.field("configuration"),
DSL.field("actor_type"))
.values(
UUID.randomUUID(),
UUID.randomUUID(),
UUID.randomUUID(),
"My Source",
JSONB.valueOf("{}"),
ActorType.source)
.execute();
});
Assertions.assertTrue(e.getMessage().contains("null value in column \"default_version_id\" of relation \"actor\" violates not-null constraint"));
}

}
2 changes: 1 addition & 1 deletion airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object ShouldFailSyncIfHeartbeatFailure : Permanent<Boolean>(key = "heartbeat.fa

object ConnectorVersionOverride : Permanent<String>(key = "connectors.versionOverrides", default = "")

object UseActorScopedDefaultVersions : Temporary<Boolean>(key = "connectors.useActorScopedDefaultVersions", default = false)
object UseActorScopedDefaultVersions : Temporary<Boolean>(key = "connectors.useActorScopedDefaultVersions", default = true)

object IngestBreakingChanges : Temporary<Boolean>(key = "connectors.ingestBreakingChanges", default = true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_CATALOG_FETCH_EVENT;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION_VERSION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
Expand Down Expand Up @@ -58,6 +59,8 @@ class MetricRepositoryTest {

private static final UUID SRC_DEF_ID = UUID.randomUUID();
private static final UUID DST_DEF_ID = UUID.randomUUID();
private static final UUID SRC_DEF_VER_ID = UUID.randomUUID();
private static final UUID DST_DEF_VER_ID = UUID.randomUUID();
private static MetricRepository db;
private static DSLContext ctx;

Expand All @@ -80,6 +83,12 @@ public static void setUpAll() throws DatabaseInitializationException, IOExceptio
.values(UUID.randomUUID(), "dstDef", ActorType.destination)
.execute();

ctx.insertInto(ACTOR_DEFINITION_VERSION, ACTOR_DEFINITION_VERSION.ID, ACTOR_DEFINITION_VERSION.ACTOR_DEFINITION_ID,
ACTOR_DEFINITION_VERSION.DOCKER_REPOSITORY, ACTOR_DEFINITION_VERSION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION_VERSION.SPEC)
.values(SRC_DEF_VER_ID, SRC_DEF_ID, "airbyte/source", "tag", JSONB.valueOf("{}"))
.values(DST_DEF_VER_ID, DST_DEF_ID, "airbyte/destination", "tag", JSONB.valueOf("{}"))
.execute();

// drop constraints to simplify test set up
ctx.alterTable(ACTOR).dropForeignKey(ACTOR__ACTOR_WORKSPACE_ID_FKEY.constraint()).execute();
ctx.alterTable(CONNECTION).dropForeignKey(CONNECTION__CONNECTION_DESTINATION_ID_FKEY.constraint()).execute();
Expand Down Expand Up @@ -318,10 +327,11 @@ void shouldReturnNumConnectionsBasic() {

final var srcId = UUID.randomUUID();
final var dstId = UUID.randomUUID();
ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE,
ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.DEFAULT_VERSION_ID, ACTOR.NAME, ACTOR.CONFIGURATION,
ACTOR.ACTOR_TYPE,
ACTOR.TOMBSTONE)
.values(srcId, workspaceId, SRC_DEF_ID, SRC, JSONB.valueOf("{}"), ActorType.source, false)
.values(dstId, workspaceId, DST_DEF_ID, DEST, JSONB.valueOf("{}"), ActorType.destination, false)
.values(srcId, workspaceId, SRC_DEF_ID, SRC_DEF_VER_ID, SRC, JSONB.valueOf("{}"), ActorType.source, false)
.values(dstId, workspaceId, DST_DEF_ID, DST_DEF_VER_ID, DEST, JSONB.valueOf("{}"), ActorType.destination, false)
.execute();

ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
Expand All @@ -345,10 +355,11 @@ void shouldIgnoreNonRunningConnections() {

final var srcId = UUID.randomUUID();
final var dstId = UUID.randomUUID();
ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE,
ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.DEFAULT_VERSION_ID, ACTOR.NAME, ACTOR.CONFIGURATION,
ACTOR.ACTOR_TYPE,
ACTOR.TOMBSTONE)
.values(srcId, workspaceId, SRC_DEF_ID, SRC, JSONB.valueOf("{}"), ActorType.source, false)
.values(dstId, workspaceId, DST_DEF_ID, DEST, JSONB.valueOf("{}"), ActorType.destination, false)
.values(srcId, workspaceId, SRC_DEF_ID, SRC_DEF_VER_ID, SRC, JSONB.valueOf("{}"), ActorType.source, false)
.values(dstId, workspaceId, DST_DEF_ID, DST_DEF_VER_ID, DEST, JSONB.valueOf("{}"), ActorType.destination, false)
.execute();

ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
Expand All @@ -374,10 +385,11 @@ void shouldIgnoreDeletedWorkspaces() {

final var srcId = UUID.randomUUID();
final var dstId = UUID.randomUUID();
ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE,
ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.DEFAULT_VERSION_ID, ACTOR.NAME, ACTOR.CONFIGURATION,
ACTOR.ACTOR_TYPE,
ACTOR.TOMBSTONE)
.values(srcId, workspaceId, SRC_DEF_ID, SRC, JSONB.valueOf("{}"), ActorType.source, false)
.values(dstId, workspaceId, DST_DEF_ID, DEST, JSONB.valueOf("{}"), ActorType.destination, false)
.values(srcId, workspaceId, SRC_DEF_ID, SRC_DEF_VER_ID, SRC, JSONB.valueOf("{}"), ActorType.source, false)
.values(dstId, workspaceId, DST_DEF_ID, DST_DEF_VER_ID, DEST, JSONB.valueOf("{}"), ActorType.destination, false)
.execute();

ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
Expand Down
2 changes: 0 additions & 2 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ flags:
serve: ""
- name: platform.add-scheduling-jitter
serve: false
- name: connectors.ingestBreakingChanges
serve: false
- name: check-replication-progress
serve: true
- name: use-new-retries
Expand Down

0 comments on commit 57a6d4a

Please sign in to comment.