Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply inspections #588

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ project.excludeFilters = [
"scripts/authors.scala"
]
project.layout = StandardConvention
rewrite.rules = [RedundantBraces]
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,16 @@ final class AmqpDetailsConnectionProvider private (
factory.setPassword(credentials.password)
}
virtualHost.foreach(factory.setVirtualHost)
sslConfiguration.foreach(sslConfiguration => {
sslConfiguration.foreach { sslConfiguration =>
if (sslConfiguration.protocol.isDefined) {
if (sslConfiguration.trustManager.isDefined)
factory.useSslProtocol(sslConfiguration.protocol.get, sslConfiguration.trustManager.get)
else factory.useSslProtocol(sslConfiguration.protocol.get)
} else if (sslConfiguration.context.isDefined) {
} else if (sslConfiguration.context.isDefined)
factory.useSslProtocol(sslConfiguration.context.get)
} else {
else
factory.useSslProtocol()
}
})
}
requestedHeartbeat.foreach(factory.setRequestedHeartbeat)
connectionTimeout.foreach(factory.setConnectionTimeout)
handshakeTimeout.foreach(factory.setHandshakeTimeout)
Expand Down Expand Up @@ -244,9 +243,8 @@ object AmqpCredentials {
final class AmqpSSLConfiguration private (val protocol: Option[String] = None,
val trustManager: Option[TrustManager] = None,
val context: Option[SSLContext] = None) {
if (protocol.isDefined && context.isDefined) {
if (protocol.isDefined && context.isDefined)
throw new IllegalArgumentException("Protocol and context can't be defined in the same AmqpSSLConfiguration.")
}

def withProtocol(protocol: String): AmqpSSLConfiguration =
copy(protocol = Some(protocol))
Expand Down Expand Up @@ -419,10 +417,8 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
throw new ConcurrentModificationException(
"Unexpected concurrent modification while closing the connection.")
}
} else {
if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
releaseRecursive(amqpConnectionProvider, connection)
}
} else if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
releaseRecursive(amqpConnectionProvider, connection)
case Closing => releaseRecursive(amqpConnectionProvider, connection)
}

Expand Down Expand Up @@ -452,4 +448,4 @@ object AmqpCachedConnectionProvider {
private case object Connecting extends State
private final case class Connected(connection: Connection, clients: Int) extends State
private case object Closing extends State
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ final class TemporaryQueueSourceSettings private (
}

object TemporaryQueueSourceSettings {
def apply(connectionProvider: AmqpConnectionProvider, exchange: String) =
def apply(connectionProvider: AmqpConnectionProvider, exchange: String): TemporaryQueueSourceSettings =
new TemporaryQueueSourceSettings(connectionProvider, exchange)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ import scala.concurrent.Promise
val callback = getAsyncCallback[(DeliveryTag, Boolean)] {
case (tag: DeliveryTag, multiple: Boolean) => confirmCallback(tag, multiple)
}
new ConfirmCallback { // cant use function literal because it doesn't work with 2.11
override def handle(tag: DeliveryTag, multiple: Boolean): Unit = callback.invoke((tag, multiple))
}
(tag: DeliveryTag, multiple: Boolean) => callback.invoke((tag, multiple))
}

private def onConfirmation(tag: DeliveryTag, multiple: Boolean): Unit = {
Expand Down Expand Up @@ -155,9 +153,8 @@ import scala.concurrent.Promise
if (noAwaitingMessages && exitQueue.isEmpty) {
streamCompletion.success(Done)
super.onUpstreamFinish()
} else {
} else
log.debug("Received upstream finish signal - stage will be closed when all buffered messages are processed")
}

private def publish(message: WriteMessage): DeliveryTag = {
val tag: DeliveryTag = channel.getNextPublishSeqNo
Expand Down Expand Up @@ -193,10 +190,9 @@ import scala.concurrent.Promise

override protected def onTimer(timerKey: Any): Unit =
timerKey match {
case tag: DeliveryTag => {
case tag: DeliveryTag =>
log.debug("Received timeout for deliveryTag {}.", tag)
onRejection(tag, multiple = false)
}
case _ => ()
}

Expand All @@ -211,4 +207,4 @@ import scala.concurrent.Promise
}

private def isFinished: Boolean = isClosed(in) && noAwaitingMessages && exitQueue.isEmpty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,14 @@ import scala.concurrent.{ Future, Promise }
buffer += (tag -> AwaitingMessage(tag, passThrough))

override def dequeueAwaitingMessages(tag: DeliveryTag, multiple: Boolean): Iterable[AwaitingMessage[T]] =
if (multiple) {
if (multiple)
dequeueWhile((t, _) => t <= tag)
} else {
else {
setReady(tag)
if (isAtHead(tag)) {
if (isAtHead(tag))
dequeueWhile((_, message) => message.ready)
} else {
else
Seq.empty
}
}

private def dequeueWhile(
Expand All @@ -88,4 +87,4 @@ import scala.concurrent.{ Future, Promise }

}, streamCompletion.future)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.concurrent.{ Future, Promise }
private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToSinkSettings)
extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage =>

