diff --git a/src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala b/src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala index 4146ee7..323674e 100644 --- a/src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala +++ b/src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala @@ -6,7 +6,7 @@ import java.nio.charset.{Charset, StandardCharsets} import fs2._ import fs2.async.mutable.Semaphore import fs2.io.tcp.Socket -import fs2.util.{Async, Catchable, Effect, Monad} +import fs2.util._ import fs2.util.syntax._ import scodec.{Attempt, Codec} import scodec.bits.{BitVector, ByteVector} @@ -109,7 +109,10 @@ trait IMAPClient[F[_]] { def bodyStructureOf(uid: Long @@ MailUID): F[IMAPResult[Seq[EmailBodyPart]]] /** - * Allows to fetch bytes of given mime binary part + * Allows to fetch bytes of given mime binary part. + * + * All other commands will fail while this command is being processed. As to prevent deadlock. + * * @param uid Id of message * @param part Binary part specification. The data in binary part specification will be used * to parse conent to stream of bytes. @@ -118,7 +121,10 @@ trait IMAPClient[F[_]] { def bytesOf(uid: Long @@ MailUID, part: EmailBodyPart.BinaryPart): Stream[F, Byte] /** - * Allows to fetch textual representation of given mime part + * Allows to fetch textual representation of given mime part. + * + * All other commands will fail while this command is being processed. As to prevent deadlock. + * * @param uid Id of message * @param part Textual part specification. The data in specification will be used to decode * text to resulting stream of strings. @@ -126,6 +132,18 @@ trait IMAPClient[F[_]] { */ def textOf(uid: Long @@ MailUID, part: EmailBodyPart.TextPart): Stream[F, Char] + /** + * Causes to perform Idle command against the server as per RFC 2177. + * + * All other commands will fail while this command is being processed. As to prevent deadlock. + * + * The idle command should be restarted atleast every 29 minutes to prevent the remote server + * logging out the user due to inactivity. In reality the idle should be restarted more often + * as real servers have much shorter inactivity timeouts. + * + */ + def idle: Stream[F, IMAPIdleContext[F]] + } /** @@ -153,6 +171,7 @@ object IMAPClient { Stream.eval(Async.refOf[F, Long](0l)) flatMap { idxRef => Stream.eval(async.semaphore(0)) flatMap { requestSemaphore => Stream.eval(async.boundedQueue[F, IMAPData](bufferLines)) flatMap { incomingQ => + Stream.eval(F.refOf(false)) flatMap { inProcessing => val received = ( @@ -169,50 +188,84 @@ object IMAPClient { val request = requestCmd(idxRef, requestSemaphore, incomingQ.dequeue, send) _ + /** + * Guards a request to the server by checking whether there is currently connection blocking command running. + * If such command is running it is impossible to interrupt it. + * As such this request would wait on the command to finish. If this command was called as result of some partial + * data from the "blocking" command then we would get into dead lock. + * Thus we rather fail. + * + * Do note that the connection blocking commands are only that return stream. + */ + def guardedRequest(cmd: IMAPCommand): RequestResult[F] = { + Stream.eval(inProcessing.get).flatMap{ + case false => request(cmd) + case true => Stream.fail(new Throwable("Cannot perform action on IMAP client since we are in midst of another IMAP streaming")) + } + } + val client = new IMAPClient[F] { def login(userName: String, password: String) = - shortContent(request(LoginPlainText(userName, password)))(parseLogin[F]) + shortContent(guardedRequest(LoginPlainText(userName, password)))(parseLogin[F]) def logout = - shortContent(request(Logout)) { _ => F.pure(()) } as (()) + shortContent(guardedRequest(Logout)) { _ => F.pure(()) } as (()) def capability = - shortContent(request(Capability))(parseCapability[F]) + shortContent(guardedRequest(Capability))(parseCapability[F]) def select(mailbox: @@[String, MailboxName]) = - shortContent(request(Select(mailbox)))(parseSelect[F]) + shortContent(guardedRequest(Select(mailbox)))(parseSelect[F]) def examine(mailbox: @@[String, MailboxName]) = - shortContent(request(Examine(mailbox)))(parseSelect[F]) + shortContent(guardedRequest(Examine(mailbox)))(parseSelect[F]) def list(reference: String, wildcardName: String): F[IMAPResult[Seq[IMAPMailbox]]] = - shortContent(request(ListMailbox(reference, wildcardName)))(parseMailboxList[F]) + shortContent(guardedRequest(ListMailbox(reference, wildcardName)))(parseMailboxList[F]) def search(term: IMAPSearchTerm, charset: Option[String] = None): F[IMAPResult[Seq[Long @@ MailUID]]] = - shortContent(request(Search(charset, term)))(parseSearchResult[F]) + shortContent(guardedRequest(Search(charset, term)))(parseSearchResult[F]) def emailHeaders(range: NumericRange[Long]): F[Vector[IMAPEmailHeader]] = - rawContent(request(Fetch(range, Seq(IMAPFetchContent.UID, IMAPFetchContent.Body(BodySection.HEADER))))) + rawContent(guardedRequest(Fetch(range, Seq(IMAPFetchContent.UID, IMAPFetchContent.Body(BodySection.HEADER))))) .through(fetchLog) .through(mkEmailHeader(emailHeaderCodec)) .runFold(Vector.empty[IMAPEmailHeader])(_ :+ _) def bodyStructureOf(uid: @@[Long, MailUID]): F[IMAPResult[Seq[EmailBodyPart]]] = - shortContent(request(Fetch(NumericRange(uid:Long, uid:Long, 1), Seq(IMAPFetchContent.BODYSTRUCTURE))))(parseBodyStructure[F]) + shortContent(guardedRequest(Fetch(NumericRange(uid:Long, uid:Long, 1), Seq(IMAPFetchContent.BODYSTRUCTURE))))(parseBodyStructure[F]) def bytesOf(uid: @@[Long, MailUID], part: EmailBodyPart.BinaryPart): Stream[F, Byte] = { val content = IMAPFetchContent.Body(BodySection(part.partId)) - rawContent(request(Fetch(NumericRange(uid: Long, uid: Long, 1), Seq(content)))) through - fetchBytesOf(0, content.content, part.tpe.fields.encoding) + impl.guardStream( + inProcessing + , rawContent(request(Fetch(NumericRange(uid: Long, uid: Long, 1), Seq(content)))) through + fetchBytesOf(0, content.content, part.tpe.fields.encoding) + ) } def textOf(uid: @@[Long, MailUID], part: EmailBodyPart.TextPart): Stream[F, Char] = { val content = IMAPFetchContent.Body(BodySection(part.partId)) - rawContent(request(Fetch(NumericRange(uid: Long, uid: Long, 1), Seq(content)))) through - fetchTextOf(0, content.content, part.tpe.fields.encoding, part.charsetName) + impl.guardStream( + inProcessing + , rawContent(request(Fetch(NumericRange(uid: Long, uid: Long, 1), Seq(content)))) through + fetchTextOf(0, content.content, part.tpe.fields.encoding, part.charsetName) + ) } -} + + def idle: Stream[F, IMAPIdleContext[F]] = { + impl.guardStream( + inProcessing + , request(Idle).flatMap{ + case Left(err) => Stream.fail(new Throwable(s"Failed to perform the command: $err")) + case Right(data) => Stream.eval(IMAPIdleContext.mk(data, send)) + } + ) + } + + + } concurrent.join(Int.MaxValue)(Stream( Stream.emit(client) @@ -221,13 +274,37 @@ object IMAPClient { )) .interruptWhen(terminated) .onFinalize(terminated.set(true)) - }}}} + }}}}} } object impl { + /** + * Guards a stream stream against a given ref. Which signals whether there is currently blocking connection + * thus our stream cannot be run. + * + * If the stream is allowed to be run, it itself sets the blocking to prevent other stream to be run while this stream + * is executing. + * + * @param runningGuard The guard that tells us whether there is an existing blocking connection. + * @param stream The stream to be run if there is currently no blocking connection + */ + def guardStream[F[_], A]( + runningGuard: Async.Ref[F, Boolean] + , stream: Stream[F, A] + )(implicit F: Applicative[F]): Stream[F, A] = { + Stream.bracket(runningGuard.modify(_ => true))( + change => { + if (change.previous) Stream.fail(new Throwable("Cannot perform action on IMAP client since we are in midst of another IMAP streaming")) + else stream + } + , change => if (change.previous) F.pure(()) else runningGuard.setPure(false) + ) + } + + type RequestResult[F[_]] = Stream[F, Either[String, Stream[F, IMAPData]]] /** @@ -299,9 +376,9 @@ object IMAPClient { tail.takeThrough { case IMAPText(l) => ! l.startsWith(tag) case _ => true - }.dropLastIf { - case IMAPText(l) => l.startsWith(tag) - case _ => false + }.filter { + case IMAPText(l) => !l.startsWith(tag) + case _ => true } onFinalize { requestSemaphore.increment } )) } diff --git a/src/main/scala/spinoco/fs2/mail/imap/IMAPCommand.scala b/src/main/scala/spinoco/fs2/mail/imap/IMAPCommand.scala index 16d13a5..a50adf3 100644 --- a/src/main/scala/spinoco/fs2/mail/imap/IMAPCommand.scala +++ b/src/main/scala/spinoco/fs2/mail/imap/IMAPCommand.scala @@ -39,7 +39,6 @@ object IMAPCommand { } } - final case class Fetch(range: NumericRange[Long], content: Seq[IMAPFetchContent]) extends IMAPCommand { def asIMAPv4: String = { val contentString = s"(${content.map(_.content).mkString(" ")})" @@ -48,6 +47,10 @@ object IMAPCommand { } } + final case object Idle extends IMAPCommand { + def asIMAPv4: String = "IDLE" + } + } diff --git a/src/main/scala/spinoco/fs2/mail/imap/IMAPIdleContext.scala b/src/main/scala/spinoco/fs2/mail/imap/IMAPIdleContext.scala new file mode 100644 index 0000000..2099cb5 --- /dev/null +++ b/src/main/scala/spinoco/fs2/mail/imap/IMAPIdleContext.scala @@ -0,0 +1,91 @@ +package spinoco.fs2.mail.imap + +import fs2.Stream +import fs2.util.Async +import shapeless.tag +import spinoco.fs2.mail.imap.IMAPClient.impl.{IMAPBytes, IMAPData, IMAPText} + +/** + * The context of an idling IMAP server connection that is ready to receive notifications from server. + */ +trait IMAPIdleContext[F[_]] { + + /** + * The IMAP IDLE events that are being produced as result of calling the IDLE command. + * + * This call can be made only once. As this is not backed by a [[fs2.async.mutable.Topic]], thus + * the data of each call of "events" would be different. + * + * Thus this will fail any subsequent calls to events. + */ + def events: Stream[F, IMAPIdleMessage] + + /** + * Closes the IDLE connection with server. + * + * This returns as soon as it writes to the socket. + * To get the end of the idle output await on the events stream, which will + * end as soon as the server stops feeding the data to the client. + */ + def done: F[Unit] + +} + +object IMAPIdleContext { + + /** + * Creates a IDLE context that has events method guarded against multiple calls to it. + * + * @param incoming The data from server as result of IDLE command. + * @param writeToServer A method to write to server. + */ + def mk[F[_]]( + incoming: Stream[F, IMAPData] + , writeToServer: String => F[Unit] + )( + implicit F: Async[F] + ): F[IMAPIdleContext[F]] = { + F.map(F.refOf(false)) { startedRead => + new IMAPIdleContext[F] { + def done: F[Unit] = writeToServer("DONE\r\n") + + def events: Stream[F, IMAPIdleMessage] = { + Stream.eval(startedRead.modify(_ => true)).flatMap { change => + if (change.previous) Stream.fail(new Throwable("Cannot try to subscribe to idle events for the second time")) + else impl.makeIdleEvents(incoming) + } + } + } + + } + } + + + object impl { + + /** + * Parses out the IDLE IMAP events from preprocessed server data. + * + * @param in The data from the server already split by lines. + */ + def makeIdleEvents[F[_]](in: Stream[F, IMAPData]): Stream[F, IMAPIdleMessage] = { + in.flatMap{ + case _: IMAPBytes => Stream.empty //IDLE cannot give bytes, maybe fail here? + case IMAPText(line) => + val trimmed = line.trim + if (trimmed.startsWith("+ idling")) Stream.empty //Notified about now idling, server accepted we can carry on + else if (trimmed.startsWith("*")) { + val dropped = trimmed.drop(1).trim + val splitIdx = dropped.indexOf(" ") + if (splitIdx < 0) Stream.fail(new Throwable(s"Could not find split between email id and command while parsing IMAP IDLE line: $line")) + else { + val (email, messageType) = dropped.splitAt(splitIdx) + val emailUid = tag[MailUID](email.toLong) + Stream.emit(IMAPIdleMessage.parse(messageType.trim, emailUid)) + + } + } else Stream.fail(new Throwable(s"Line starts wih an unknown character while parsing IMAP IDLE line: $line")) + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/spinoco/fs2/mail/imap/IMAPIdleMessage.scala b/src/main/scala/spinoco/fs2/mail/imap/IMAPIdleMessage.scala new file mode 100644 index 0000000..0376c28 --- /dev/null +++ b/src/main/scala/spinoco/fs2/mail/imap/IMAPIdleMessage.scala @@ -0,0 +1,42 @@ +package spinoco.fs2.mail.imap + +import shapeless.tag.@@ + + +object IMAPIdleMessage { + + /** + * Notification about new email with a given id. + * @param uid The id of the new email. + */ + case class Exists(uid: Long @@ MailUID) extends IMAPIdleMessage + + /** + * Notifies a given email being deleted on the server. + * @param uid The id of the email that was deleted. + */ + case class Expunge(uid: Long @@ MailUID) extends IMAPIdleMessage + + /** + * A message with a type that we are not expecting appeared. + * @param uid The id of the email this message concerns. + */ + case class CustomMessage(name: String, uid: Long @@ MailUID) extends IMAPIdleMessage + + /** + * Build the proper message from the provided message type. + * @param messageType The type of the message to be constructed. + * @param emailUid The id of the message which this message concerns. + * @return + */ + def parse(messageType: String, emailUid: Long @@ MailUID): IMAPIdleMessage = { + messageType match { + case "EXISTS" => Exists(emailUid) + case "EXPUNGE" => Expunge(emailUid) + case other => CustomMessage(other, emailUid) + } + } +} + +/** Messages that the server pushes to the client while the client requested IDLE **/ +sealed trait IMAPIdleMessage diff --git a/src/test/scala/spinoco/fs2/mail/imap/IMAPIdleSpec.scala b/src/test/scala/spinoco/fs2/mail/imap/IMAPIdleSpec.scala new file mode 100644 index 0000000..2bea707 --- /dev/null +++ b/src/test/scala/spinoco/fs2/mail/imap/IMAPIdleSpec.scala @@ -0,0 +1,42 @@ +package spinoco.fs2.mail.imap + +import fs2.Task +import fs2.Stream +import org.scalacheck.Properties +import org.scalacheck.Prop._ +import shapeless.tag +import spinoco.fs2.mail.imap.IMAPClient.impl.IMAPText + +object IMAPIdleSpec extends Properties("IMAPIdleSpec"){ + + val encoded = List( + "* 3 EXISTS" + , "+ idling" + , "* 6 EXPUNGE" + , "* 6 OTHER" + ) + + property("decode-commands") = protect { + IMAPIdleContext.impl.makeIdleEvents[Task]( + Stream.emits(encoded.map(IMAPText.apply)) + ).runLog.unsafeRun() ?= Vector( + IMAPIdleMessage.Exists(tag[MailUID](3)) + , IMAPIdleMessage.Expunge(tag[MailUID](6)) + , IMAPIdleMessage.CustomMessage("OTHER", tag[MailUID](6)) + ) + } + + property("decode-commands.fail.spacing") = protect { + IMAPIdleContext.impl.makeIdleEvents[Task]( + Stream.emit(IMAPText("* 3EXISTS")) + ).runLog.attempt.unsafeRun().isLeft + } + + property("decode-commands.fail.start") = protect { + IMAPIdleContext.impl.makeIdleEvents[Task]( + Stream.emit(IMAPText("3 EXISTS")) + ).runLog.attempt.unsafeRun().isLeft + } + + +}