Skip to content

Commit 9923520

Browse files
committedNov 1, 2016
wip - refactoring of RemoteWebcamWindow
1 parent 1468282 commit 9923520

6 files changed

+149
-16
lines changed
 

‎build.sbt

+3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ libraryDependencies ++= Seq(
88
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
99
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
1010
"com.typesafe.akka" %% "akka-stream-contrib" % "0.4",
11+
//"com.typesafe.akka" %% "akka-http-core" % akkaVersion,
12+
"com.typesafe.akka" %% "akka-http-core" % "2.4.9",
13+
"com.typesafe.akka" %% "akka-http-experimental" % "2.4.9",
1114
"org.typelevel" %% "cats" % "0.8.0",
1215
// mapping
1316
"com.typesafe.play" %% "play-json" % "2.5.8",

‎src/main/scala/fr/xebia/streams/WebcamWindow.scala ‎src/main/scala/fr/xebia/streams/LocalWebcamWindow.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import fr.xebia.streams.transform.{ Flip, MediaConversion }
77
import fr.xebia.streams.video.{ Dimensions, Webcam }
88
import org.bytedeco.javacv.CanvasFrame
99

10-
object WebcamWindow extends App {
10+
object LocalWebcamWindow extends App {
1111

1212
implicit val system = ActorSystem()
1313
implicit val materializer = ActorMaterializer()
@@ -17,9 +17,9 @@ object WebcamWindow extends App {
1717
canvas.setDefaultCloseOperation(javax.swing.JFrame.EXIT_ON_CLOSE)
1818

1919
val imageDimensions = Dimensions(width = 640, height = 480)
20-
val webcamSource = Webcam.local(deviceId = 0, dimensions = imageDimensions)
20+
val localCameraSource = Webcam.local(deviceId = 0, dimensions = imageDimensions)
2121

22-
val graph = webcamSource
22+
val graph = localCameraSource
2323
.map(MediaConversion.toMat) // most OpenCV manipulations require a Matrix
2424
.map(Flip.horizontal)
2525
.map(MediaConversion.toFrame) // convert back to a frame
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package fr.xebia.streams
2+
3+
import akka.actor.ActorSystem
4+
import akka.stream.ActorMaterializer
5+
import akka.stream.scaladsl.{ Sink, Source }
6+
import akka.util.ByteString
7+
import fr.xebia.streams.transform.MediaConversion
8+
import fr.xebia.streams.video.Webcam
9+
import org.bytedeco.javacpp.opencv_core
10+
import org.bytedeco.javacpp.opencv_core.Mat
11+
import org.bytedeco.javacv.CanvasFrame
12+
import org.slf4j.LoggerFactory
13+
14+
import scala.concurrent.ExecutionContext
15+
16+
object RemoteWebcamWindow extends App {
17+
18+
val logger = LoggerFactory.getLogger(getClass)
19+
20+
implicit val system = ActorSystem()
21+
implicit val materializer = ActorMaterializer()
22+
23+
val canvas = new CanvasFrame("Webcam")
24+
canvas.setDefaultCloseOperation(javax.swing.JFrame.EXIT_ON_CLOSE)
25+
26+
implicit val ec = system.dispatcher
27+
28+
val remoteCameraSource = Webcam.remote("192.168.0.17")
29+
30+
val graph = remoteCameraSource
31+
.map(handleSource)
32+
.map(_.map(MediaConversion.toFrame))
33+
.map(_.map(canvas.showImage))
34+
.map(_.to(Sink.ignore))
35+
36+
graph.map(_.run())
37+
38+
def handleSource(source: Source[ByteString, Any])(implicit ec: ExecutionContext) = {
39+
source
40+
.map { bytes => logger.info(bytes.toString()); bytes }
41+
.map(_.toArray)
42+
.map { bytes =>
43+
val mat = new Mat(512, 288, opencv_core.CV_8SC3)
44+
mat.data().put(bytes: _*)
45+
mat
46+
}
47+
}
48+
49+
}

‎src/main/scala/fr/xebia/streams/video/LocalCamFramePublisher.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ private[video] class LocalCamFramePublisher(
2121
private implicit val ec = context.dispatcher
2222

2323
// Lazy so that nothing happens until the flow begins
24-
private lazy val grabber = buildGrabber(
24+
private lazy val grabber: FrameGrabber = buildGrabber(
2525
deviceId = deviceId,
2626
imageWidth = imageWidth,
2727
imageHeight = imageHeight,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package fr.xebia.streams.video
2+
3+
import akka.actor.{ ActorLogging, DeadLetterSuppression, Props }
4+
import akka.stream.actor.ActorPublisher
5+
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }
6+
import akka.stream.scaladsl.Tcp
7+
import fr.xebia.streams.video.RemoteCamFramePublisher.{ Continue, buildGrabber }
8+
import org.bytedeco.javacv.Frame
9+
10+
private[video] class RemoteCamFramePublisher(url: String)
11+
extends ActorPublisher[Frame] with ActorLogging {
12+
13+
private implicit val ec = context.dispatcher
14+
15+
private lazy val grabber = buildGrabber(url)
16+
17+
def receive: Receive = {
18+
case _: Request => emitFrames()
19+
case Continue => emitFrames()
20+
case Cancel => onCompleteThenStop()
21+
case unexpectedMsg => log.warning(s"Unexpected message: $unexpectedMsg")
22+
}
23+
24+
private def emitFrames(): Unit = {
25+
if (isActive && totalDemand > 0) {
26+
/*
27+
Grabbing a frame is a blocking I/O operation, so we don't send too many at once.
28+
*/
29+
grabFrame().foreach(onNext)
30+
if (totalDemand > 0) {
31+
self ! Continue
32+
}
33+
}
34+
}
35+
36+
private def grabFrame(): Option[Frame] = {
37+
Option(grabber.grab())
38+
}
39+
40+
}
41+
42+
object RemoteCamFramePublisher {
43+
44+
def props(url: String): Props = Props(new RemoteCamFramePublisher(url))
45+
46+
def buildGrabber(url: String) = RemoteFrameGrabber(url)
47+
48+
private case object Continue extends DeadLetterSuppression
49+
50+
case class RemoteFrameGrabber(url: String) {
51+
def grab(): Frame = ???
52+
}
53+
54+
//Tcp().bindAndHandle()
55+
56+
}

‎src/main/scala/fr/xebia/streams/video/Webcam.scala

+37-12
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,61 @@
11
package fr.xebia.streams.video
22

33
import akka.NotUsed
4-
import akka.actor.{ActorSystem, Props}
4+
import akka.actor.{ ActorSystem, Props }
5+
import akka.http.scaladsl.Http
6+
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
7+
import akka.stream.Materializer
58
import akka.stream.actor.ActorPublisher
6-
import akka.stream.scaladsl.Source
9+
import akka.stream.scaladsl.{ Framing, Source }
10+
import akka.util.{ ByteString, Timeout }
11+
import fr.xebia.streams.RemoteWebcamWindow._
712
import org.bytedeco.javacpp.opencv_core._
813
import org.bytedeco.javacv.Frame
914
import org.bytedeco.javacv.FrameGrabber.ImageMode
15+
import org.slf4j.LoggerFactory
16+
17+
import scala.concurrent.Future
1018

1119
object Webcam {
1220

1321
object local {
1422

1523
def apply(
16-
deviceId: Int,
17-
dimensions: Dimensions,
18-
bitsPerPixel: Int = CV_8U,
19-
imageMode: ImageMode = ImageMode.COLOR
20-
)(implicit system: ActorSystem): Source[Frame, NotUsed] = {
24+
deviceId: Int,
25+
dimensions: Dimensions,
26+
bitsPerPixel: Int = CV_8U,
27+
imageMode: ImageMode = ImageMode.COLOR
28+
)(implicit system: ActorSystem): Source[Frame, NotUsed] = {
2129
val props: Props = LocalCamFramePublisher.props(deviceId, dimensions.width, dimensions.height, bitsPerPixel, imageMode)
2230
val webcamActorRef = system.actorOf(props)
23-
val webcamActorPublisher = ActorPublisher[Frame](webcamActorRef)
31+
val localActorPublisher = ActorPublisher[Frame](webcamActorRef)
2432

25-
Source.fromPublisher(webcamActorPublisher)
33+
Source.fromPublisher(localActorPublisher)
2634
}
2735
}
2836

2937
object remote {
3038

31-
def apply(host: String, port: String)
32-
(implicit system: ActorSystem): Source[Frame, NotUsed] = {
33-
Source.fromPublisher(???)
39+
import scala.concurrent.duration._
40+
val logger = LoggerFactory.getLogger(getClass)
41+
42+
implicit val timeout = Timeout(5.seconds)
43+
44+
val beginOfFrame = ByteString(0xff, 0xd8)
45+
46+
val endOfFrame = ByteString(0xff, 0xd9)
47+
48+
//http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#chunking-up-a-stream-of-bytestrings-into-limited-size-bytestrings
49+
def apply(host: String)(implicit system: ActorSystem, mat: Materializer): Future[Source[ByteString, Any]] = {
50+
implicit val ec = system.dispatcher
51+
val httpRequest = HttpRequest(uri = "/html/cam_pic_new.php")
52+
53+
Http()
54+
.singleRequest(httpRequest)
55+
.map { response => logger.info(response.toString()); response }
56+
.map(_.entity.dataBytes)
57+
.map(_.via(Framing.delimiter(endOfFrame, maximumFrameLength = 100, allowTruncation = true)))
58+
.map(_.map(_.dropWhile(_ != beginOfFrame)))
3459
}
3560

3661
}

0 commit comments

Comments
 (0)
Please sign in to comment.