2323#include < memory>
2424#include < numeric>
2525#include < optional>
26+ #include < sstream>
2627#include < string>
2728#include < type_traits>
2829#include < unordered_map>
4243#include " arrow/ipc/metadata_internal.h"
4344#include " arrow/ipc/reader_internal.h"
4445#include " arrow/ipc/writer.h"
46+ #include " arrow/pretty_print.h"
4547#include " arrow/record_batch.h"
4648#include " arrow/sparse_tensor.h"
4749#include " arrow/status.h"
@@ -2694,13 +2696,49 @@ Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
26942696
26952697Status CompareFuzzBatches (const RecordBatchWithMetadata& left,
26962698 const RecordBatchWithMetadata& right) {
2699+ if ((left.custom_metadata != nullptr ) != (right.custom_metadata != nullptr ) ||
2700+ (left.custom_metadata && !left.custom_metadata ->Equals (*right.custom_metadata ))) {
2701+ std::stringstream ss;
2702+ auto print_metadata = [&](const RecordBatchWithMetadata& batch) {
2703+ if (batch.custom_metadata ) {
2704+ ss << batch.custom_metadata ->ToString ();
2705+ } else {
2706+ ss << " nullptr" ;
2707+ }
2708+ };
2709+ ss << " Custom metadata unequal: left = " ;
2710+ print_metadata (left);
2711+ ss << " \n right = " ;
2712+ print_metadata (right);
2713+ return Status::Invalid (ss.str ());
2714+ }
2715+
26972716 bool ok = true ;
26982717 if ((left.batch != nullptr ) != (right.batch != nullptr )) {
26992718 ok = false ;
27002719 } else if (left.batch ) {
27012720 ok &= left.batch ->Equals (*right.batch , EqualOptions{}.nans_equal (true ));
27022721 }
2703- return ok ? Status::OK () : Status::Invalid (" Batches unequal" );
2722+ if (ok) {
2723+ return Status::OK ();
2724+ }
2725+ std::stringstream ss;
2726+ PrettyPrintOptions options{};
2727+ options.show_field_metadata = true ;
2728+ options.show_schema_metadata = true ;
2729+ auto print_batch = [&](const RecordBatchWithMetadata& batch) {
2730+ if (batch.batch ) {
2731+ ss << " \n " ;
2732+ ARROW_UNUSED (PrettyPrint (*batch.batch , options, &ss));
2733+ } else {
2734+ ss << " nullptr" ;
2735+ }
2736+ };
2737+ ss << " Batches unequal: left = " ;
2738+ print_batch (left);
2739+ ss << " \n right = " ;
2740+ print_batch (right);
2741+ return Status::Invalid (ss.str ());
27042742}
27052743
27062744Status CompareFuzzBatches (const std::vector<RecordBatchWithMetadata>& left,
@@ -2748,8 +2786,13 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
27482786
27492787 Status final_status;
27502788
2789+ struct IpcReadResult {
2790+ std::shared_ptr<Schema> schema;
2791+ std::vector<RecordBatchWithMetadata> batches = {};
2792+ };
2793+
27512794 // Try to read the IPC file as a stream to compare the results (differential fuzzing)
2752- auto do_stream_read = [&]() -> Result<std::vector<RecordBatchWithMetadata> > {
2795+ auto do_stream_read = [&]() -> Result<IpcReadResult > {
27532796 io::BufferReader buffer_reader (buffer);
27542797 // Skip magic bytes at the beginning
27552798 RETURN_NOT_OK (
@@ -2766,11 +2809,10 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
27662809 }
27672810 batches.push_back (batch);
27682811 }
2769- return batches;
2812+ return IpcReadResult{batch_reader-> schema (), batches} ;
27702813 };
27712814
2772- auto do_file_read =
2773- [&](bool pre_buffer) -> Result<std::vector<RecordBatchWithMetadata>> {
2815+ auto do_file_read = [&](bool pre_buffer) -> Result<IpcReadResult> {
27742816 io::BufferReader buffer_reader (buffer);
27752817 ARROW_ASSIGN_OR_RAISE (auto batch_reader,
27762818 RecordBatchFileReader::Open (&buffer_reader, FuzzingOptions ()));
@@ -2779,46 +2821,66 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
27792821 RETURN_NOT_OK (batch_reader->PreBufferMetadata (/* indices=*/ {}));
27802822 }
27812823
2824+ IpcReadResult result = {.schema = batch_reader->schema ()};
27822825 const int n_batches = batch_reader->num_record_batches ();
2783- std::vector<RecordBatchWithMetadata> batches;
27842826 // Delay error return until the end, as we want to access all record batches
27852827 Status st;
27862828 for (int i = 0 ; i < n_batches; ++i) {
27872829 RecordBatchWithMetadata batch;
27882830 st &= batch_reader->ReadRecordBatchWithCustomMetadata (i).Value (&batch);
27892831 st &= ValidateFuzzBatch (batch);
2790- batches.push_back (batch);
2832+ result. batches .push_back (batch);
27912833 }
27922834 RETURN_NOT_OK (st);
2793- return batches ;
2835+ return result ;
27942836 };
27952837
2796- // Lazily-initialized if the IPC reader succeeds
2797- std::optional<Result<std::vector<RecordBatchWithMetadata>>> maybe_stream_batches;
2838+ // Persistent read result for differential fuzzing.
2839+ std::optional<IpcReadResult> maybe_read_result;
2840+
2841+ auto compare_result = [&](IpcReadResult new_result) {
2842+ // Differential fuzzing: compare results read using different APIs or options
2843+ // with one another.
2844+ if (maybe_read_result.has_value ()) {
2845+ ARROW_CHECK_OK (CompareFuzzBatches (maybe_read_result->batches , new_result.batches ));
2846+ } else {
2847+ maybe_read_result = std::move (new_result);
2848+ }
2849+ };
27982850
27992851 for (const bool pre_buffer : {false , true }) {
2800- auto maybe_file_batches = do_file_read (pre_buffer);
2801- final_status &= maybe_file_batches.status ();
2802- if (maybe_file_batches.ok ()) {
2803- // IPC file read successful: differential fuzzing with IPC stream reader,
2804- // if possible.
2805- // NOTE: some valid IPC files may not be readable as IPC streams,
2806- // for example because of excess spacing between IPC messages.
2807- // A regular IPC file writer would not produce them, but fuzzing might.
2808- if (!maybe_stream_batches.has_value ()) {
2809- maybe_stream_batches = do_stream_read ();
2810- final_status &= maybe_stream_batches->status ();
2811- }
2812- if (maybe_stream_batches->ok ()) {
2852+ auto maybe_file_result = do_file_read (pre_buffer);
2853+ final_status &= maybe_file_result.status ();
2854+ if (maybe_file_result.ok ()) {
2855+ compare_result (*std::move (maybe_file_result));
2856+ }
2857+ }
2858+
2859+ if (maybe_read_result.has_value ()) {
2860+ // IPC file read successful: compare results with IPC stream reader,
2861+ // if possible.
2862+ // NOTE: some valid IPC files may not be readable as IPC streams,
2863+ // for example because of excess spacing between IPC messages.
2864+ // A regular IPC file writer would not produce them, but fuzzing might.
2865+ auto maybe_stream_result = do_stream_read ();
2866+ final_status &= maybe_stream_result.status ();
2867+ if (maybe_stream_result.ok ()) {
2868+ if (maybe_read_result->schema ->Equals (maybe_stream_result->schema ,
2869+ /* check_metadata=*/ true )) {
28132870 // XXX: in some rare cases, an IPC file might read unequal to the enclosed
28142871 // IPC stream, for example if the footer skips some batches or orders the
28152872 // batches differently. We should revisit this if the fuzzer generates such
28162873 // files.
2817- ARROW_CHECK_OK (CompareFuzzBatches (*maybe_file_batches, **maybe_stream_batches));
2874+ compare_result (*maybe_stream_result);
2875+ } else {
2876+ // The fuzzer might have mutated the schema definition that is duplicated
2877+ // in the IPC file footer, in which case the comparison above would fail.
2878+ final_status &= Status::TypeError (
2879+ " Schema mismatch between IPC stream and IPC file footer, skipping "
2880+ " comparison" );
28182881 }
28192882 }
28202883 }
2821-
28222884 return final_status;
28232885}
28242886
0 commit comments