Skip to content

Commit 21c85f4

Browse files
fix(QTDI-679): schema collision
* fix values being mixed up when names have the same length * fix the case where a non-raw name comes first and then collides with others * fix order property * fix SchemaImpl * refactor sanitization logic * add tests
1 parent 7400c05 commit 21c85f4

File tree

34 files changed

+850
-362
lines changed

34 files changed

+850
-362
lines changed

component-api/src/main/java/org/talend/sdk/component/api/record/Schema.java

Lines changed: 7 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,14 @@
1717

1818
import java.io.StringReader;
1919
import java.math.BigDecimal;
20-
import java.nio.charset.Charset;
21-
import java.nio.charset.CharsetEncoder;
22-
import java.nio.charset.StandardCharsets;
2320
import java.time.temporal.Temporal;
2421
import java.util.Arrays;
25-
import java.util.Base64;
2622
import java.util.Collection;
2723
import java.util.Comparator;
2824
import java.util.Date;
2925
import java.util.HashMap;
3026
import java.util.List;
3127
import java.util.Map;
32-
import java.util.Objects;
33-
import java.util.Optional;
3428
import java.util.concurrent.atomic.AtomicInteger;
3529
import java.util.function.BiConsumer;
3630
import java.util.function.Function;
@@ -494,53 +488,9 @@ default Schema build(Comparator<Entry> order) {
494488
*
495489
* @return avro compatible name.
496490
*/
491+
@Deprecated
497492
static String sanitizeConnectionName(final String name) {
498-
if (SKIP_SANITIZE || name == null || name.isEmpty()) {
499-
return name;
500-
}
501-
502-
char current = name.charAt(0);
503-
final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder();
504-
final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_'))
505-
&& name.length() > 1 && (!Character.isDigit(name.charAt(1)));
506-
507-
final StringBuilder sanitizedBuilder = new StringBuilder();
508-
509-
if (!skipFirstChar) {
510-
if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) {
511-
sanitizedBuilder.append('_');
512-
} else {
513-
sanitizedBuilder.append(current);
514-
}
515-
}
516-
for (int i = 1; i < name.length(); i++) {
517-
current = name.charAt(i);
518-
if (!ascii.canEncode(current)) {
519-
if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
520-
sanitizedBuilder.append('_');
521-
} else {
522-
final byte[] encoded =
523-
Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
524-
final String enc = new String(encoded);
525-
if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
526-
sanitizedBuilder.append('_');
527-
}
528-
for (int iter = 0; iter < enc.length(); iter++) {
529-
if (Character.isLetterOrDigit(enc.charAt(iter))) {
530-
sanitizedBuilder.append(enc.charAt(iter));
531-
} else {
532-
sanitizedBuilder.append('_');
533-
}
534-
}
535-
}
536-
} else if (Character.isLetterOrDigit(current)) {
537-
sanitizedBuilder.append(current);
538-
} else {
539-
sanitizedBuilder.append('_');
540-
}
541-
542-
}
543-
return sanitizedBuilder.toString();
493+
return SchemaCompanionUtil.sanitizeConnectionName(name);
544494
}
545495

546496
@RequiredArgsConstructor
@@ -679,39 +629,13 @@ public int compare(final Entry e1, final Entry e2) {
679629
}
680630
}
681631

