Skip to content

Commit

Permalink
Narrow span of tree lock
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqi1129 committed Feb 7, 2025
1 parent 8f0534b commit 45eaf79
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,56 +130,53 @@ public void setOwner(

public Optional<Owner> getOwner(String metalake, MetadataObject metadataObject) {
NameIdentifier ident = MetadataObjectUtil.toEntityIdent(metalake, metadataObject);
OwnerImpl owner = new OwnerImpl();
try {
List<? extends Entity> entities =
TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() ->
store
.relationOperations()
.listEntitiesByRelation(
SupportsRelationOperations.Type.OWNER_REL,
ident,
MetadataObjectUtil.toEntityType(metadataObject)));

if (entities.isEmpty()) {
return Optional.empty();
}

if (entities.size() != 1) {
throw new IllegalStateException(
String.format("The number of the owner %s must be 1", metadataObject.fullName()));
}

Entity entity = entities.get(0);
if (!(entity instanceof UserEntity) && !(entity instanceof GroupEntity)) {
throw new IllegalArgumentException(
String.format(
"Doesn't support owner entity class %s", entities.get(0).getClass().getName()));
}

return TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() -> {
try {
OwnerImpl owner = new OwnerImpl();
List<? extends Entity> entities =
store
.relationOperations()
.listEntitiesByRelation(
SupportsRelationOperations.Type.OWNER_REL,
ident,
MetadataObjectUtil.toEntityType(metadataObject));

if (entities.isEmpty()) {
return Optional.empty();
}

if (entities.size() != 1) {
throw new IllegalStateException(
String.format("The number of the owner %s must be 1", metadataObject.fullName()));
}

Entity entity = entities.get(0);
if (!(entity instanceof UserEntity) && !(entity instanceof GroupEntity)) {
throw new IllegalArgumentException(
String.format(
"Doesn't support owner entity class %s",
entities.get(0).getClass().getName()));
}

if (entities.get(0) instanceof UserEntity) {
UserEntity user = (UserEntity) entities.get(0);
owner.name = user.name();
owner.type = Owner.Type.USER;
} else if (entities.get(0) instanceof GroupEntity) {
GroupEntity group = (GroupEntity) entities.get(0);
owner.name = group.name();
owner.type = Owner.Type.GROUP;
}
return Optional.of(owner);
} catch (NoSuchEntityException nse) {
throw new NoSuchMetadataObjectException(
"The metadata object of %s isn't found", metadataObject.fullName());
} catch (IOException ioe) {
LOG.info("Fail to get the owner of entity {}", metadataObject.fullName(), ioe);
throw new RuntimeException(ioe);
}
});
if (entities.get(0) instanceof UserEntity) {
UserEntity user = (UserEntity) entities.get(0);
owner.name = user.name();
owner.type = Owner.Type.USER;
} else if (entities.get(0) instanceof GroupEntity) {
GroupEntity group = (GroupEntity) entities.get(0);
owner.name = group.name();
owner.type = Owner.Type.GROUP;
}
return Optional.of(owner);
} catch (NoSuchEntityException nse) {
throw new NoSuchMetadataObjectException(
"The metadata object of %s isn't found", metadataObject.fullName());
} catch (IOException ioe) {
LOG.info("Fail to get the owner of entity {}", metadataObject.fullName(), ioe);
throw new RuntimeException(ioe);
}
}