val in = Inlet[WriteMessage]("AmqpReplyToSink.in")
val in: Inlet[WriteMessage] = Inlet[WriteMessage]("AmqpReplyToSink.in")

override def shape: SinkShape[WriteMessage] = SinkShape.of(in)

Expand Down Expand Up @@ -82,9 +82,8 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
elem.immediate,
elem.properties.orNull,
elem.bytes.toArray)
} else if (settings.failIfReplyToMissing) {
} else if (settings.failIfReplyToMissing)
onFailure(new RuntimeException("Reply-to header was not set"))
}

tryPull(in)
}
Expand All @@ -94,4 +93,4 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
}

override def toString: String = "AmqpReplyToSink"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
extends GraphStageWithMaterializedValue[FlowShape[WriteMessage, CommittableReadResult], Future[String]] {
stage =>

val in = Inlet[WriteMessage]("AmqpRpcFlow.in")
val out = Outlet[CommittableReadResult]("AmqpRpcFlow.out")
val in: Inlet[WriteMessage] = Inlet[WriteMessage]("AmqpRpcFlow.in")
val out: Outlet[CommittableReadResult] = Outlet[CommittableReadResult]("AmqpRpcFlow.out")

override def shape: FlowShape[WriteMessage, CommittableReadResult] = FlowShape.of(in, out)

Expand All @@ -70,7 +70,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
val consumerCallback = getAsyncCallback(handleDelivery)

val commitCallback = getAsyncCallback[AckArguments] {
case AckArguments(deliveryTag, multiple, promise) => {
case AckArguments(deliveryTag, multiple, promise) =>
try {
channel.basicAck(deliveryTag, multiple)
unackedMessages -= 1
Expand All @@ -81,10 +81,9 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
} catch {
case e: Throwable => promise.failure(e)
}
}
}
val nackCallback = getAsyncCallback[NackArguments] {
case NackArguments(deliveryTag, multiple, requeue, promise) => {
case NackArguments(deliveryTag, multiple, requeue, promise) =>
try {
channel.basicNack(deliveryTag, multiple, requeue)
unackedMessages -= 1
Expand All @@ -95,7 +94,6 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
} catch {
case e: Throwable => promise.failure(e)
}
}
}

val amqpSourceConsumer = new DefaultConsumer(channel) {
Expand All @@ -105,7 +103,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
body: Array[Byte]): Unit =
consumerCallback.invoke(
new CommittableReadResult {
override val message = ReadResult(ByteString(body), envelope, properties)
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)

override def ack(multiple: Boolean): Future[Done] = {
val promise = Promise[Done]()
Expand Down Expand Up @@ -148,21 +146,19 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
}

def handleDelivery(message: CommittableReadResult): Unit =
if (isAvailable(out)) {
if (isAvailable(out))
pushMessage(message)
} else if (queue.size + 1 > bufferSize) {
else if (queue.size + 1 > bufferSize)
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
else
queue.enqueue(message)
}

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (queue.nonEmpty) {
if (queue.nonEmpty)
pushMessage(queue.dequeue())
}

override def onDownstreamFinish(cause: Throwable): Unit = {
setKeepGoing(true)
Expand Down Expand Up @@ -207,15 +203,14 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf

val expectedResponses: Int = {
val headers = props.getHeaders
if (headers == null) {
if (headers == null)
responsesPerMessage
} else {
else {
val r = headers.get("expectedReplies")
if (r != null) {
if (r != null)
r.asInstanceOf[Int]
} else {
else
responsesPerMessage
}
}
}

Expand All @@ -237,4 +232,4 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf

override def toString: String = "AmqpRpcFlow"

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
properties: BasicProperties,
body: Array[Byte]): Unit = {
val message = if (ackRequired) {

new CommittableReadResult {
override val message = ReadResult(ByteString(body), envelope, properties)
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)

override def ack(multiple: Boolean): Future[Done] = {
val promise = Promise[Done]()
Expand Down Expand Up @@ -155,21 +154,19 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
}

def handleDelivery(message: CommittableReadResult): Unit =
if (isAvailable(out)) {
if (isAvailable(out))
pushMessage(message)
} else if (queue.size + 1 > bufferSize) {
else if (queue.size + 1 > bufferSize)
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
else
queue.enqueue(message)
}

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (queue.nonEmpty) {
if (queue.nonEmpty)
pushMessage(queue.dequeue())
}

override def onDownstreamFinish(cause: Throwable): Unit =
if (unackedMessages == 0) super.onDownstreamFinish(cause)
Expand Down
4 changes: 2 additions & 2 deletions amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class AmqpDocsSpec extends AmqpSpec {

override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)

val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful(_)
val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful

"The AMQP Connectors" should {

Expand Down Expand Up @@ -158,7 +158,7 @@ class AmqpDocsSpec extends AmqpSpec {
val mergingFlow = mergedSources
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.fold(Set.empty[Int]) {
case (seen, (branch, element)) =>
case (seen, (branch, _)) =>
if (seen.size == fanoutSize) completion.trySuccess(Done)
seen + branch
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ import com.rabbitmq.client._
* otherwise undefined
*/
class AmqpProxyConnection(protected val delegate: Connection) extends Connection {
override def getAddress: InetAddress = delegate.getAddress()
override def getAddress: InetAddress = delegate.getAddress

override def getPort: Int = delegate.getPort()
override def getPort: Int = delegate.getPort

override def getChannelMax: Int = delegate.getChannelMax()
override def getChannelMax: Int = delegate.getChannelMax

override def getFrameMax: Int = delegate.getFrameMax()
override def getFrameMax: Int = delegate.getFrameMax

override def getHeartbeat: Int = delegate.getHeartbeat()
override def getHeartbeat: Int = delegate.getHeartbeat

override def getClientProperties: util.Map[String, AnyRef] = delegate.getClientProperties()
override def getClientProperties: util.Map[String, AnyRef] = delegate.getClientProperties

override def getClientProvidedName: String = delegate.getClientProvidedName()
override def getClientProvidedName: String = delegate.getClientProvidedName

override def getServerProperties: util.Map[String, AnyRef] = delegate.getServerProperties()
override def getServerProperties: util.Map[String, AnyRef] = delegate.getServerProperties

override def createChannel(): Channel = delegate.createChannel()

Expand Down Expand Up @@ -76,9 +76,9 @@ class AmqpProxyConnection(protected val delegate: Connection) extends Connection

override def clearBlockedListeners(): Unit = delegate.clearBlockedListeners()

override def getExceptionHandler: ExceptionHandler = delegate.getExceptionHandler()
override def getExceptionHandler: ExceptionHandler = delegate.getExceptionHandler

override def getId: String = delegate.getId()
override def getId: String = delegate.getId

override def setId(s: String): Unit = delegate.setId(s)

Expand All @@ -88,9 +88,9 @@ class AmqpProxyConnection(protected val delegate: Connection) extends Connection
override def removeShutdownListener(shutdownListener: ShutdownListener): Unit =
delegate.removeShutdownListener(shutdownListener)

override def getCloseReason: ShutdownSignalException = delegate.getCloseReason()
override def getCloseReason: ShutdownSignalException = delegate.getCloseReason

override def notifyListeners(): Unit = delegate.notifyListeners()

override def isOpen: Boolean = delegate.isOpen()
override def isOpen: Boolean = delegate.isOpen
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ class AmqpConnectorsSpec extends AmqpSpec {
.take(input.size)
.runWith(Sink.seq)

result.futureValue.map(cm => {
result.futureValue.map { cm =>
noException should be thrownBy cm.ack().futureValue
})
}
}

"not republish message without autoAck(false) if nack is sent" in assertAllStagesStopped {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class AmqpGraphStageLogicConnectionShutdownSpec
val shutdownsAdded = new AtomicInteger()
val shutdownsRemoved = new AtomicInteger()

override def beforeEach() = {
override def beforeEach(): Unit = {
shutdownsAdded.set(0)
shutdownsRemoved.set(0)
}
Expand Down
Loading