632+
/**
633+
* @deprecated Use {@link SchemaCompanionUtil#avoidCollision(Schema.Entry, Function, BiConsumer)} instead.
634+
*/
635+
@Deprecated
682636
static Schema.Entry avoidCollision(final Schema.Entry newEntry,
683637
final Function<String, Entry> entryGetter,
684638
final BiConsumer<String, Entry> replaceFunction) {
685-
if (SKIP_SANITIZE) {
686-
return newEntry;
687-
}
688-
final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter //
689-
.apply(newEntry.getName())) //
690-
.filter((final Entry field) -> !Objects.equals(field, newEntry));
691-
if (!collisionedEntry.isPresent()) {
692-
// No collision, return new entry.
693-
return newEntry;
694-
}
695-
final Entry matchedEntry = collisionedEntry.get();
696-
final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty());
697-
if (matchedToChange) {
698-
// the rename has to be applied on entry already inside schema, so replace.
699-
replaceFunction.accept(matchedEntry.getName(), newEntry);
700-
} else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
701-
// try to add exactly same raw, skip the add here.
702-
return null;
703-
}
704-
final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
705-
int indexForAnticollision = 1;
706-
final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name.
707-
708-
String newName = baseName + "_" + indexForAnticollision;
709-
while (entryGetter.apply(newName) != null) {
710-
indexForAnticollision++;
711-
newName = baseName + "_" + indexForAnticollision;
712-
}
713-
final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build();
714-
715-
return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry;
639+
return SchemaCompanionUtil.avoidCollision(newEntry, entryGetter, replaceFunction);
716640
}
717641
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/**
2+
* Copyright (C) 2006-2025 Talend Inc. - www.talend.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.talend.sdk.component.api.record;
17+
18+
import java.nio.charset.CharsetEncoder;
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.Base64;
21+
import java.util.Objects;
22+
import java.util.Optional;
23+
import java.util.function.BiConsumer;
24+
import java.util.function.Function;
25+
26+
import org.talend.sdk.component.api.record.Schema.Entry;
27+
28+
import lombok.AccessLevel;
29+
import lombok.NoArgsConstructor;
30+
31+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
32+
public class SchemaCompanionUtil {
33+
34+
/**
35+
* Sanitize name to be avro compatible.
36+
*
37+
* @param name : original name.
38+
*
39+
* @return avro compatible name.
40+
*/
41+
public static String sanitizeConnectionName(final String name) {
42+
if (Schema.SKIP_SANITIZE || name == null || name.isEmpty()) {
43+
return name;
44+
}
45+
46+
final CharsetEncoder ascii = StandardCharsets.US_ASCII.newEncoder();
47+
final StringBuilder sanitizedBuilder = new StringBuilder();
48+
final char firstLetter = sanitizeFirstLetter(name, ascii);
49+
if (firstLetter != (char) -1) {
50+
sanitizedBuilder.append(firstLetter);
51+
}
52+
53+
for (int i = 1; i < name.length(); i++) {
54+
char current = name.charAt(i);
55+
if (ascii.canEncode(current)) {
56+
sanitizedBuilder.append(Character.isLetterOrDigit(current) ? current : '_');
57+
} else {
58+
if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
59+
sanitizedBuilder.append('_');
60+
} else {
61+
final byte[] encoded = base64(name.substring(i, i + 1));
62+
final String enc = new String(encoded, StandardCharsets.UTF_8);
63+
if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
64+
sanitizedBuilder.append('_');
65+
}
66+
67+
for (int iter = 0; iter < enc.length(); iter++) {
68+
final char encodedCurrentChar = enc.charAt(iter);
69+
final char sanitizedLetter = Character.isLetterOrDigit(encodedCurrentChar)
70+
? encodedCurrentChar
71+
: '_';
72+
sanitizedBuilder.append(sanitizedLetter);
73+
}
74+
}
75+
}
76+
77+
}
78+
return sanitizedBuilder.toString();
79+
}
80+
81+
private static byte[] base64(final String value) {
82+
return Base64.getEncoder().encode(value.getBytes(StandardCharsets.UTF_8));
83+
}
84+
85+
private static char sanitizeFirstLetter(final String name, final CharsetEncoder ascii) {
86+
char current = name.charAt(0);
87+
final boolean skipFirstChar = !(ascii.canEncode(current) && validFirstLetter(current))
88+
&& name.length() > 1 && !Character.isDigit(name.charAt(1));
89+
90+
// indicates that first letter is not valid, so it has to be skipped.
91+
// and because the next letter is valid (or can be sanitized) we can use it as first letter.
92+
if (skipFirstChar) {
93+
return (char) -1;
94+
}
95+
96+
if (validFirstLetter(current) && ascii.canEncode(current)) {
97+
return current;
98+
} else {
99+
return '_';
100+
}
101+
}
102+
103+
private static boolean validFirstLetter(final char value) {
104+
return Character.isLetter(value) || value == '_';
105+
}
106+
107+
/**
108+
* May return a different entry with different name.
109+
*/
110+
public static Schema.Entry avoidCollision(final Schema.Entry newEntry,
111+
final Function<String, Entry> entryGetter,
112+
final BiConsumer<String, Entry> replaceFunction) {
113+
if (Schema.SKIP_SANITIZE) {
114+
return newEntry;
115+
}
116+
117+
final Entry alreadyExistedEntry = findCollidedEntry(newEntry, entryGetter);
118+
if (alreadyExistedEntry == null) {
119+
// No collision, return new entry.
120+
return newEntry;
121+
}
122+
123+
final boolean matchedToChange = !isEmpty(alreadyExistedEntry.getRawName());
124+
if (matchedToChange) {
125+
// the rename has to be applied on entry already inside schema, so replace. (dunno why)
126+
// replace existed entry with a new name
127+
final String newSanitizedName = newNotCollidedName(entryGetter, alreadyExistedEntry.getRawName());
128+
final Entry updatedExistedEntry = alreadyExistedEntry.toBuilder()
129+
.withName(newSanitizedName)
130+
.build();
131+
replaceFunction.accept(alreadyExistedEntry.getName(), updatedExistedEntry);
132+
return newEntry;
133+
} else if (isEmpty(newEntry.getRawName())) {
134+
// try to add exactly same raw, skip the add here.
135+
return null;
136+
} else {
137+
// raw name isn't empty, so we need to create a new entry with a new name (sanitized).
138+
final String newSanitizedName = newNotCollidedName(entryGetter, newEntry.getRawName());
139+
return newEntry.toBuilder()
140+
.withName(newSanitizedName)
141+
.build();
142+
}
143+
}
144+
145+
private static Entry findCollidedEntry(final Entry newEntry, final Function<String, Entry> entryGetter) {
146+
return Optional.ofNullable(entryGetter.apply(newEntry.getName()))
147+
.filter(retrievedEntry -> !Objects.equals(retrievedEntry, newEntry))
148+
.orElse(null);
149+
}
150+
151+
private static String newNotCollidedName(final Function<String, Entry> entryGetter, final String rawName) {
152+
final String baseName = sanitizeConnectionName(rawName);
153+
int indexForAnticollision = 1;
154+
String newName = baseName + "_" + indexForAnticollision;
155+
while (entryGetter.apply(newName) != null) {
156+
indexForAnticollision++;
157+
newName = baseName + "_" + indexForAnticollision;
158+
}
159+
return newName;
160+
}
161+
162+
private static boolean isEmpty(final String value) {
163+
return value == null || value.isEmpty();
164+
}
165+
}

0 commit comments

Comments
 (0)