diff --git a/build.sbt b/build.sbt index aec1ff0c38..282d769b16 100644 --- a/build.sbt +++ b/build.sbt @@ -2,6 +2,9 @@ import com.typesafe.tools.mima.core._ Global / onChangedBuildSource := ReloadOnSourceChanges +// Workaround for https://github.com/scala-native/scala-native/issues/2024 +Global / concurrentRestrictions += Tags.limit(NativeTags.Link, 1) + ThisBuild / tlBaseVersion := "3.13" ThisBuild / organization := "co.fs2" @@ -378,7 +381,8 @@ lazy val root = tlCrossRootProject lazy val commonNativeSettings = Seq[Setting[?]]( tlVersionIntroduced := List("2.12", "2.13", "3").map(_ -> "3.13.0").toMap, - Test / nativeBrewFormulas += "openssl" + Test / nativeBrewFormulas += "openssl", + Test / nativeConfig ~= { _.withEmbedResources(true) } ) lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) diff --git a/io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala b/io/jvm-native/src/main/scala/fs2/io/JavaInputOutputStream.scala similarity index 100% rename from io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala rename to io/jvm-native/src/main/scala/fs2/io/JavaInputOutputStream.scala diff --git a/io/jvm/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala b/io/jvm-native/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala similarity index 100% rename from io/jvm/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala rename to io/jvm-native/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala diff --git a/io/jvm/src/main/scala/fs2/io/internal/Synchronizer.scala b/io/jvm-native/src/main/scala/fs2/io/internal/Synchronizer.scala similarity index 100% rename from io/jvm/src/main/scala/fs2/io/internal/Synchronizer.scala rename to io/jvm-native/src/main/scala/fs2/io/internal/Synchronizer.scala diff --git a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala index f6ac066201..d5b22a8ab8 100644 --- a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala +++ b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala @@ -22,9 +22,12 @@ package fs2 package io -import cats.effect.kernel.Sync +import cats.effect.kernel.{Async, Deferred, Outcome, Resource, Sync} +import cats.effect.kernel.implicits._ import cats.syntax.all._ +import fs2.io.internal.PipedStreamBuffer +import java.io.{InputStream, OutputStream} import scala.reflect.ClassTag private[fs2] trait iojvmnative { @@ -56,4 +59,77 @@ private[fs2] trait iojvmnative { case None => Stream.raiseError(new IOException(s"Resource $name not found")) } + /** Take a function that emits to a `java.io.OutputStream` effectfully, + * and return a stream which, when run, will perform that function and emit + * the bytes recorded in the OutputStream as an fs2.Stream + * + * The stream produced by this will terminate if: + * - `f` returns + * - `f` calls `OutputStream#close` + * + * If none of those happens, the stream will run forever. + */ + def readOutputStream[F[_]: Async]( + chunkSize: Int + )( + f: OutputStream => F[Unit] + ): Stream[F, Byte] = { + val mkOutput: Resource[F, (OutputStream, InputStream)] = + Resource.make(Sync[F].delay { + val buf = new PipedStreamBuffer(chunkSize) + (buf.outputStream, buf.inputStream) + })(ois => + Sync[F].blocking { + // Piped(I/O)Stream implementations cant't throw on close, no need to nest the handling here. + ois._2.close() + ois._1.close() + } + ) + + Stream.resource(mkOutput).flatMap { case (os, is) => + Stream.eval(Deferred[F, Option[Throwable]]).flatMap { err => + // We need to close the output stream regardless of how `f` finishes + // to ensure an outstanding blocking read on the input stream completes. + // In such a case, there's a race between completion of the read + // stream and finalization of the write stream, so we capture the error + // that occurs when writing and rethrow it. + val write = f(os).guaranteeCase((outcome: Outcome[F, Throwable, Unit]) => + Sync[F].blocking(os.close()) *> err + .complete(outcome match { + case Outcome.Errored(t) => Some(t) + case _ => None + }) + .void + ) + val read = readInputStream(is.pure[F], chunkSize, closeAfterUse = false) + read.concurrently(Stream.eval(write)) ++ Stream.eval(err.get).flatMap { + case None => Stream.empty + case Some(t) => Stream.raiseError[F](t) + } + } + } + } + + /** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`, + * that is closed whenever the resulting stream terminates. + * + * If the `close` of resulting input stream is invoked manually, then this will await until the + * original stream completely terminates. + * + * Because all `InputStream` methods block (including `close`), the resulting `InputStream` + * should be consumed on a different thread pool than the one that is backing the effect. + * + * Note that the implementation is not thread safe -- only one thread is allowed at any time + * to operate on the resulting `java.io.InputStream`. + */ + def toInputStream[F[_]: Async]: Pipe[F, Byte, InputStream] = + source => Stream.resource(toInputStreamResource(source)) + + /** Like [[toInputStream]] but returns a `Resource` rather than a single element stream. + */ + def toInputStreamResource[F[_]: Async]( + source: Stream[F, Byte] + ): Resource[F, InputStream] = + JavaInputOutputStream.toInputStream(source) + } diff --git a/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala b/io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala similarity index 98% rename from io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala rename to io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala index 983832e95d..8213451624 100644 --- a/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala +++ b/io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala @@ -38,7 +38,7 @@ import java.util.concurrent.Executors class IoPlatformSuite extends Fs2Suite { // This suite runs for a long time, this avoids timeouts in CI. - override def munitIOTimeout: Duration = 2.minutes + override def munitIOTimeout: Duration = 5.minutes group("readInputStream") { test("reuses internal buffer on smaller chunks") { @@ -275,7 +275,8 @@ class IoPlatformSuite extends Fs2Suite { bar.assertEquals("bar") } test("classloader") { - val size = readClassLoaderResource[IO]("fs2/io/foo", 8192).as(1L).compile.foldMonoid + val resourcePath = if (isNative) "/fs2/io/foo" else "fs2/io/foo" + val size = readClassLoaderResource[IO](resourcePath, 8192).as(1L).compile.foldMonoid size.assertEquals(3L) } } diff --git a/io/jvm/src/main/scala/fs2/io/ioplatform.scala b/io/jvm/src/main/scala/fs2/io/ioplatform.scala index bcfcb8c961..35c4774264 100644 --- a/io/jvm/src/main/scala/fs2/io/ioplatform.scala +++ b/io/jvm/src/main/scala/fs2/io/ioplatform.scala @@ -23,14 +23,11 @@ package fs2 package io import cats.Show -import cats.effect.kernel.{Async, Outcome, Resource, Sync} +import cats.effect.kernel.{Async, Sync} import cats.effect.kernel.implicits._ -import cats.effect.kernel.Deferred import cats.syntax.all._ import fs2.internal.ThreadFactories -import fs2.io.internal.PipedStreamBuffer -import java.io.{InputStream, OutputStream} import java.nio.charset.Charset import java.nio.charset.StandardCharsets import java.util.concurrent.Executors @@ -94,79 +91,6 @@ private[fs2] trait ioplatform extends iojvmnative { ): Pipe[F, O, Nothing] = _.map(_.show).through(text.encode(charset)).through(stdout) - /** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`, - * that is closed whenever the resulting stream terminates. - * - * If the `close` of resulting input stream is invoked manually, then this will await until the - * original stream completely terminates. - * - * Because all `InputStream` methods block (including `close`), the resulting `InputStream` - * should be consumed on a different thread pool than the one that is backing the effect. - * - * Note that the implementation is not thread safe -- only one thread is allowed at any time - * to operate on the resulting `java.io.InputStream`. - */ - def toInputStream[F[_]: Async]: Pipe[F, Byte, InputStream] = - source => Stream.resource(toInputStreamResource(source)) - - /** Like [[toInputStream]] but returns a `Resource` rather than a single element stream. - */ - def toInputStreamResource[F[_]: Async]( - source: Stream[F, Byte] - ): Resource[F, InputStream] = - JavaInputOutputStream.toInputStream(source) - - /** Take a function that emits to a `java.io.OutputStream` effectfully, - * and return a stream which, when run, will perform that function and emit - * the bytes recorded in the OutputStream as an fs2.Stream - * - * The stream produced by this will terminate if: - * - `f` returns - * - `f` calls `OutputStream#close` - * - * If none of those happens, the stream will run forever. - */ - def readOutputStream[F[_]: Async]( - chunkSize: Int - )( - f: OutputStream => F[Unit] - ): Stream[F, Byte] = { - val mkOutput: Resource[F, (OutputStream, InputStream)] = - Resource.make(Sync[F].delay { - val buf = new PipedStreamBuffer(chunkSize) - (buf.outputStream, buf.inputStream) - })(ois => - Sync[F].blocking { - // Piped(I/O)Stream implementations cant't throw on close, no need to nest the handling here. - ois._2.close() - ois._1.close() - } - ) - - Stream.resource(mkOutput).flatMap { case (os, is) => - Stream.eval(Deferred[F, Option[Throwable]]).flatMap { err => - // We need to close the output stream regardless of how `f` finishes - // to ensure an outstanding blocking read on the input stream completes. - // In such a case, there's a race between completion of the read - // stream and finalization of the write stream, so we capture the error - // that occurs when writing and rethrow it. - val write = f(os).guaranteeCase((outcome: Outcome[F, Throwable, Unit]) => - Sync[F].blocking(os.close()) *> err - .complete(outcome match { - case Outcome.Errored(t) => Some(t) - case _ => None - }) - .void - ) - val read = readInputStream(is.pure[F], chunkSize, closeAfterUse = false) - read.concurrently(Stream.eval(write)) ++ Stream.eval(err.get).flatMap { - case None => Stream.empty - case Some(t) => Stream.raiseError[F](t) - } - } - } - } - // Using null instead of Option because null check is faster private lazy val vtExecutor: ExecutionContext = { val javaVersion: Int =