Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
language : scala

scala:
- 2.11.12
- 2.12.6
- 2.12.11
- 2.13.0

dist: trusty

Expand Down
42 changes: 28 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ lazy val contributors = Seq(

lazy val commonSettings = Seq(
organization := "com.spinoco",
scalaVersion := "2.12.6",
crossScalaVersions := Seq("2.11.12", "2.12.6"),
scalaVersion := "2.12.11",
crossScalaVersions := Seq("2.12.11", "2.13.0"),
scalacOptions ++= Seq(
"-feature",
"-deprecation",
Expand All @@ -19,23 +19,37 @@ lazy val commonSettings = Seq(
"-language:existentials",
"-language:postfixOps",
"-Xfatal-warnings",
"-Yno-adapted-args",
"-Ywarn-value-discard",
"-Ywarn-unused-import"
"-Ywarn-value-discard"
),
scalacOptions ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, v)) if v >= 13 =>
Seq("-Ymacro-annotations", "-Ywarn-unused:imports")
case _ =>
Seq("-Yno-adapted-args", "-Ywarn-unused-import")
}
},
scalacOptions in (Compile, console) ~= {_.filterNot("-Ywarn-unused-import" == _)},
scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value,
libraryDependencies ++= Seq(
compilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full)
, "org.scodec" %% "scodec-bits" % "1.1.4"
, "org.scodec" %% "scodec-core" % "1.10.3"
, "com.spinoco" %% "protocol-http" % "0.3.15"
, "com.spinoco" %% "protocol-websocket" % "0.3.15"
, "co.fs2" %% "fs2-core" % "1.0.0"
, "co.fs2" %% "fs2-io" % "1.0.0"
, "com.spinoco" %% "fs2-crypto" % "0.4.0"
, "org.scalacheck" %% "scalacheck" % "1.13.4" % "test"
"org.scalacheck" %% "scalacheck" % "1.14.3" % "test"
, "com.spinoco" %% "protocol-http" % "0.4.0-M1"
, "com.spinoco" %% "protocol-websocket" % "0.4.0-M1"
, "co.fs2" %% "fs2-core" % "2.3.0"
, "co.fs2" %% "fs2-io" % "2.3.0"
),
libraryDependencies ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, v)) if v <= 12 =>
Seq(
compilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full)
)
case _ =>
// if scala 2.13.0-M4 or later, macro annotations merged into scala-reflect
// https://github.com/scala/scala/pull/6606
Nil
}
},
scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "[email protected]:Spinoco/fs2-http.git")),
homepage := None,
licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
Expand Down
54 changes: 25 additions & 29 deletions src/main/scala/spinoco/fs2/http/HttpClient.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package spinoco.fs2.http

import java.nio.channels.AsynchronousChannelGroup

import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer}
import java.util.concurrent.TimeUnit

import cats.Applicative
import javax.net.ssl.SSLContext
import cats.effect._
import fs2._
import fs2.concurrent.SignallingRef
import fs2.io.tcp.Socket
import fs2.io.tcp.{Socket, SocketGroup}
import fs2.io.tls.TLSContext
import scodec.{Codec, Decoder, Encoder}
import spinoco.fs2.http.internal.{addressForRequest, clientLiftToSecure, readWithTimeout}
import spinoco.fs2.http.sse.{SSEDecoder, SSEEncoding}
Expand All @@ -17,7 +18,6 @@ import spinoco.protocol.http.header._
import spinoco.protocol.mime.MediaType
import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._


Expand Down Expand Up @@ -101,25 +101,24 @@ trait HttpClient[F[_]] {
}