private static class OwnerImpl implements Owner {
Expand Down
82 changes: 39 additions & 43 deletions core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -340,24 +340,22 @@ public NameIdentifier[] listCatalogs(Namespace namespace) throws NoSuchMetalakeE
@Override
public Catalog[] listCatalogsInfo(Namespace namespace) throws NoSuchMetalakeException {
NameIdentifier metalakeIdent = NameIdentifier.of(namespace.levels());
return TreeLockUtils.doWithTreeLock(
metalakeIdent,
LockType.READ,
() -> {
checkMetalake(metalakeIdent, store);
try {
List<CatalogEntity> catalogEntities =
store.list(namespace, CatalogEntity.class, EntityType.CATALOG);

return catalogEntities.stream()
.map(e -> e.toCatalogInfoWithResolvedProps(getResolvedProperties(e)))
.toArray(Catalog[]::new);

} catch (IOException ioe) {
LOG.error("Failed to list catalogs in metalake {}", metalakeIdent, ioe);
throw new RuntimeException(ioe);
}
});
try {
List<CatalogEntity> catalogEntities =
TreeLockUtils.doWithTreeLock(
metalakeIdent,
LockType.READ,
() -> {
checkMetalake(metalakeIdent, store);
return store.list(namespace, CatalogEntity.class, EntityType.CATALOG);
});
return catalogEntities.stream()
.map(e -> e.toCatalogInfoWithResolvedProps(getResolvedProperties(e)))
.toArray(Catalog[]::new);
} catch (IOException ioe) {
LOG.error("Failed to list catalogs in metalake {}", metalakeIdent, ioe);
throw new RuntimeException(ioe);
}
}

/**
Expand All @@ -370,7 +368,6 @@ public Catalog[] listCatalogsInfo(Namespace namespace) throws NoSuchMetalakeExce
@Override
public Catalog loadCatalog(NameIdentifier ident) throws NoSuchCatalogException {
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());

return TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
Expand Down Expand Up @@ -402,35 +399,34 @@ public Catalog createCatalog(
throws NoSuchMetalakeException, CatalogAlreadyExistsException {
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());

Map<String, String> mergedConfig = buildCatalogConf(provider, properties);
long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Instant now = Instant.now();
String creator = PrincipalUtils.getCurrentPrincipal().getName();
CatalogEntity e =
CatalogEntity.builder()
.withId(uid)
.withName(ident.name())
.withNamespace(ident.namespace())
.withType(type)
.withProvider(provider)
.withComment(comment)
.withProperties(StringIdentifier.newPropertiesWithId(stringId, mergedConfig))
.withAuditInfo(
AuditInfo.builder()
.withCreator(creator)
.withCreateTime(now)
.withLastModifier(creator)
.withLastModifiedTime(now)
.build())
.build();

return TreeLockUtils.doWithTreeLock(
metalakeIdent,
LockType.WRITE,
() -> {
checkMetalake(metalakeIdent, store);
Map<String, String> mergedConfig = buildCatalogConf(provider, properties);

long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Instant now = Instant.now();
String creator = PrincipalUtils.getCurrentPrincipal().getName();
CatalogEntity e =
CatalogEntity.builder()
.withId(uid)
.withName(ident.name())
.withNamespace(ident.namespace())
.withType(type)
.withProvider(provider)
.withComment(comment)
.withProperties(StringIdentifier.newPropertiesWithId(stringId, mergedConfig))
.withAuditInfo(
AuditInfo.builder()
.withCreator(creator)
.withCreateTime(now)
.withLastModifier(creator)
.withLastModifiedTime(now)
.build())
.build();

boolean needClean = true;
try {
store.put(e, false /* overwrite */);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,22 @@ public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaExc
@Override
public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
return TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() -> {
Fileset fileset =
doWithCatalog(
catalogIdent,
c -> c.doWithFilesetOps(f -> f.loadFileset(ident)),
NoSuchFilesetException.class);
// Currently we only support maintaining the Fileset in the Gravitino's store.
return EntityCombinedFileset.of(fileset)
.withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
fileset.properties()));
});
Fileset fileset =
TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() ->
doWithCatalog(
catalogIdent,
c -> c.doWithFilesetOps(f -> f.loadFileset(ident)),
NoSuchFilesetException.class));
// Currently we only support maintaining the Fileset in the Gravitino's store.
return EntityCombinedFileset.of(fileset)
.withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
fileset.properties()));
}

/**
Expand Down Expand Up @@ -124,42 +123,40 @@ public Fileset createFileset(
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.filesetPropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() -> {
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.filesetPropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

Fileset createdFileset =
doWithCatalog(
catalogIdent,
c ->
c.doWithFilesetOps(
f ->
f.createFileset(
ident, comment, type, storageLocation, updatedProperties)),
NoSuchSchemaException.class,
FilesetAlreadyExistsException.class);
return EntityCombinedFileset.of(createdFileset)
.withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
createdFileset.properties()));
});
Fileset createdFileset =
TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() ->
doWithCatalog(
catalogIdent,
c ->
c.doWithFilesetOps(
f ->
f.createFileset(
ident, comment, type, storageLocation, updatedProperties)),
NoSuchSchemaException.class,
FilesetAlreadyExistsException.class));
return EntityCombinedFileset.of(createdFileset)
.withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
createdFileset.properties()));
}

/**
Expand All @@ -180,26 +177,26 @@ public Fileset createFileset(
@Override
public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
throws NoSuchFilesetException, IllegalArgumentException {
return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() -> {
validateAlterProperties(ident, HasPropertyMetadata::filesetPropertiesMetadata, changes);
validateAlterProperties(ident, HasPropertyMetadata::filesetPropertiesMetadata, changes);
NameIdentifier catalogIdent = getCatalogIdentifier(ident);

Fileset alteredFileset =
TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() ->
doWithCatalog(
catalogIdent,
c -> c.doWithFilesetOps(f -> f.alterFileset(ident, changes)),
NoSuchFilesetException.class,
IllegalArgumentException.class));

NameIdentifier catalogIdent = getCatalogIdentifier(ident);
Fileset alteredFileset =
doWithCatalog(
catalogIdent,
c -> c.doWithFilesetOps(f -> f.alterFileset(ident, changes)),
NoSuchFilesetException.class,
IllegalArgumentException.class);
return EntityCombinedFileset.of(alteredFileset)
.withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
alteredFileset.properties()));
});
return EntityCombinedFileset.of(alteredFileset)
.withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
alteredFileset.properties()));
}

/**
Expand Down
Loading

0 comments on commit 45eaf79

Please sign in to comment.