Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1925: added code to create kafka topic 'TAG_PROP_EVENTS' and push events to… #3791

Open
wants to merge 7 commits into
base: beta
Choose a base branch
from
2 changes: 2 additions & 0 deletions common/src/main/java/org/apache/atlas/AtlasConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ private AtlasConstants() {
public static final String DEFAULT_TYPE_VERSION = "1.0";
public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
public static final int TASK_WAIT_TIME_MS = 180_000;
public static final String ATLAS_KAFKA_TAG_TOPIC = "TAG_PROP_EVENTS";

}
32 changes: 32 additions & 0 deletions common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,38 @@ public void createTopics(List<String> topicNames, int numPartitions, int replica
}
}

public void createTopics(List<String[]> topicDetails, int replicationFactor)
throws TopicExistsException, ExecutionException, InterruptedException {

if (LOG.isDebugEnabled()) {
LOG.debug("==> createTopics()");
}

List<NewTopic> newTopicList = topicDetails.stream()
.map(details -> new NewTopic(details[0], Integer.parseInt(details[1]), (short) replicationFactor))
.collect(Collectors.toList());

CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList);
Map<String, KafkaFuture<Void>> futureMap = createTopicsResult.values();

for (Map.Entry<String, KafkaFuture<Void>> futureEntry : futureMap.entrySet()) {
KafkaFuture<Void> future = futureEntry.getValue();

try {
future.get();
} catch (ExecutionException | InterruptedException e) {
LOG.error("Error while creating topic " + futureEntry.getKey(), e);
// Re-throw to handle these exceptions in the calling code.
throw e;
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("<== createTopics()");
}
}


public List<String> listAllTopics() throws ExecutionException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.listAllTopics() ");
Expand Down
2 changes: 1 addition & 1 deletion distro/src/bin/atlas_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ def get_topics_to_create(confdir):
if topic_list is not None:
topics = topic_list.split(",")
else:
topics = [getConfigWithDefault("atlas.notification.hook.topic.name", "ATLAS_HOOK"), getConfigWithDefault("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"), getConfigWithDefault("atlas.notification.relationships.topic.name", "ATLAS_RELATIONSHIPS")]
topics = [getConfigWithDefault("atlas.notification.hook.topic.name", "ATLAS_HOOK"), getConfigWithDefault("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"), getConfigWithDefault("atlas.notification.relationships.topic.name", "ATLAS_RELATIONSHIPS"), getConfigWithDefault("atlas.notification.propagation.topic.name", "TAG_PROP_EVENTS")]
return topics

def get_atlas_url_port(confdir):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public enum AtlasConfiguration {
NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),
NOTIFICATION_RELATIONSHIPS_TOPIC_NAME("atlas.notification.relationships.topic.name", "ATLAS_RELATIONSHIPS"),
NOTIFICATION_PROPAGATION_TOPIC_NAME("atlas.notification.propagation.topic.name", "TAG_PROP_EVENTS"),

NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names", "ATLAS_HOOK"), // a comma separated list of topic names
NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names", "ATLAS_ENTITIES"), // a comma separated list of topic names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.atlas.AtlasConfiguration;

