Skip to content

MINOR: Make InternalStreamsBuilder#addGraphNode public #19816

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

Chuckame
Copy link
Contributor

@Chuckame Chuckame commented May 26, 2025

Currently, building the topology with custom graph nodes fails with a NPE because the GraphNode#buildPriority field is null. We can set this value to an empiric one (like taking the one from the previous node), but I suppose the best is to reuse the same method as all the other nodes.


Problematic code:

final PriorityQueue<GraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority));

NPE here:

Because of buildPriority being null, only set here currently:

node.setBuildPriority(buildPriorityIndex.getAndIncrement());


Impact in our codebase:

internal class ReplayableKTable<K, V>(
    private val replayedTable: KTableImpl<K, *, V>,
) : AbstractStream<K, V>(replayedTable as AbstractStream<K, V>) {
    fun replayFromInternalTopic(commandTopicPartitions: Int): KTable<K, V> {
        val replayedTableName = name
        val replayCommandTopic = "$replayedTableName-replay-command"

        builder.internalTopologyBuilder().addInternalTopic(replayCommandTopic, InternalTopicProperties(commandTopicPartitions))
        val replayTopicSourceNode = <code to make the replay command graph node>

        val replayNode = ReplayableTableGraphNode(
            "$replayedTableName-replay",
            replayTopicSourceNode.nodeName(),
            replayedTable,
            replayedTable.valueGetterSupplier(),
            name,
            subTopologySourceNodes
        )

-        replayNode.setBuildPriority(replayTopicSourceNode.buildPriority())
-        replayTopicSourceNode.addChild(replayNode)
-        graphNode.addChild(replayNode)
+        builder.addGraphNode(listOf(replayTopicSourceNode, graphNode), replayNode)

        return KTableImpl<K, Any, V>(
            name,
            keySerde,
            valueSerde,
            subTopologySourceNodes + replayTopicSourceNode.nodeName(),
            replayedTable.queryableStoreName(),
            replayNode.processor,
            replayNode,
            builder
        )
    }
}

Currently, building the topology with custom graph nodes fails with a NPE because the GraphNode#buildPriority field is null. We can set this value to an empiric one (like taking the one from the previous node), but the best is to reuse the same method as all the other nodes.
@github-actions github-actions bot added triage PRs from the community streams small Small PRs labels May 26, 2025
Copy link

github-actions bot commented Jun 3, 2025

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@mjsax
Copy link
Member

mjsax commented Jun 6, 2025

As discussed elsewhere: I don't think we want to expose any of these internal things. I would propose to close this PR.

@mjsax mjsax removed triage PRs from the community needs-attention labels Jun 6, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants