2424
2525#include " iceberg/expression/binder.h"
2626#include " iceberg/expression/expression.h"
27+ #include " iceberg/expression/residual_evaluator.h"
2728#include " iceberg/file_reader.h"
2829#include " iceberg/manifest/manifest_entry.h"
2930#include " iceberg/manifest/manifest_group.h"
3031#include " iceberg/result.h"
3132#include " iceberg/schema.h"
3233#include " iceberg/snapshot.h"
3334#include " iceberg/table_metadata.h"
35+ #include " iceberg/util/content_file_util.h"
3436#include " iceberg/util/macros.h"
3537#include " iceberg/util/snapshot_util_internal.h"
3638#include " iceberg/util/timepoint.h"
@@ -294,6 +296,24 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
294296 return MakeArrowArrayStream (std::move (reader));
295297}
296298
299+ // ChangelogScanTask implementation
300+
301+ int64_t ChangelogScanTask::size_bytes () const {
302+ int64_t total_size = data_file_->file_size_in_bytes ;
303+ for (const auto & delete_file : delete_files_) {
304+ total_size +=
305+ (delete_file->IsDeletionVector () ? delete_file->content_size_in_bytes .value_or (0 )
306+ : delete_file->file_size_in_bytes );
307+ }
308+ return total_size;
309+ }
310+
311+ int32_t ChangelogScanTask::files_count () const { return 1 + delete_files_.size (); }
312+
313+ int64_t ChangelogScanTask::estimated_row_count () const {
314+ return data_file_->record_count ;
315+ }
316+
297317// Generic template implementation for Make
298318template <typename ScanType>
299319Result<std::unique_ptr<TableScanBuilder<ScanType>>> TableScanBuilder<ScanType>::Make(
@@ -747,11 +767,13 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFi
747767// IncrementalChangelogScan implementation
748768
749769Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make (
750- [[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
751- [[maybe_unused]] std::shared_ptr<Schema> schema,
752- [[maybe_unused]] std::shared_ptr<FileIO> io,
753- [[maybe_unused]] internal::TableScanContext context) {
754- return NotImplemented (" IncrementalChangelogScan is not implemented" );
770+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
771+ std::shared_ptr<FileIO> io, internal::TableScanContext context) {
772+ ICEBERG_PRECHECK (metadata != nullptr , " Table metadata cannot be null" );
773+ ICEBERG_PRECHECK (schema != nullptr , " Schema cannot be null" );
774+ ICEBERG_PRECHECK (io != nullptr , " FileIO cannot be null" );
775+ return std::unique_ptr<IncrementalChangelogScan>(new IncrementalChangelogScan (
776+ std::move (metadata), std::move (schema), std::move (io), std::move (context)));
755777}
756778
757779Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
@@ -762,7 +784,123 @@ IncrementalChangelogScan::PlanFiles() const {
762784Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
763785IncrementalChangelogScan::PlanFiles (std::optional<int64_t > from_snapshot_id_exclusive,
764786 int64_t to_snapshot_id_inclusive) const {
765- return NotImplemented (" IncrementalChangelogScan::PlanFiles is not implemented" );
787+ ICEBERG_ASSIGN_OR_RAISE (
788+ auto ancestors_snapshots,
789+ SnapshotUtil::AncestorsBetween (*metadata_, to_snapshot_id_inclusive,
790+ from_snapshot_id_exclusive));
791+
792+ std::vector<std::pair<std::shared_ptr<Snapshot>, std::unique_ptr<SnapshotCache>>>
793+ changelog_snapshots;
794+
795+ for (const auto & snapshot : std::ranges::reverse_view (ancestors_snapshots)) {
796+ auto operation = snapshot->Operation ();
797+ if (!operation.has_value () || operation.value () != DataOperation::kReplace ) {
798+ auto snapshot_cache = std::make_unique<SnapshotCache>(snapshot.get ());
799+ ICEBERG_ASSIGN_OR_RAISE (auto delete_manifests,
800+ snapshot_cache->DeleteManifests (io_));
801+ if (!delete_manifests.empty ()) {
802+ return NotSupported (
803+ " Delete files are currently not supported in changelog scans" );
804+ }
805+ changelog_snapshots.emplace_back (snapshot, std::move (snapshot_cache));
806+ }
807+ }
808+ if (changelog_snapshots.empty ()) {
809+ return std::vector<std::shared_ptr<ChangelogScanTask>>{};
810+ }
811+
812+ std::unordered_set<int64_t > snapshot_ids;
813+ std::unordered_map<int64_t , int32_t > snapshot_ordinals;
814+ int32_t ordinal = 0 ;
815+ for (const auto & snapshot : changelog_snapshots) {
816+ snapshot_ids.insert (snapshot.first ->snapshot_id );
817+ snapshot_ordinals[snapshot.first ->snapshot_id ] = ordinal++;
818+ }
819+
820+ std::vector<ManifestFile> data_manifests;
821+ for (const auto & snapshot : changelog_snapshots) {
822+ ICEBERG_ASSIGN_OR_RAISE (auto manifests, snapshot.second ->DataManifests (io_));
823+ std::ranges::copy_if (manifests, std::back_inserter (data_manifests),
824+ [&snapshot_ids](const ManifestFile& manifest) {
825+ return snapshot_ids.contains (manifest.added_snapshot_id );
826+ });
827+ }
828+ if (data_manifests.empty ()) {
829+ return std::vector<std::shared_ptr<ChangelogScanTask>>{};
830+ }
831+
832+ TableMetadataCache metadata_cache (metadata_.get ());
833+ ICEBERG_ASSIGN_OR_RAISE (auto specs_by_id, metadata_cache.GetPartitionSpecsById ());
834+
835+ ICEBERG_ASSIGN_OR_RAISE (
836+ auto manifest_group,
837+ ManifestGroup::Make (io_, schema_, specs_by_id, std::move (data_manifests), {}));
838+
839+ manifest_group->CaseSensitive (context_.case_sensitive )
840+ .Select (ScanColumns ())
841+ .FilterData (filter ())
842+ .FilterManifestEntries ([&snapshot_ids](const ManifestEntry& entry) {
843+ return entry.snapshot_id .has_value () &&
844+ snapshot_ids.contains (entry.snapshot_id .value ());
845+ })
846+ .IgnoreExisting ()
847+ .ColumnsToKeepStats (context_.columns_to_keep_stats );
848+
849+ if (context_.ignore_residuals ) {
850+ manifest_group->IgnoreResiduals ();
851+ }
852+
853+ auto create_tasks_func =
854+ [&snapshot_ordinals](
855+ std::vector<ManifestEntry>&& entries,
856+ const TaskContext& ctx) -> Result<std::vector<std::shared_ptr<ScanTask>>> {
857+ std::vector<std::shared_ptr<ScanTask>> tasks;
858+ tasks.reserve (entries.size ());
859+
860+ for (auto & entry : entries) {
861+ if (!entry.snapshot_id .has_value () || entry.data_file == nullptr ) {
862+ continue ;
863+ }
864+
865+ int64_t commit_snapshot_id = entry.snapshot_id .value ();
866+ auto ordinal_it = snapshot_ordinals.find (commit_snapshot_id);
867+ if (ordinal_it == snapshot_ordinals.end ()) {
868+ continue ;
869+ }
870+ int32_t change_ordinal = ordinal_it->second ;
871+
872+ if (ctx.drop_stats ) {
873+ ContentFileUtil::DropAllStats (*entry.data_file );
874+ } else if (!ctx.columns_to_keep_stats .empty ()) {
875+ ContentFileUtil::DropUnselectedStats (*entry.data_file , ctx.columns_to_keep_stats );
876+ }
877+
878+ ICEBERG_ASSIGN_OR_RAISE (auto residual,
879+ ctx.residuals ->ResidualFor (entry.data_file ->partition ));
880+
881+ switch (entry.status ) {
882+ case ManifestStatus::kAdded :
883+ tasks.push_back (std::make_shared<AddedRowsScanTask>(
884+ change_ordinal, commit_snapshot_id, std::move (entry.data_file ),
885+ std::vector<std::shared_ptr<DataFile>>{}, std::move (residual)));
886+ break ;
887+ case ManifestStatus::kDeleted :
888+ tasks.push_back (std::make_shared<DeletedDataFileScanTask>(
889+ change_ordinal, commit_snapshot_id, std::move (entry.data_file ),
890+ std::vector<std::shared_ptr<DataFile>>{}, std::move (residual)));
891+ break ;
892+ case ManifestStatus::kExisting :
893+ return InvalidArgument (" Unexpected entry status: EXISTING" );
894+ }
895+ }
896+ return tasks;
897+ };
898+
899+ ICEBERG_ASSIGN_OR_RAISE (auto tasks, manifest_group->Plan (create_tasks_func));
900+ return tasks | std::views::transform ([](const auto & task) {
901+ return std::static_pointer_cast<ChangelogScanTask>(task);
902+ }) |
903+ std::ranges::to<std::vector>();
766904}
767905
768906} // namespace iceberg
0 commit comments