/**
* A class to create Kafka topics used by Atlas components.
Expand Down Expand Up @@ -61,19 +64,30 @@ public void createAtlasTopic(Configuration atlasProperties, String... topicNames
if (!handleSecurity(atlasProperties)) {
return;
}
try(KafkaUtils kafkaUtils = getKafkaUtils(atlasProperties)) {
int numPartitions = atlasProperties.getInt("atlas.notification.partitions", 1);
try (KafkaUtils kafkaUtils = getKafkaUtils(atlasProperties)) {
int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
kafkaUtils.createTopics(Arrays.asList(topicNames), numPartitions, numReplicas);
List<String[]> topicDetails = new ArrayList<>();

for (String topicName : topicNames) {
if (AtlasConfiguration.NOTIFICATION_PROPAGATION_TOPIC_NAME.getString().equals(topicName)) {
topicDetails.add(new String[]{topicName, "5"}); // 5 partitions for 'ABC'
} else {
topicDetails.add(new String[]{topicName, "1"}); // 1 partition for all others
}
}

kafkaUtils.createTopics(topicDetails, numReplicas);

} catch (Exception e) {
LOG.error("Error while creating topics e :" + e.getMessage(), e);
LOG.error("Error while creating topics: " + e.getMessage(), e);
}
} else {
LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","),
ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
}
}


@VisibleForTesting
protected boolean handleSecurity(Configuration atlasProperties) {
if (AuthenticationUtil.isKerberosAuthenticationEnabled(atlasProperties)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
public static final String ATLAS_RELATIONSHIPS_TOPIC = AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_TOPIC_NAME.getString();
public static final String ATLAS_TAG_PROP_EVENTS = AtlasConfiguration.NOTIFICATION_PROPAGATION_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";

private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
Expand All @@ -74,6 +75,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
put(NotificationType.RELATIONSHIPS, ATLAS_RELATIONSHIPS_TOPIC);
put(NotificationType.EMIT_PLANNED_RELATIONSHIPS, ATLAS_TAG_PROP_EVENTS);
}
};

Expand Down Expand Up @@ -268,6 +270,7 @@ public void sendInternal(NotificationType notificationType, List<String> message
sendInternalToProducer(producer, notificationType, messages);
}


@VisibleForTesting
void sendInternalToProducer(Producer p, NotificationType notificationType, List<String> messages) throws NotificationException {
String topic = PRODUCER_TOPIC_MAP.get(notificationType);
Expand Down Expand Up @@ -307,6 +310,55 @@ void sendInternalToProducer(Producer p, NotificationType notificationType, List<
}
}

// ----- AbstractNotification with partition detail --------------------------------------------
@Override
public void sendInternal(NotificationType notificationType, List<String> messages, Integer partition) throws NotificationException {
KafkaProducer producer = getOrCreateProducer(notificationType);

sendInternalToProducer(producer, notificationType, messages, partition);
}


@VisibleForTesting
void sendInternalToProducer(Producer p, NotificationType notificationType, List<String> messages, Integer partition) throws NotificationException {
String topic = PRODUCER_TOPIC_MAP.get(notificationType);
List<MessageContext> messageContexts = new ArrayList<>();

for (String message : messages) {
ProducerRecord record = new ProducerRecord(topic, partition, null, message);

if (LOG.isDebugEnabled()) {
LOG.debug("Sending message for topic-partition {}-{}: {}", topic, partition, message);
}

Future future = p.send(record);

messageContexts.add(new MessageContext(future, message));
}

List<String> failedMessages = new ArrayList<>();
Exception lastFailureException = null;

for (MessageContext context : messageContexts) {
try {
RecordMetadata response = context.getFuture().get();

if (LOG.isDebugEnabled()) {
LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), response.partition(), response.offset());
}
} catch (Exception e) {
lastFailureException = e;

failedMessages.add(context.getMessage());
}
}

if (lastFailureException != null) {
throw new NotificationException(lastFailureException, failedMessages);
}
}


// Get properties for consumer request
@VisibleForTesting
public Properties getConsumerProperties(NotificationType notificationType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void setCurrentUser(String user) {
*/
public abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;


public abstract void sendInternal(NotificationType notificationType, List<String> messages, Integer partition) throws NotificationException;
// ----- utility methods -------------------------------------------------

public static String getMessageJson(Object message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@ enum NotificationType {
// Notifications from the Atlas integration hooks.
HOOK(new HookMessageDeserializer()),

EMIT_PLANNED_RELATIONSHIPS(new EntityMessageDeserializer()),

// Notifications to entity change consumers.
ENTITIES(new EntityMessageDeserializer()),

RELATIONSHIPS(new EntityMessageDeserializer());


private final AtlasNotificationMessageDeserializer deserializer;

NotificationType(AtlasNotificationMessageDeserializer deserializer) {
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,13 @@
<version>2.2.2</version>
</dependency>

<!-- Google guava library for hashing -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.3.1-jre</version>
</dependency>

</dependencies>

<build>
Expand Down
Loading
Loading