Skip to content

Commit

Permalink
Introduced central WebsocketTopic
Browse files Browse the repository at this point in the history
  • Loading branch information
FelberMartin committed Nov 23, 2024
1 parent d945643 commit b1bceef
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ class TestWebsocketProvider : WebsocketProvider {
override val isConnected: Flow<Boolean> = flowOf(true)

override fun <T : Any> subscribe(
channel: String,
topic: String,
deserializer: DeserializationStrategy<T>
): Flow<WebsocketProvider.WebsocketData<T>> = flowOf(WebsocketProvider.WebsocketData.Subscribe())

override fun <T : Any> subscribeMessage(
channel: String,
topic: String,
deserializer: DeserializationStrategy<T>
): Flow<T> = emptyFlow()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ class WebsocketProviderStub : WebsocketProvider {
override val isConnected: Flow<Boolean> = flowOf(true)

override fun <T : Any> subscribe(
channel: String,
topic: String,
deserializer: DeserializationStrategy<T>
): Flow<WebsocketProvider.WebsocketData<T>> = emptyFlow()

override fun <T : Any> subscribeMessage(
channel: String,
topic: String,
deserializer: DeserializationStrategy<T>
): Flow<T> = emptyFlow()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ interface WebsocketProvider {
* Performs automatic reconnects.
*/
fun <T : Any> subscribe(
channel: String,
topic: String,
deserializer: DeserializationStrategy<T>
): Flow<WebsocketData<T>>

fun <T : Any> subscribeMessage(
channel: String,
topic: String,
deserializer: DeserializationStrategy<T>
): Flow<T>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,28 +199,28 @@ class WebsocketProviderImpl(
* The given flow can only be subscribed to once.
*/
override fun <T : Any> subscribe(
channel: String,
topic: String,
deserializer: DeserializationStrategy<T>
): Flow<WebsocketProvider.WebsocketData<T>> {
return session
.transformLatest { currentSession ->
val flow: Flow<WebsocketProvider.WebsocketData<T>> = flow {
emitAll(
currentSession.subscribe(
StompSubscribeHeaders(destination = channel),
StompSubscribeHeaders(destination = topic),
deserializer
)
)
}
.onStart {
Log.d(TAG, "subscribe! $channel")
Log.d(TAG, "subscribe! $topic")
emit(WebsocketProvider.WebsocketData.Subscribe())
}
.onCompletion {
Log.d(TAG, "unsubscribe! $channel")
Log.d(TAG, "unsubscribe! $topic")
}
.catch { e ->
Log.d(TAG, "Subscription $channel reported error: ${e.localizedMessage}")
Log.d(TAG, "Subscription $topic reported error: ${e.localizedMessage}")
}
.map {
WebsocketProvider.WebsocketData.Message(it)
Expand All @@ -247,10 +247,10 @@ class WebsocketProviderImpl(
}

override fun <T : Any> subscribeMessage(
channel: String,
topic: String,
deserializer: DeserializationStrategy<T>
): Flow<T> {
return subscribe(channel, deserializer).mapNotNull {
return subscribe(topic, deserializer).mapNotNull {
when (it) {
is WebsocketProvider.WebsocketData.Message -> it.message
else -> null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package de.tum.informatics.www1.artemis.native_app.core.websocket.impl

object WebsocketTopic {

/**
* Returns the topic for conversation updates for a course-wide conversation.
*/
fun getCourseWideConversationUpdateTopic(courseId: Long): String {
return "/topic/metis/courses/$courseId"
}

/**
* Returns the topic for conversation updates for a non-course-wide conversation.
*/
fun getNormalConversationUpdateTopic(userId: Long): String {
return "/topic/user/$userId/notifications/conversations"
}

/**
* Returns the topic for conversation meta updates. This includes channel creation, deletion,
* and updates (like changing the channel name).
*/
fun getConversationMetaUpdateTopic(courseId: Long, userId: Long): String {
return "/user/topic/metis/courses/$courseId/conversations/user/$userId"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import de.tum.informatics.www1.artemis.native_app.core.data.cookieAuth
import de.tum.informatics.www1.artemis.native_app.core.data.performNetworkCall
import de.tum.informatics.www1.artemis.native_app.core.data.service.KtorProvider
import de.tum.informatics.www1.artemis.native_app.core.websocket.WebsocketProvider
import de.tum.informatics.www1.artemis.native_app.core.websocket.impl.WebsocketTopic
import de.tum.informatics.www1.artemis.native_app.feature.metis.conversation.service.network.MetisService
import de.tum.informatics.www1.artemis.native_app.feature.metis.conversation.service.network.RESOURCE_PATH_SEGMENTS
import de.tum.informatics.www1.artemis.native_app.feature.metis.shared.content.MetisContext
Expand Down Expand Up @@ -153,12 +154,12 @@ internal class MetisServiceImpl(
courseId: Long,
clientId: Long
): Flow<WebsocketProvider.WebsocketData<MetisPostDTO>> {
val channel = "/topic/metis/courses/$courseId"
val channelConversationNotifications = "/topic/user/$clientId/notifications/conversations"
val courseWideTopic = WebsocketTopic.getCourseWideConversationUpdateTopic(courseId)
val normalTopic = WebsocketTopic.getNormalConversationUpdateTopic(clientId)

val flow1 = websocketProvider.subscribe(channel, MetisPostDTO.serializer())
val flow2 = websocketProvider.subscribe(channelConversationNotifications, MetisPostDTO.serializer())
val courseWideUpdates = websocketProvider.subscribe(courseWideTopic, MetisPostDTO.serializer())
val normalUpdates = websocketProvider.subscribe(normalTopic, MetisPostDTO.serializer())

return merge(flow1, flow2)
return merge(courseWideUpdates, normalUpdates)
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package de.tum.informatics.www1.artemis.native_app.feature.metis.shared.service.network

import de.tum.informatics.www1.artemis.native_app.core.websocket.WebsocketProvider
import de.tum.informatics.www1.artemis.native_app.core.websocket.impl.WebsocketTopic
import de.tum.informatics.www1.artemis.native_app.feature.metis.shared.content.dto.ConversationWebsocketDto
import kotlinx.coroutines.flow.Flow

fun WebsocketProvider.subscribeToConversationUpdates(userId: Long, courseId: Long): Flow<ConversationWebsocketDto> {
val topic = "/user/topic/metis/courses/$courseId/conversations/user/$userId"
val topic = WebsocketTopic.getConversationMetaUpdateTopic(courseId, userId)

return subscribeMessage(topic, ConversationWebsocketDto.serializer())
}

0 comments on commit b1bceef

Please sign in to comment.