object HttpClient {
object HttpClient {

@inline def apply[F[_]](implicit instance: HttpClient[F]): HttpClient[F] = instance

/**
* Creates an Http Client
* @param requestCodec Codec used to decode request header
* @param responseCodec Codec used to encode response header
* @param sslExecutionContext Strategy used when communication with SSL (https or wss)
* @param sslContext SSL Context to use with SSL Client (https, wss)
*/
def apply[F[_] : ConcurrentEffect : ContextShift : Timer](
/**
* Creates an Http Client
* @param requestCodec Codec used to decode request header
* @param responseCodec Codec used to encode response header
* @param socketGroup Group of sockets from which to create the client for http request.
* @param tlsContext The TLS context used for elevating the http socket to https.
*/
def mk[F[_]: ConcurrentEffect: ContextShift: Timer](
requestCodec : Codec[HttpRequestHeader]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need still the timer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is required for the websocket client, as we need to do pings there.

, responseCodec : Codec[HttpResponseHeader]
, sslExecutionContext: => ExecutionContext
, sslContext : => SSLContext
)(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = Sync[F].delay {
lazy val sslCtx = sslContext
lazy val sslS = sslExecutionContext

)(
socketGroup: SocketGroup
, tlsContext: TLSContext
):F[HttpClient[F]] = Applicative[F].pure {
new HttpClient[F] {
def request(
request: HttpRequest[F]
Expand All @@ -128,10 +127,10 @@ trait HttpClient[F[_]] {
, timeout: Duration
): Stream[F, HttpResponse[F]] = {
Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address =>
Stream.resource(io.tcp.client[F](address))
.evalMap { socket =>
if (!request.isSecure) Applicative[F].pure(socket)
else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host)
Stream.resource(socketGroup.client(address))
.flatMap { socket =>
if (!request.isSecure) Stream.emit(socket)
else Stream.resource(clientLiftToSecure[F](tlsContext)(socket, request.host))
}
.flatMap { impl.request[F](request, chunkSize, maxResponseHeaderSize, timeout, requestCodec, responseCodec ) }}
}
Expand All @@ -143,7 +142,7 @@ trait HttpClient[F[_]] {
, chunkSize: Int
, maxFrameSize: Int
): Stream[F, Option[HttpResponseHeader]] =
WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx)
WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec)(socketGroup, tlsContext)


def sse[A : SSEDecoder](rq: HttpRequest[F], maxResponseHeaderSize: Int, chunkSize: Int): Stream[F, A] =
Expand Down Expand Up @@ -174,7 +173,7 @@ trait HttpClient[F[_]] {
timeout match {
case fin: FiniteDuration =>
eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { start =>
HttpRequest.toStream(request, requestCodec).to(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ =>
HttpRequest.toStream(request, requestCodec).through(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ =>
eval(SignallingRef[F, Boolean](true)).flatMap { timeoutSignal =>
eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { sent =>
val remains = fin - (sent - start).millis
Expand All @@ -186,14 +185,11 @@ trait HttpClient[F[_]] {
}}}}

case _ =>
HttpRequest.toStream(request, requestCodec).to(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ =>
HttpRequest.toStream(request, requestCodec).through(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ =>
socket.reads(chunkSize, None) through HttpResponse.fromStream[F](maxResponseHeaderSize, responseCodec)
}
}
}

}


}

35 changes: 18 additions & 17 deletions src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>

/** yields to true, if body of this request shall be chunked **/
lazy val bodyIsChunked : Boolean =
withHeaders(internal.bodyIsChunked)
withHeaders(spinoco.fs2.http.internal.bodyIsChunked)

/** allows to stream arbitrary sized stream of `A` to remote party (i.e. upload) **/
def withStreamBody[A](body: Stream[F, A])(implicit E: StreamBodyEncoder[F, A]): Self = {
Expand All @@ -37,7 +37,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>

/** sets body size to supplied value **/
def withBodySize(sz: Long): Self =
updateHeaders(withHeaders(internal.swapHeader(`Content-Length`(sz))))
updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Length`(sz))))

/** gets body size, if one specified **/
def bodySize: Option[Long] =
Expand All @@ -46,6 +46,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>
protected def body: Stream[F, Byte]

/** encodes body `A` given BodyEncoder exists **/

def withBody[A](a: A)(implicit W: BodyEncoder[A], ev: RaiseThrowable[F]): Self = {
W.encode(a) match {
case Failure(err) => updateBody(body = Stream.raiseError(new Throwable(s"failed to encode $a: $err")))
Expand All @@ -70,7 +71,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>
withHeaders { _.collectFirst { case `Content-Type`(ct) => ct } match {
case None => F.pure(Attempt.failure(Err("Content type is not known")))
case Some(ct) =>
F.map(self.body.chunks.map(util.chunk2ByteVector).compile.toVector) { bs =>
F.map(self.body.chunks.map(_.toByteVector).compile.toVector) { bs =>
if (bs.isEmpty) Attempt.failure(Err("Body is empty"))
else D.decode(bs.reduce(_ ++ _), ct)
}
Expand All @@ -79,15 +80,15 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>

/** gets body as stream of byteVectors **/
def bodyAsByteVectorStream:Stream[F,ByteVector] =
self.body.chunks.map(util.chunk2ByteVector)
self.body.chunks.map(_.toByteVector)

/** decodes body as string with encoding supplied in ContentType **/
def bodyAsString(implicit F: Sync[F]): F[Attempt[String]] =
bodyAs[String](BodyDecoder.stringDecoder, F)

/** updates content type to one specified **/
def withContentType(ct: ContentType): Self =
updateHeaders(withHeaders(internal.swapHeader(`Content-Type`(ct))))
updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Type`(ct))))

/** gets ContentType, if one specififed **/
def contentType: Option[ContentType] =
Expand All @@ -96,7 +97,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>

/** configures encoding as chunked **/
def chunkedEncoding: Self =
updateHeaders(withHeaders(internal.swapHeader(`Transfer-Encoding`(List("chunked")))))
updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Transfer-Encoding`(List("chunked")))))

def withHeaders[A](f: List[HttpHeader] => A): A = self match {
case HttpRequest(_,_,header,_) => f(header.headers)
Expand Down Expand Up @@ -190,10 +191,10 @@ object HttpRequest {
)
, body = Stream.empty)

def post[F[_] : RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] =
def post[F[_]: RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] =
get(uri).withMethod(HttpMethod.POST).withBody(a)

def put[F[_] : RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] =
def put[F[_]: RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] =
get(uri).withMethod(HttpMethod.PUT).withBody(a)

def delete[F[_]](uri: Uri): HttpRequest[F] =
Expand All @@ -210,11 +211,11 @@ object HttpRequest {
* @tparam F
* @return
*/
def fromStream[F[_] : RaiseThrowable](
def fromStream[F[_]: RaiseThrowable](
maxHeaderSize: Int
, headerCodec: Codec[HttpRequestHeader]
): Pipe[F, Byte, (HttpRequestHeader, Stream[F, Byte])] = {
import internal._
import spinoco.fs2.http.internal._
_ through httpHeaderAndBody(maxHeaderSize) flatMap { case (header, bodyRaw) =>
headerCodec.decodeValue(header.bits) match {
case Failure(err) => Stream.raiseError(new Throwable(s"Decoding of the request header failed: $err"))
Expand All @@ -240,11 +241,11 @@ object HttpRequest {
* @param request request to convert to stream
* @param headerCodec Codec to convert the header to bytes
*/
def toStream[F[_] : RaiseThrowable](
def toStream[F[_]: RaiseThrowable](
request: HttpRequest[F]
, headerCodec: Codec[HttpRequestHeader]
): Stream[F, Byte] = Stream.suspend {
import internal._
import spinoco.fs2.http.internal._

headerCodec.encode(request.header) match {
case Failure(err) => Stream.raiseError(new Throwable(s"Encoding of the header failed: $err"))
Expand Down Expand Up @@ -283,7 +284,7 @@ final case class HttpResponse[F[_]](
def sseBody[A](in: Stream[F, A])(implicit E: SSEEncoder[A], ev: RaiseThrowable[F]): Self =
self
.updateBody(in through SSEEncoding.encodeA[F, A])
.updateHeaders(withHeaders(internal.swapHeader(`Content-Type`(ContentType.TextContent(MediaType.`text/event-stream`, None)))))
.updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Type`(ContentType.TextContent(MediaType.`text/event-stream`, None)))))
}


