Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import com.typesafe.tools.mima.core._

Global / onChangedBuildSource := ReloadOnSourceChanges

// Workaround for https://github.com/scala-native/scala-native/issues/2024
Global / concurrentRestrictions += Tags.limit(NativeTags.Link, 1)

ThisBuild / tlBaseVersion := "3.13"

ThisBuild / organization := "co.fs2"
Expand Down Expand Up @@ -378,7 +381,8 @@ lazy val root = tlCrossRootProject

lazy val commonNativeSettings = Seq[Setting[?]](
tlVersionIntroduced := List("2.12", "2.13", "3").map(_ -> "3.13.0").toMap,
Test / nativeBrewFormulas += "openssl"
Test / nativeBrewFormulas += "openssl",
Test / nativeConfig ~= { _.withEmbedResources(true) }
)

lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
Expand Down
78 changes: 77 additions & 1 deletion io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
package fs2
package io

import cats.effect.kernel.Sync
import cats.effect.kernel.{Async, Deferred, Outcome, Resource, Sync}
import cats.effect.kernel.implicits._
import cats.syntax.all._
import fs2.io.internal.PipedStreamBuffer

import java.io.{InputStream, OutputStream}
import scala.reflect.ClassTag

private[fs2] trait iojvmnative {
Expand Down Expand Up @@ -56,4 +59,77 @@ private[fs2] trait iojvmnative {
case None => Stream.raiseError(new IOException(s"Resource $name not found"))
}

/** Take a function that emits to a `java.io.OutputStream` effectfully,
* and return a stream which, when run, will perform that function and emit
* the bytes recorded in the OutputStream as an fs2.Stream
*
* The stream produced by this will terminate if:
* - `f` returns
* - `f` calls `OutputStream#close`
*
* If none of those happens, the stream will run forever.
*/
def readOutputStream[F[_]: Async](
chunkSize: Int
)(
f: OutputStream => F[Unit]
): Stream[F, Byte] = {
val mkOutput: Resource[F, (OutputStream, InputStream)] =
Resource.make(Sync[F].delay {
val buf = new PipedStreamBuffer(chunkSize)
(buf.outputStream, buf.inputStream)
})(ois =>
Sync[F].blocking {
// Piped(I/O)Stream implementations cant't throw on close, no need to nest the handling here.
ois._2.close()
ois._1.close()
}
)

Stream.resource(mkOutput).flatMap { case (os, is) =>
Stream.eval(Deferred[F, Option[Throwable]]).flatMap { err =>
// We need to close the output stream regardless of how `f` finishes
// to ensure an outstanding blocking read on the input stream completes.
// In such a case, there's a race between completion of the read
// stream and finalization of the write stream, so we capture the error
// that occurs when writing and rethrow it.
val write = f(os).guaranteeCase((outcome: Outcome[F, Throwable, Unit]) =>
Sync[F].blocking(os.close()) *> err
.complete(outcome match {
case Outcome.Errored(t) => Some(t)
case _ => None
})
.void
)
val read = readInputStream(is.pure[F], chunkSize, closeAfterUse = false)
read.concurrently(Stream.eval(write)) ++ Stream.eval(err.get).flatMap {
case None => Stream.empty
case Some(t) => Stream.raiseError[F](t)
}
}
}
}

/** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`,
* that is closed whenever the resulting stream terminates.
*
* If the `close` of resulting input stream is invoked manually, then this will await until the
* original stream completely terminates.
*
* Because all `InputStream` methods block (including `close`), the resulting `InputStream`
* should be consumed on a different thread pool than the one that is backing the effect.
*
* Note that the implementation is not thread safe -- only one thread is allowed at any time
* to operate on the resulting `java.io.InputStream`.
*/
def toInputStream[F[_]: Async]: Pipe[F, Byte, InputStream] =
source => Stream.resource(toInputStreamResource(source))

/** Like [[toInputStream]] but returns a `Resource` rather than a single element stream.
*/
def toInputStreamResource[F[_]: Async](
source: Stream[F, Byte]
): Resource[F, InputStream] =
JavaInputOutputStream.toInputStream(source)

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import java.util.concurrent.Executors
class IoPlatformSuite extends Fs2Suite {

// This suite runs for a long time, this avoids timeouts in CI.
override def munitIOTimeout: Duration = 2.minutes
override def munitIOTimeout: Duration = 5.minutes

group("readInputStream") {
test("reuses internal buffer on smaller chunks") {
Expand Down Expand Up @@ -275,7 +275,8 @@ class IoPlatformSuite extends Fs2Suite {
bar.assertEquals("bar")
}
test("classloader") {
val size = readClassLoaderResource[IO]("fs2/io/foo", 8192).as(1L).compile.foldMonoid
val resourcePath = if (isNative) "/fs2/io/foo" else "fs2/io/foo"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This difference surprised me. Does the absolute path not work on the JVM?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't investigated, but I think it's a bug in Scala Native, but since it's test code I preferred to add a if here and eventually when gets fixed upstream you'll see a failing test and it's trivial to remove the if.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when gets fixed upstream you'll see a failing test

Maybe! But also it could be that both are valid, in which case there wouldn't be a failed test.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I just checked, and the leading / doesn't work on the JVM. So it's a bug :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just opened a PR on Scala Native to fix the bug: scala-native/scala-native#4901

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #3728

val size = readClassLoaderResource[IO](resourcePath, 8192).as(1L).compile.foldMonoid
size.assertEquals(3L)
}
}
Expand Down
78 changes: 1 addition & 77 deletions io/jvm/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@ package fs2
package io

import cats.Show
import cats.effect.kernel.{Async, Outcome, Resource, Sync}
import cats.effect.kernel.{Async, Sync}
import cats.effect.kernel.implicits._
import cats.effect.kernel.Deferred
import cats.syntax.all._
import fs2.internal.ThreadFactories
import fs2.io.internal.PipedStreamBuffer

import java.io.{InputStream, OutputStream}
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import java.util.concurrent.Executors
Expand Down Expand Up @@ -94,79 +91,6 @@ private[fs2] trait ioplatform extends iojvmnative {
): Pipe[F, O, Nothing] =
_.map(_.show).through(text.encode(charset)).through(stdout)

/** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`,
* that is closed whenever the resulting stream terminates.
*
* If the `close` of resulting input stream is invoked manually, then this will await until the
* original stream completely terminates.
*
* Because all `InputStream` methods block (including `close`), the resulting `InputStream`
* should be consumed on a different thread pool than the one that is backing the effect.
*
* Note that the implementation is not thread safe -- only one thread is allowed at any time
* to operate on the resulting `java.io.InputStream`.
*/
def toInputStream[F[_]: Async]: Pipe[F, Byte, InputStream] =
source => Stream.resource(toInputStreamResource(source))

/** Like [[toInputStream]] but returns a `Resource` rather than a single element stream.
*/
def toInputStreamResource[F[_]: Async](
source: Stream[F, Byte]
): Resource[F, InputStream] =
JavaInputOutputStream.toInputStream(source)

/** Take a function that emits to a `java.io.OutputStream` effectfully,
* and return a stream which, when run, will perform that function and emit
* the bytes recorded in the OutputStream as an fs2.Stream
*
* The stream produced by this will terminate if:
* - `f` returns
* - `f` calls `OutputStream#close`
*
* If none of those happens, the stream will run forever.
*/
def readOutputStream[F[_]: Async](
chunkSize: Int
)(
f: OutputStream => F[Unit]
): Stream[F, Byte] = {
val mkOutput: Resource[F, (OutputStream, InputStream)] =
Resource.make(Sync[F].delay {
val buf = new PipedStreamBuffer(chunkSize)
(buf.outputStream, buf.inputStream)
})(ois =>
Sync[F].blocking {
// Piped(I/O)Stream implementations cant't throw on close, no need to nest the handling here.
ois._2.close()
ois._1.close()
}
)

Stream.resource(mkOutput).flatMap { case (os, is) =>
Stream.eval(Deferred[F, Option[Throwable]]).flatMap { err =>
// We need to close the output stream regardless of how `f` finishes
// to ensure an outstanding blocking read on the input stream completes.
// In such a case, there's a race between completion of the read
// stream and finalization of the write stream, so we capture the error
// that occurs when writing and rethrow it.
val write = f(os).guaranteeCase((outcome: Outcome[F, Throwable, Unit]) =>
Sync[F].blocking(os.close()) *> err
.complete(outcome match {
case Outcome.Errored(t) => Some(t)
case _ => None
})
.void
)
val read = readInputStream(is.pure[F], chunkSize, closeAfterUse = false)
read.concurrently(Stream.eval(write)) ++ Stream.eval(err.get).flatMap {
case None => Stream.empty
case Some(t) => Stream.raiseError[F](t)
}
}
}
}

// Using null instead of Option because null check is faster
private lazy val vtExecutor: ExecutionContext = {
val javaVersion: Int =
Expand Down