134 changes: 54 additions & 80 deletions src/core/etl/src/Flow/ETL/DataFrame.php
@@ -4,7 +4,7 @@

namespace Flow\ETL;

use function Flow\ETL\DSL\{analyze, refs, to_output};
use function Flow\ETL\DSL\{refs, to_output};
use Flow\ETL\DataFrame\GroupedDataFrame;
use Flow\ETL\Dataset\Report;
use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException};
@@ -20,19 +20,18 @@
use Flow\ETL\Join\{Expression, Join};
use Flow\ETL\Loader\SchemaValidationLoader;
use Flow\ETL\Loader\StreamLoader\Output;
use Flow\ETL\Pipeline\{BatchingByPipeline,
BatchingPipeline,
CachingPipeline,
CollectingPipeline,
ConstrainedPipeline,
GroupByPipeline,
HashJoinPipeline,
LinkedPipeline,
OffsetPipeline,
PartitioningPipeline,
SortingPipeline,
VoidPipeline,
WindowFunctionPipeline};
use Flow\ETL\Processor\{BatchingByProcessor,
BatchingProcessor,
CachingProcessor,
CollectingProcessor,
ConstrainedProcessor,
GroupByProcessor,
HashJoinProcessor,
OffsetProcessor,
PartitioningProcessor,
SortingProcessor,
VoidProcessor,
WindowProcessor};
use Flow\ETL\Row\{EntryReference, Formatter\ASCIISchemaFormatter, Reference, References};
use Flow\ETL\Schema\{Definition, SchemaFormatter};
use Flow\ETL\Schema\Validator\StrictValidator;
@@ -57,14 +56,13 @@
ScalarFunctionFilterTransformer,
ScalarFunctionTransformer,
SelectEntriesTransformer,
UntilTransformer
};
UntilTransformer};
use Flow\Filesystem\Path\Filter;
use Flow\Types\Type\AutoCaster;

final class DataFrame
{
private FlowContext $context;
private readonly FlowContext $context;

public function __construct(private Pipeline $pipeline, Config|FlowContext $context)
{
@@ -79,7 +77,7 @@ public function aggregate(AggregatingFunction ...$aggregations) : self
$groupBy = new GroupBy();
$groupBy->aggregate(...$aggregations);

$this->pipeline = new LinkedPipeline(new GroupByPipeline($groupBy, $this->pipeline));
$this->pipeline->add(new GroupByProcessor($groupBy));

return $this;
}
@@ -108,7 +106,7 @@ public function autoCast() : self
*/
public function batchBy(string|Reference $column, ?int $minSize = null) : self
{
$this->pipeline = new LinkedPipeline(new BatchingByPipeline($this->pipeline, EntryReference::init($column), $minSize));
$this->pipeline->add(new BatchingByProcessor(EntryReference::init($column), $minSize));

return $this;
}
@@ -132,7 +130,7 @@ public function batchSize(int $size) : self
return $this->collect();
}

$this->pipeline = new LinkedPipeline(new BatchingPipeline($this->pipeline, $size));
$this->pipeline->add(new BatchingProcessor($size));

return $this;
}
@@ -160,11 +158,11 @@ public function cache(?string $id = null, ?int $cacheBatchSize = null) : self
}

if ($cacheBatchSize) {
$this->pipeline = new LinkedPipeline(new CachingPipeline(new BatchingPipeline($this->pipeline, $cacheBatchSize), $id));
} else {
$this->pipeline = new LinkedPipeline(new CachingPipeline($this->pipeline, $id));
$this->pipeline->add(new BatchingProcessor($cacheBatchSize));
}

$this->pipeline->add(new CachingProcessor($id));

return $this;
}

@@ -176,7 +174,7 @@ public function cache(?string $id = null, ?int $cacheBatchSize = null) : self
*/
public function collect() : self
{
$this->pipeline = new LinkedPipeline(new CollectingPipeline($this->pipeline));
$this->pipeline->add(new CollectingProcessor());

return $this;
}
@@ -210,7 +208,7 @@ public function constrain(Constraint $constraint, Constraint ...$constraints) :
{
$constraints = \array_merge([$constraint], $constraints);

$this->pipeline = new LinkedPipeline(new ConstrainedPipeline($this->pipeline, $constraints));
$this->pipeline->add(new ConstrainedProcessor($constraints));

return $this;
}
@@ -221,11 +219,9 @@ public function constrain(Constraint $constraint, Constraint ...$constraints) :
*/
public function count() : int
{
$clone = clone $this;

$total = 0;

foreach ($clone->pipeline->process($clone->context) as $rows) {
foreach ($this->pipeline->process($this->context) as $rows) {
$total += $rows->count();
}

@@ -254,12 +250,11 @@ public function crossJoin(self $dataFrame, string $prefix = '') : self
*/
public function display(int $limit = 20, int|bool $truncate = 20, Formatter $formatter = new AsciiTableFormatter()) : string
{
$clone = clone $this;
$clone->limit($limit);
$this->limit($limit);

$output = '';

foreach ($clone->pipeline->process($clone->context) as $rows) {
foreach ($this->pipeline->process($this->context) as $rows) {
$output .= $formatter->format($rows, $truncate);
}

@@ -327,15 +322,13 @@ public function duplicateRow(mixed $condition, WithEntry ...$entries) : self
*/
public function fetch(?int $limit = null) : Rows
{
$clone = clone $this;

if ($limit !== null) {
$clone->limit($limit);
$this->limit($limit);
}

$rows = new Rows();

foreach ($clone->pipeline->process($clone->context) as $nextRows) {
foreach ($this->pipeline->process($this->context) as $nextRows) {
$rows = $rows->merge($nextRows);
}

@@ -359,7 +352,7 @@ public function filter(ScalarFunction $function) : self
*/
public function filterPartitions(Filter|ScalarFunction $filter) : self
{
$extractor = $this->pipeline->source();
$extractor = $this->pipeline->extractor();

if (!$extractor instanceof FileExtractor) {
throw new RuntimeException('filterPartitions can be used only with extractors that implement FileExtractor interface');
@@ -404,8 +397,7 @@ public function filters(array $functions) : self
*/
public function forEach(?callable $callback = null) : void
{
$clone = clone $this;
$clone->run($callback);
$this->run($callback);
}

/**
@@ -417,9 +409,7 @@ public function forEach(?callable $callback = null) : void
*/
public function get() : \Generator
{
$clone = clone $this;

return $clone->pipeline->process($clone->context);
return $this->pipeline->process($this->context);
}

/**
@@ -431,9 +421,7 @@ public function get() : \Generator
*/
public function getAsArray() : \Generator
{
$clone = clone $this;

foreach ($clone->pipeline->process($clone->context) as $rows) {
foreach ($this->pipeline->process($this->context) as $rows) {
yield $rows->toArray();
}
}
@@ -447,9 +435,7 @@ public function getAsArray() : \Generator
*/
public function getEach() : \Generator
{
$clone = clone $this;

foreach ($clone->pipeline->process($clone->context) as $rows) {
foreach ($this->pipeline->process($this->context) as $rows) {
foreach ($rows as $row) {
yield $row;
}
@@ -465,9 +451,7 @@ public function getEach() : \Generator
*/
public function getEachAsArray() : \Generator
{
$clone = clone $this;

foreach ($clone->pipeline->process($clone->context) as $rows) {
foreach ($this->pipeline->process($this->context) as $rows) {
foreach ($rows as $row) {
yield $row->toArray();
}
@@ -491,7 +475,7 @@ public function join(self $dataFrame, Expression $on, string|Join $type = Join::
$type = Join::from($type);
}

$this->pipeline = new LinkedPipeline(new HashJoinPipeline($this->pipeline, $dataFrame, $on, $type));
$this->pipeline->add(new HashJoinProcessor($dataFrame, $on, $type));

return $this;
}
@@ -611,7 +595,7 @@ public function offset(?int $offset) : self
return $this;
}

$this->pipeline = new LinkedPipeline(new OffsetPipeline($this->pipeline, $offset));
$this->pipeline->add(new OffsetProcessor($offset));

return $this;
}
@@ -633,18 +617,20 @@ public function partitionBy(string|Reference $entry, string|Reference ...$entrie
{
\array_unshift($entries, $entry);

$this->pipeline = new LinkedPipeline(new PartitioningPipeline($this->pipeline, References::init(...$entries)->all()));
$this->pipeline->add(new PartitioningProcessor(References::init(...$entries)->all()));

return $this;
}

public function pivot(Reference $ref) : self
{
if (!$this->pipeline instanceof GroupByPipeline) {
$processor = $this->pipeline->stages()->current()->processor();

if (!$processor instanceof GroupByProcessor) {
throw new RuntimeException('Pivot can be used only after groupBy');
}

$this->pipeline->groupBy->pivot($ref);
$processor->groupBy->pivot($ref);

return $this;
}
@@ -654,30 +640,26 @@ public function pivot(Reference $ref) : self
*/
public function printRows(?int $limit = 20, int|bool $truncate = 20, Formatter $formatter = new AsciiTableFormatter()) : void
{
$clone = clone $this;

if ($limit !== null) {
$clone->limit($limit);
$this->limit($limit);
}

$clone->load(to_output($truncate, Output::rows, $formatter));
$this->load(to_output($truncate, Output::rows, $formatter));

$clone->run();
$this->run();
}

/**
* @trigger
*/
public function printSchema(?int $limit = 20, SchemaFormatter $formatter = new ASCIISchemaFormatter()) : void
{
$clone = clone $this;

if ($limit !== null) {
$clone->limit($limit);
$this->limit($limit);
}
$clone->load(to_output(false, Output::schema, schemaFormatter: $formatter));
$this->load(to_output(false, Output::schema, schemaFormatter: $formatter));

$clone->run();
$this->run();
}

/**
@@ -807,17 +789,15 @@ public function rows(Transformer|Transformation $transformer) : self
*/
public function run(?callable $callback = null, bool|Analyze $analyze = false) : ?Report
{
$clone = clone $this;

if ($analyze === false) {
$analyze = $this->context->config->analyze();
}

$collector = new ReportCollector($analyze, $this->context->config->clock());

foreach ($clone->pipeline->process($clone->context) as $rows) {
foreach ($this->pipeline->process($this->context) as $rows) {
if ($callback !== null) {
$callback($rows, $clone->context);
$callback($rows, $this->context);
}

$collector->capture($rows);
@@ -869,7 +849,7 @@ public function select(string|Reference ...$entries) : self
*/
public function sortBy(Reference ...$entries) : self
{
$this->pipeline = new LinkedPipeline(new SortingPipeline($this->pipeline, refs(...$entries)));
$this->pipeline->add(new SortingProcessor(refs(...$entries)));

return $this;
}
@@ -920,7 +900,7 @@ public function validate(Schema $schema, ?SchemaValidator $validator = null) : s
*/
public function void() : self
{
$this->pipeline = new VoidPipeline($this->pipeline);
$this->pipeline->add(new VoidProcessor());

return $this;
}
@@ -978,24 +958,18 @@ public function withEntry(string|Definition $entry, ScalarFunction|WindowFunctio
{
if ($reference instanceof WindowFunction) {
if (\count($reference->window()->partitions())) {
// When there are partitions, use PartitioningPipeline to ensure all data
// from the same partition is grouped together before processing
$this->pipeline = new LinkedPipeline(
new PartitioningPipeline($this->pipeline, $reference->window()->partitions(), $reference->window()->order())
$this->pipeline->add(
new PartitioningProcessor($reference->window()->partitions(), $reference->window()->order())
);
} else {
// When there are no partitions, collect all data and sort if needed
$this->collect();

if (\count($reference->window()->order())) {
$this->sortBy(...$reference->window()->order());
}
}

// Now wrap in WindowFunctionPipeline to apply the window function
$this->pipeline = new LinkedPipeline(
new WindowFunctionPipeline($this->pipeline, $entry, $reference)
);
$this->pipeline->add(new WindowProcessor($entry, $reference));
} else {
$this->with(new ScalarFunctionTransformer($entry, $reference));
}
8 changes: 4 additions & 4 deletions src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php
@@ -6,7 +6,7 @@

use Flow\ETL\{DataFrame, GroupBy};
use Flow\ETL\Function\AggregatingFunction;
use Flow\ETL\Pipeline\{GroupByPipeline, LinkedPipeline};
use Flow\ETL\Processor\GroupByProcessor;
use Flow\ETL\Row\Reference;

final readonly class GroupedDataFrame
@@ -19,14 +19,14 @@ public function aggregate(AggregatingFunction ...$aggregations) : DataFrame
{
$this->groupBy->aggregate(...$aggregations);

$pipelineSetter = function (GroupBy $groupBy) : void {
$pipelineAdder = function (GroupBy $groupBy) : void {
/**
* @phpstan-ignore-next-line
*/
$this->pipeline = new LinkedPipeline(new GroupByPipeline($groupBy, $this->pipeline));
$this->pipeline->add(new GroupByProcessor($groupBy));
};

$pipelineSetter->bindTo($this->df, $this->df)($this->groupBy);
$pipelineAdder->bindTo($this->df, $this->df)($this->groupBy);

return $this->df;
}
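For context, a minimal usage sketch of the reworked API. This is a sketch only: it assumes the standard Flow\ETL\DSL helpers data_frame(), from_array(), ref(), sum() and to_output() with their default arguments, none of which are touched by this PR. After this change each fluent call appends a Processor to the one shared pipeline via $this->pipeline->add(...) instead of wrapping the previous pipeline in a LinkedPipeline decorator, and the terminal methods (count(), fetch(), display(), run()) iterate $this->pipeline->process($this->context) directly rather than cloning the DataFrame first.

<?php

declare(strict_types=1);

use function Flow\ETL\DSL\{data_frame, from_array, ref, sum, to_output};

// Each fluent call below appends one Processor to the single shared pipeline
// (groupBy()/aggregate() -> GroupByProcessor, sortBy() -> SortingProcessor,
// batchSize() -> BatchingProcessor) instead of nesting *Pipeline decorators.
data_frame()
    ->read(from_array([
        ['group' => 'a', 'value' => 10],
        ['group' => 'b', 'value' => 20],
        ['group' => 'a', 'value' => 30],
    ]))
    ->groupBy(ref('group'))
    ->aggregate(sum(ref('value')))
    ->sortBy(ref('group'))
    ->batchSize(50)
    ->write(to_output())
    ->run();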