Expand All @@ -301,11 +302,11 @@ object HttpResponse {
/**
* Decodes stream of bytes as HttpResponse.
*/
def fromStream[F[_] : RaiseThrowable](
def fromStream[F[_]: RaiseThrowable](
maxHeaderSize: Int
, responseCodec: Codec[HttpResponseHeader]
): Pipe[F,Byte, HttpResponse[F]] = {
import internal._
import spinoco.fs2.http.internal._

_ through httpHeaderAndBody(maxHeaderSize) flatMap { case (header, bodyRaw) =>
responseCodec.decodeValue(header.bits) match {
Expand All @@ -325,11 +326,11 @@ object HttpResponse {


/** Encodes response to stream of bytes **/
def toStream[F[_] : RaiseThrowable](
def toStream[F[_]: RaiseThrowable](
response: HttpResponse[F]
, headerCodec: Codec[HttpResponseHeader]
): Stream[F, Byte] = Stream.suspend {
import internal._
import spinoco.fs2.http.internal._

headerCodec.encode(response.header) match {
case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode http response : $response :$err "))
Expand Down
17 changes: 8 additions & 9 deletions src/main/scala/spinoco/fs2/http/HttpServer.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package spinoco.fs2.http

import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup

import cats.effect.{ConcurrentEffect, Sync, Timer}
import cats.effect.{ConcurrentEffect, ContextShift, Sync}
import cats.syntax.all._
import fs2._
import fs2.concurrent.SignallingRef
import fs2.io.tcp.SocketGroup
import scodec.Codec
import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec}
import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader, HttpStatusCode}
Expand Down Expand Up @@ -34,9 +34,9 @@ object HttpServer {
* This is also evaluated when the server failed to process the request itself (i.e. `service` did not handle the failure )
* @param sendFailure A function to be evaluated on failure to process the the response.
* Request is not suplied if failure happened before request was constructed.
*
* @param socketGroup Group of sockets from which to create the server socket.
*/
def apply[F[_] : ConcurrentEffect : Timer](
def mk[F[_]: ConcurrentEffect: ContextShift](
maxConcurrent: Int = Int.MaxValue
, receiveBufferSize: Int = 256 * 1024
, maxHeaderSize: Int = 10 *1024
Expand All @@ -48,19 +48,18 @@ object HttpServer {
, requestFailure : Throwable => Stream[F, HttpResponse[F]]
, sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing]
)(
implicit
AG: AsynchronousChannelGroup
socketGroup: SocketGroup
): Stream[F, Unit] = {
import Stream._
import internal._
import spinoco.fs2.http.internal._
val (initial, readDuration) = requestHeaderReceiveTimeout match {
case fin: FiniteDuration => (true, fin)
case _ => (false, 0.millis)
}

io.tcp.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource =>
socketGroup.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource =>
Stream.resource(resource).flatMap { socket =>
eval(SignallingRef(initial)).flatMap { timeoutSignal =>
eval(SignallingRef[F, Boolean](initial)).flatMap { timeoutSignal =>
readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize)
.through(HttpRequest.fromStream(maxHeaderSize, requestCodec))
.flatMap { case (request, body) =>
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object StreamBodyEncoder {
StreamBodyEncoder.instance(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) { _.flatMap { bv => Stream.chunk(ByteVectorChunk(bv)) } }

/** encoder that encodes utf8 string, with `text/plain` utf8 content type **/
def utf8StringEncoder[F[_]](implicit F: MonadError[F, Throwable]) : StreamBodyEncoder[F, String] =
def utf8StringEncoder[F[_]: RaiseThrowable](implicit F: MonadError[F, Throwable]) : StreamBodyEncoder[F, String] =
byteVectorEncoder mapInF[String] { s =>
ByteVector.encodeUtf8(s) match {
case Right(bv) => F.pure(bv)
Expand All @@ -57,7 +57,7 @@ object StreamBodyEncoder {
} withContentType ContentType.TextContent(MediaType.`text/plain`, Some(MIMECharset.`UTF-8`))

/** a convenience wrapper to convert body encoder to StreamBodyEncoder **/
def fromBodyEncoder[F[_] : RaiseThrowable, A](implicit E: BodyEncoder[A]):StreamBodyEncoder[F, A] =
def fromBodyEncoder[F[_]: RaiseThrowable, A](implicit E: BodyEncoder[A]):StreamBodyEncoder[F, A] =
StreamBodyEncoder.instance(E.contentType) { _.flatMap { a =>
E.encode(a) match {
case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode: $err ($a)"))
Expand Down
Loading