diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 2ef7f1ca788e..583a7009adc5 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -33,6 +33,7 @@ on: - 'integration/**' - 'cpp/**' - 'format/**' + - 'ruby/red-arrow-format/**' pull_request: paths: - '.dockerignore' @@ -43,6 +44,7 @@ on: - 'integration/**' - 'cpp/**' - 'format/**' + - 'ruby/red-arrow-format/**' concurrency: group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} diff --git a/ci/docker/conda-integration.dockerfile b/ci/docker/conda-integration.dockerfile index b0e5ec966d0f..ac7ca571560d 100644 --- a/ci/docker/conda-integration.dockerfile +++ b/ci/docker/conda-integration.dockerfile @@ -42,6 +42,7 @@ RUN mamba install -q -y \ nodejs=${node} \ yarn=${yarn} \ openjdk=${jdk} \ + ruby \ zstd && \ mamba clean --yes --all --force-pkgs-dirs diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh index 2ee047c50e14..7e315a60a6a6 100755 --- a/ci/scripts/integration_arrow.sh +++ b/ci/scripts/integration_arrow.sh @@ -24,9 +24,14 @@ build_dir=${2} gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration +# For backward compatibility. : "${ARROW_INTEGRATION_CPP:=ON}" +: "${ARCHERY_INTEGRATION_WITH_CPP:=$([ "${ARROW_INTEGRATION_CPP}" = "ON" ] && echo "1" || echo "0")}" +export ARCHERY_INTEGRATION_WITH_CPP +: "${ARCHERY_INTEGRATION_WITH_RUBY:=1}" +export ARCHERY_INTEGRATION_WITH_RUBY -: "${ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS:=cpp}" +: "${ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS:=cpp,ruby}" export ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS . 
"${arrow_dir}/ci/scripts/util_log.sh" @@ -57,14 +62,11 @@ export PYTHONFAULTHANDLER=1 export GOMEMLIMIT=200MiB export GODEBUG=gctrace=1,clobberfree=1 -ARCHERY_WITH_CPP=$([ "$ARROW_INTEGRATION_CPP" == "ON" ] && echo "1" || echo "0") - # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1 time archery integration \ --run-c-data \ --run-ipc \ --run-flight \ - --with-cpp="${ARCHERY_WITH_CPP}" \ --gold-dirs="$gold_dir/0.14.1" \ --gold-dirs="$gold_dir/0.17.1" \ --gold-dirs="$gold_dir/1.0.0-bigendian" \ diff --git a/ci/scripts/integration_arrow_build.sh b/ci/scripts/integration_arrow_build.sh index 61ad0ea59e4d..691faf7c2d86 100755 --- a/ci/scripts/integration_arrow_build.sh +++ b/ci/scripts/integration_arrow_build.sh @@ -22,7 +22,10 @@ set -e arrow_dir=${1} build_dir=${2} +# For backward compatibility. : "${ARROW_INTEGRATION_CPP:=ON}" +: "${ARCHERY_INTEGRATION_WITH_CPP:=$([ "${ARROW_INTEGRATION_CPP}" = "ON" ] && echo "1" || echo "0")}" +: "${ARCHERY_INTEGRATION_WITH_RUBY:=1}" . "${arrow_dir}/ci/scripts/util_log.sh" @@ -41,7 +44,7 @@ fi github_actions_group_end github_actions_group_begin "Integration: Build: C++" -if [ "${ARROW_INTEGRATION_CPP}" == "ON" ]; then +if [ "${ARCHERY_INTEGRATION_WITH_CPP}" -gt "0" ]; then "${arrow_dir}/ci/scripts/cpp_build.sh" "${arrow_dir}" "${build_dir}" fi github_actions_group_end @@ -69,3 +72,9 @@ if [ "${ARCHERY_INTEGRATION_WITH_JS}" -gt "0" ]; then cp -a "${arrow_dir}/js" "${build_dir}/js" fi github_actions_group_end + +github_actions_group_begin "Integration: Build: Ruby" +if [ "${ARCHERY_INTEGRATION_WITH_RUBY}" -gt "0" ]; then + rake -C "${arrow_dir}/ruby/red-arrow-format" install +fi +github_actions_group_end diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py index f08656fa94ed..e70cc3874f22 100644 --- a/dev/archery/archery/cli.py +++ b/dev/archery/archery/cli.py @@ -667,7 +667,8 @@ def _set_default(opt, default): @click.option('--random-seed', type=int, default=12345, help="Seed for PRNG when generating 
test data") @click.option('--with-cpp', type=bool, default=False, - help='Include C++ in integration tests') + help='Include C++ in integration tests', + envvar="ARCHERY_INTEGRATION_WITH_CPP") @click.option('--with-dotnet', type=bool, default=False, help='Include .NET in integration tests', envvar="ARCHERY_INTEGRATION_WITH_DOTNET") @@ -683,6 +684,9 @@ def _set_default(opt, default): @click.option('--with-nanoarrow', type=bool, default=False, help='Include nanoarrow in integration tests', envvar="ARCHERY_INTEGRATION_WITH_NANOARROW") +@click.option('--with-ruby', type=bool, default=False, + help='Include Ruby in integration tests', + envvar="ARCHERY_INTEGRATION_WITH_RUBY") @click.option('--with-rust', type=bool, default=False, help='Include Rust in integration tests', envvar="ARCHERY_INTEGRATION_WITH_RUST") diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 83913dc379f2..6b3b13f51dfc 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1937,6 +1937,7 @@ def get_generated_json_files(tempdir=None): .skip_tester('Java') .skip_tester('JS') .skip_tester('nanoarrow') + .skip_tester('Ruby') .skip_tester('Rust') .skip_tester('Go'), @@ -1944,6 +1945,7 @@ def get_generated_json_files(tempdir=None): .skip_tester('Java') .skip_tester('JS') .skip_tester('nanoarrow') + .skip_tester('Ruby') .skip_tester('Rust') .skip_tester('Go'), @@ -1993,19 +1995,22 @@ def get_generated_json_files(tempdir=None): .skip_tester('nanoarrow') .skip_tester('Java') # TODO(ARROW-7779) # TODO(https://github.com/apache/arrow/issues/38045) - .skip_format(SKIP_FLIGHT, '.NET'), + .skip_format(SKIP_FLIGHT, '.NET') + .skip_tester('Ruby'), generate_run_end_encoded_case() .skip_tester('.NET') .skip_tester('JS') # TODO(https://github.com/apache/arrow-nanoarrow/issues/618) .skip_tester('nanoarrow') + .skip_tester('Ruby') .skip_tester('Rust'), generate_binary_view_case() .skip_tester('JS') # 
TODO(https://github.com/apache/arrow-nanoarrow/issues/618) .skip_tester('nanoarrow') + .skip_tester('Ruby') .skip_tester('Rust'), generate_list_view_case() @@ -2013,12 +2018,14 @@ def get_generated_json_files(tempdir=None): .skip_tester('JS') # TODO(https://github.com/apache/arrow-nanoarrow/issues/618) .skip_tester('nanoarrow') + .skip_tester('Ruby') .skip_tester('Rust'), generate_extension_case() .skip_tester('nanoarrow') # TODO(https://github.com/apache/arrow/issues/38045) - .skip_format(SKIP_FLIGHT, '.NET'), + .skip_format(SKIP_FLIGHT, '.NET') + .skip_tester('Ruby'), ] generated_paths = [] diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 29e336422439..9c0fda371e1e 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -196,9 +196,11 @@ def _gold_tests(self, gold_dir): skip_testers.add(".NET") skip_testers.add("Java") skip_testers.add("JS") + skip_testers.add("Ruby") skip_testers.add("Rust") if prefix == '2.0.0-compression': skip_testers.add("JS") + skip_testers.add("Ruby") if prefix == '2.0.0-compression' and 'lz4' in name: # https://github.com/apache/arrow-nanoarrow/issues/621 skip_testers.add("nanoarrow") @@ -590,9 +592,9 @@ def get_static_json_files(): def select_testers(with_cpp=True, with_java=True, with_js=True, - with_dotnet=True, with_go=True, with_rust=False, - with_nanoarrow=False, target_implementations="", - **kwargs): + with_dotnet=True, with_go=True, with_ruby=False, + with_rust=False, with_nanoarrow=False, + target_implementations="", **kwargs): target_implementations = (target_implementations.split(",") if target_implementations else []) @@ -629,6 +631,10 @@ def append_tester(implementation, tester): from .tester_nanoarrow import NanoarrowTester append_tester("nanoarrow", NanoarrowTester(**kwargs)) + if with_ruby: + from .tester_ruby import RubyTester + append_tester("ruby", RubyTester(**kwargs)) + if with_rust: from .tester_rust import 
RustTester append_tester("rust", RustTester(**kwargs)) diff --git a/dev/archery/archery/integration/tester_ruby.py b/dev/archery/archery/integration/tester_ruby.py new file mode 100644 index 000000000000..a84ba825cbd4 --- /dev/null +++ b/dev/archery/archery/integration/tester_ruby.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import os + +from .tester import Tester +from .util import run_cmd, log +from ..utils.source import ARROW_ROOT_DEFAULT + + +_EXE_PATH = os.path.join( + ARROW_ROOT_DEFAULT, "ruby/red-arrow-format/bin/red-arrow-format-integration-test") + + +class RubyTester(Tester): + PRODUCER = True + CONSUMER = True + + name = "Ruby" + + def _run(self, env): + command_line = [_EXE_PATH] + if self.debug: + command_line_string = "" + for key, value in env.items(): + command_line_string += f"{key}={value} " + command_line_string += " ".join(command_line) + log(command_line_string) + run_cmd(command_line, env=os.environ | env) + + def validate(self, json_path, arrow_path, quirks=None): + env = { + "ARROW": arrow_path, + "COMMAND": "validate", + "JSON": json_path, + } + if quirks: + for quirk in quirks: + env[f"QUIRK_{quirk.upper()}"] = "true" + self._run(env) + + def json_to_file(self, json_path, arrow_path): + env = { + "ARROW": arrow_path, + "COMMAND": "json-to-file", + "JSON": json_path, + } + self._run(env) + + def stream_to_file(self, stream_path, file_path): + env = { + "ARROW": file_path, + "ARROWS": stream_path, + "COMMAND": "stream-to-file", + } + self._run(env) + + def file_to_stream(self, file_path, stream_path): + env = { + "ARROW": file_path, + "ARROWS": stream_path, + "COMMAND": "file-to-stream", + } + self._run(env) diff --git a/dev/archery/archery/integration/util.py b/dev/archery/archery/integration/util.py index 1b1eb95a1d29..f2fb5ede20c1 100644 --- a/dev/archery/archery/integration/util.py +++ b/dev/archery/archery/integration/util.py @@ -17,6 +17,7 @@ import contextlib import io +import os import random import socket import subprocess @@ -137,7 +138,13 @@ def run_cmd(cmd, **kwargs): except subprocess.CalledProcessError as e: # this avoids hiding the stdout / stderr of failed processes sio = io.StringIO() - print('Command failed:', " ".join(cmd), file=sio) + command_line_string = '' + env = kwargs.get('env') or {} + for key in sorted(env.keys() - os.environ.keys()): + value 
= env[key] + command_line_string += f'{key}={value} ' + command_line_string += ' '.join(cmd) + print(f'Command failed: {command_line_string}', file=sio) print('With output:', file=sio) print('--------------', file=sio) print(frombytes(e.output), file=sio) diff --git a/ruby/red-arrow-format/bin/red-arrow-format-integration-test b/ruby/red-arrow-format/bin/red-arrow-format-integration-test new file mode 100755 index 000000000000..8571621f3d2e --- /dev/null +++ b/ruby/red-arrow-format/bin/red-arrow-format-integration-test @@ -0,0 +1,64 @@ +#!/usr/bin/env ruby +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +require_relative "../lib/arrow-format" +require_relative "../lib/arrow-format/integration/options" + +options = ArrowFormat::Integration::Options.singleton +case options.command +when "validate" + require_relative "../lib/arrow-format/integration/validate" +when "json-to-file" + require_relative "../lib/arrow-format/integration/json-reader" + File.open(options.json, "r") do |input| + reader = ArrowFormat::Integration::JSONReader.new(input) + File.open(options.arrow, "wb") do |output| + writer = ArrowFormat::FileWriter.new(output) + writer.start(reader.schema) + reader.each do |record_batch| + writer.write_record_batch(record_batch) + end + writer.finish + end + end +when "stream-to-file" + File.open(options.arrows, "rb") do |input| + reader = ArrowFormat::StreamingReader.new(input) + File.open(options.arrow, "wb") do |output| + writer = ArrowFormat::FileWriter.new(output) + writer.start(reader.schema) + reader.each do |record_batch| + writer.write_record_batch(record_batch) + end + writer.finish + end + end +when "file-to-stream" + File.open(options.arrow, "rb") do |input| + reader = ArrowFormat::FileReader.new(input) + File.open(options.arrows, "wb") do |output| + writer = ArrowFormat::StreamingWriter.new(output) + writer.start(reader.schema) + reader.each do |record_batch| + writer.write_record_batch(record_batch) + end + writer.finish + end + end +end diff --git a/ruby/red-arrow-format/lib/arrow-format/array.rb b/ruby/red-arrow-format/lib/arrow-format/array.rb index 951de74475eb..10be36d53009 100644 --- a/ruby/red-arrow-format/lib/arrow-format/array.rb +++ b/ruby/red-arrow-format/lib/arrow-format/array.rb @@ -56,6 +56,10 @@ def n_nulls end end + def empty? + @size.zero? + end + protected def slice!(offset, size) @offset = offset @@ -144,6 +148,10 @@ def each_buffer return to_enum(__method__) unless block_given? 
end + def n_nulls + @size + end + def to_a [nil] * @size end @@ -156,6 +164,8 @@ def initialize(type, size, validity_buffer, values_buffer) end def to_a + return [] if empty? + offset = element_size * @offset apply_validity(@values_buffer.values(@type.buffer_type, offset, @size)) end @@ -177,6 +187,8 @@ def element_size class BooleanArray < PrimitiveArray def to_a + return [] if empty? + @values_bitmap ||= Bitmap.new(@values_buffer, @offset, @size) values = @values_bitmap.to_a apply_validity(values) @@ -264,6 +276,8 @@ class YearMonthIntervalArray < IntervalArray class DayTimeIntervalArray < IntervalArray def to_a + return [] if empty? + offset = element_size * @offset values = @values_buffer. each(@type.buffer_type, offset, @size * 2). @@ -282,6 +296,8 @@ def element_size class MonthDayNanoIntervalArray < IntervalArray def to_a + return [] if empty? + buffer_types = @type.buffer_types value_size = IO::Buffer.size_of(buffer_types) base_offset = value_size * @offset @@ -301,7 +317,7 @@ def element_size class DurationArray < TemporalArray end - class VariableSizeBinaryLayoutArray < Array + class VariableSizeBinaryArray < Array def initialize(type, size, validity_buffer, offsets_buffer, values_buffer) super(type, size, validity_buffer) @offsets_buffer = offsets_buffer @@ -323,11 +339,18 @@ def each_buffer yield(sliced_values_buffer) end - def to_a - values = @offsets_buffer. + def offsets + return [0] if empty? + + @offsets_buffer. each(@type.offset_buffer_type, offset_size * @offset, @size + 1). - each_cons(2). - collect do |(_, offset), (_, next_offset)| + collect {|_, offset| offset} + end + + def to_a + return [] if empty? 
+ + values = offsets.each_cons(2).collect do |offset, next_offset| length = next_offset - offset @values_buffer.get_string(offset, length, @type.encoding) end @@ -340,16 +363,19 @@ def offset_size end end - class BinaryArray < VariableSizeBinaryLayoutArray + class BinaryArray < VariableSizeBinaryArray + end + + class LargeBinaryArray < VariableSizeBinaryArray end - class LargeBinaryArray < VariableSizeBinaryLayoutArray + class VariableSizeUTF8Array < VariableSizeBinaryArray end - class UTF8Array < VariableSizeBinaryLayoutArray + class UTF8Array < VariableSizeUTF8Array end - class LargeUTF8Array < VariableSizeBinaryLayoutArray + class LargeUTF8Array < VariableSizeUTF8Array end class FixedSizeBinaryArray < Array @@ -368,6 +394,8 @@ def each_buffer end def to_a + return [] if empty? + byte_width = @type.byte_width values = 0.step(@size * byte_width - 1, byte_width).collect do |offset| @values_buffer.get_string(offset, byte_width) @@ -378,6 +406,8 @@ def to_a class DecimalArray < FixedSizeBinaryArray def to_a + return [] if empty? + byte_width = @type.byte_width buffer_types = [:u64] * (byte_width / 8 - 1) + [:s64] base_offset = byte_width * @offset @@ -408,8 +438,8 @@ def format_value(components) elsif @type.scale > 0 n_digits = string.bytesize n_digits -= 1 if value < 0 - if n_digits < @type.scale - prefix = "0." + ("0" * (@type.scale - n_digits - 1)) + if n_digits <= @type.scale + prefix = "0." + ("0" * (@type.scale - n_digits)) if value < 0 string[1, 0] = prefix else @@ -446,12 +476,19 @@ def each_buffer(&block) @type.offset_buffer_type)) end + def offsets + return [0] if empty? + + @offsets_buffer. + each(@type.offset_buffer_type, offset_size * @offset, @size + 1). + collect {|_, offset| offset} + end + def to_a + return [] if empty? + child_values = @child.to_a - values = @offsets_buffer. - each(@type.offset_buffer_type, offset_size * @offset, @size + 1). - each_cons(2). 
- collect do |(_, offset), (_, next_offset)| + values = offsets.each_cons(2).collect do |offset, next_offset| child_values[offset...next_offset] end apply_validity(values) @@ -494,6 +531,8 @@ def each_buffer(&block) end def to_a + return [] if empty? + values = @child.to_a.each_slice(@type.size).to_a apply_validity(values) end @@ -520,6 +559,8 @@ def each_buffer(&block) end def to_a + return [] if empty? + if @children.empty? values = [[]] * @size else @@ -540,6 +581,8 @@ def slice!(offset, size) class MapArray < VariableSizeListArray def to_a + return [] if empty? + super.collect do |entries| if entries.nil? entries @@ -555,6 +598,7 @@ def to_a end class UnionArray < Array + attr_reader :types_buffer attr_reader :children def initialize(type, size, types_buffer, children) super(type, size, nil) @@ -562,6 +606,18 @@ def initialize(type, size, types_buffer, children) @children = children end + def each_type(&block) + return [].each(&block) if empty? + + return to_enum(__method__) unless block_given? + + @types_buffer.each(type_buffer_type, + type_element_size * @offset, + @size) do |_, type| + yield(type) + end + end + private def type_buffer_type :S8 @@ -590,15 +646,23 @@ def each_buffer(&block) yield(@offsets_buffer) end + def each_offset(&block) + return [].each(&block) if empty? + + return to_enum(__method__) unless block_given? + + @offsets_buffer.each(@type.offset_buffer_type, + offset_element_size * @offset, + @size) do |_, offset| + yield(offset) + end + end + def to_a + return [] if empty? 
+ children_values = @children.collect(&:to_a) - types = @types_buffer.each(type_buffer_type, - type_element_size * @offset, - @size) - offsets = @offsets_buffer.each(:s32, - offset_element_size * @offset, - @size) - types.zip(offsets).collect do |(_, type), (_, offset)| + each_type.zip(each_offset).collect do |type, offset| index = @type.resolve_type_index(type) children_values[index][offset] end @@ -624,10 +688,10 @@ def each_buffer(&block) end def to_a + return [] if empty? + children_values = @children.collect(&:to_a) - @types_buffer.each(type_buffer_type, - type_element_size * @offset, - @size).with_index.collect do |(_, type), i| + each_type.with_index.collect do |type, i| index = @type.resolve_type_index(type) children_values[index][i] end @@ -663,15 +727,19 @@ def each_buffer yield(@indices_buffer) end + def indices + buffer_type = @type.index_type.buffer_type + offset = IO::Buffer.size_of(buffer_type) * @offset + apply_validity(@indices_buffer.values(buffer_type, offset, @size)) + end + def to_a + return [] if empty? + values = [] @dictionaries.each do |dictionary| values.concat(dictionary.to_a) end - buffer_type = @type.index_type.buffer_type - offset = IO::Buffer.size_of(buffer_type) * @offset - indices = - apply_validity(@indices_buffer.values(buffer_type, offset, @size)) indices.collect do |index| if index.nil? 
nil diff --git a/ruby/red-arrow-format/lib/arrow-format/bitmap.rb b/ruby/red-arrow-format/lib/arrow-format/bitmap.rb index e4a0dc76d368..183c3b28f3eb 100644 --- a/ruby/red-arrow-format/lib/arrow-format/bitmap.rb +++ b/ruby/red-arrow-format/lib/arrow-format/bitmap.rb @@ -36,7 +36,7 @@ def each current = -1 n_bytes = (@offset + @n_values) / 8 @buffer.each(:U8, 0, n_bytes) do |offset, value| - 7.times do |i| + 8.times do |i| current += 1 next if current < @offset yield((value & (1 << (i % 8))) > 0) diff --git a/ruby/red-arrow-format/lib/arrow-format/error.rb b/ruby/red-arrow-format/lib/arrow-format/error.rb index d73c4082beb9..f6aebb364552 100644 --- a/ruby/red-arrow-format/lib/arrow-format/error.rb +++ b/ruby/red-arrow-format/lib/arrow-format/error.rb @@ -25,7 +25,7 @@ class FileReadError < ReadError attr_reader :buffer def initialize(buffer, message) @buffer = buffer - super("#{message}: #{@buffer}") + super("#{message}: #{@buffer.inspect}") end end diff --git a/ruby/red-arrow-format/lib/arrow-format/field.rb b/ruby/red-arrow-format/lib/arrow-format/field.rb index 022091f65123..28a4a90b0901 100644 --- a/ruby/red-arrow-format/lib/arrow-format/field.rb +++ b/ruby/red-arrow-format/lib/arrow-format/field.rb @@ -18,17 +18,14 @@ module ArrowFormat class Field attr_reader :name attr_reader :type - attr_reader :dictionary_id attr_reader :metadata def initialize(name, type, nullable: true, - dictionary_id: nil, metadata: nil) @name = name @type = type @nullable = nullable - @dictionary_id = dictionary_id @metadata = metadata end @@ -41,7 +38,7 @@ def to_flatbuffers fb_field.name = @name fb_field.nullable = @nullable if @type.respond_to?(:build_fb_field) - @type.build_fb_field(fb_field, self) + @type.build_fb_field(fb_field) else fb_field.type = @type.to_flatbuffers end diff --git a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb index ed510ff09cef..7c749e5fbf8e 100644 --- 
a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb +++ b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb @@ -64,7 +64,7 @@ def read(i) "Not a record batch message: #{i}: " + fb_header.class.name) end - read_record_batch(fb_header, @schema, body) + read_record_batch(fb_message.version, fb_header, @schema, body) end def each @@ -108,11 +108,14 @@ def read_footer def read_block(block, type, i) offset = block.offset - # If we can report property error information, we can use + # If we can report error information correctly, we can use # MessagePullReader here. # # message_pull_reader = MessagePullReader.new do |message, body| - # return read_record_batch(message.header, @schema, body) + # return read_record_batch(message.version, + # message.header, + # @schema, + # body) # end # chunk = @buffer.slice(offset, # MessagePullReader::CONTINUATION_SIZE + @@ -123,12 +126,18 @@ def read_block(block, type, i) continuation_size = CONTINUATION_BUFFER.size continuation = @buffer.slice(offset, continuation_size) - unless continuation == CONTINUATION_BUFFER + if continuation == CONTINUATION_BUFFER + offset += continuation_size + elsif continuation.get_value(MessagePullReader::CONTINUATION_TYPE, 0) < 0 raise FileReadError.new(@buffer, "Invalid continuation: #{type}: #{i}: " + continuation.inspect) + else + # For backward compatibility of data produced prior to version + # 0.15.0. It doesn't have continuation token. Ignore it and + # re-read it as metadata length. 
+ continuation_size = 0 end - offset += continuation_size metadata_length_type = MessagePullReader::METADATA_LENGTH_TYPE metadata_length_size = MessagePullReader::METADATA_LENGTH_SIZE @@ -161,7 +170,7 @@ def read_dictionaries dictionary_fields = {} @schema.fields.each do |field| next unless field.type.is_a?(DictionaryType) - dictionary_fields[field.dictionary_id] = field + dictionary_fields[field.type.id] = field end dictionaries = {} @@ -194,7 +203,10 @@ def read_dictionaries value_type = dictionary_fields[id].type.value_type schema = Schema.new([Field.new("dummy", value_type)]) - record_batch = read_record_batch(fb_header.data, schema, body) + record_batch = read_record_batch(fb_message.version, + fb_header.data, + schema, + body) if fb_header.delta? dictionaries[id] << record_batch.columns[0] else diff --git a/ruby/red-arrow-format/lib/arrow-format/integration/json-reader.rb b/ruby/red-arrow-format/lib/arrow-format/integration/json-reader.rb new file mode 100644 index 000000000000..3660e8af3427 --- /dev/null +++ b/ruby/red-arrow-format/lib/arrow-format/integration/json-reader.rb @@ -0,0 +1,408 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +require "json" + +module ArrowFormat + module Integration + class JSONReader + attr_reader :schema + def initialize(input) + @json = JSON.load(input.read) + @dictionaries = {} + @schema = read_schema + end + + def each + @json["batches"].each do |record_batch| + yield(read_record_batch(record_batch)) + end + end + + private + def read_metadata(metadata) + return nil if metadata.nil? + + metadata.inject({}) do |result, metadatum| + result[metadatum["key"]] = metadatum["value"] + result + end + end + + def read_type(type, children) + case type["name"] + when "null" + NullType.singleton + when "bool" + BooleanType.singleton + when "int" + is_signed = type["isSigned"] + case type["bitWidth"] + when 8 + if is_signed + Int8Type.singleton + else + UInt8Type.singleton + end + when 16 + if is_signed + Int16Type.singleton + else + UInt16Type.singleton + end + when 32 + if is_signed + Int32Type.singleton + else + UInt32Type.singleton + end + when 64 + if is_signed + Int64Type.singleton + else + UInt64Type.singleton + end + else + raise "Unsupported type: #{type.inspect}: #{children.inspect}" + end + when "floatingpoint" + case type["precision"] + when "SINGLE" + Float32Type.singleton + when "DOUBLE" + Float64Type.singleton + else + raise "Unsupported type: #{type.inspect}: #{children.inspect}" + end + when "date" + case type["unit"] + when "DAY" + Date32Type.singleton + when "MILLISECOND" + Date64Type.singleton + else + raise "Unsupported type: #{type.inspect}: #{children.inspect}" + end + when "time" + unit = type["unit"].downcase.to_sym + case type["bitWidth"] + when 32 + Time32Type.new(unit) + when 64 + Time64Type.new(unit) + else + raise "Unsupported type: #{type.inspect}: #{children.inspect}" + end + when "timestamp" + unit = type["unit"].downcase.to_sym + TimestampType.new(unit, type["timezone"]) + when "interval" + case type["unit"] + when "YEAR_MONTH" + YearMonthIntervalType.singleton + when "DAY_TIME" + DayTimeIntervalType.singleton + when "MONTH_DAY_NANO" + 
MonthDayNanoIntervalType.singleton + else + raise "Unsupported type: #{type.inspect}: #{children.inspect}" + end + when "duration" + DurationType.new(type["unit"].downcase.to_sym) + when "binary" + BinaryType.singleton + when "largebinary" + LargeBinaryType.singleton + when "utf8" + UTF8Type.singleton + when "largeutf8" + LargeUTF8Type.singleton + when "fixedsizebinary" + FixedSizeBinaryType.new(type["byteWidth"]) + when "decimal" + precision = type["precision"] + scale = type["scale"] + case type["bitWidth"] + when 128 + Decimal128Type.new(precision, scale) + when 256 + Decimal256Type.new(precision, scale) + else + raise "Unsupported type: #{type.inspect}: #{children.inspect}" + end + when "list" + ListType.new(read_field(children[0])) + when "largelist" + LargeListType.new(read_field(children[0])) + when "fixedsizelist" + FixedSizeListType.new(read_field(children[0]), type["listSize"]) + when "struct" + StructType.new(children.collect {|child| read_field(child)}) + when "map" + MapType.new(read_field(children[0]), type["keysSorted"]) + when "union" + children = children.collect {|child| read_field(child)} + type_ids = type["typeIds"] + case type["mode"] + when "DENSE" + DenseUnionType.new(children, type_ids) + when "SPARSE" + SparseUnionType.new(children, type_ids) + else + raise "Unsupported type: #{type.inspect}: #{children.inspect}" + end + else + raise "Unsupported type: #{type.inspect}: #{children.inspect}" + end + end + + def read_field(field) + type = read_type(field["type"], field["children"]) + dictionary = field["dictionary"] + if dictionary + index_type = read_type(dictionary["indexType"], []) + value_type = type + type = DictionaryType.new(dictionary["id"], + index_type, + value_type, + dictionary["isOrdered"]) + end + metadata = read_metadata(field["metadata"]) + Field.new(field["name"], + type, + nullable: field["nullable"], + metadata: metadata) + end + + def read_dictionary(id, type) + @json["dictionaries"].each do |dictionary| + next unless 
# Builds the schema described by the "schema" section of the
# integration JSON held in @json.
#
# Returns a Schema with one field per JSON field (via read_field) and
# the schema-level metadata (via read_metadata).
def read_schema
  json_schema = @json["schema"]
  fields = json_schema["fields"].collect {|field| read_field(field)}
  metadata = read_metadata(json_schema["metadata"])
  Schema.new(fields, metadata: metadata)
end

# Packs a list of 0/1 integers into a bitmap buffer.
#
# Bits are grouped by 8. The first bit of each group becomes the least
# significant bit of the byte. A short final group is padded with 0s.
#
# Returns an IO::Buffer wrapping the packed bytes.
def read_bitmap(bitmap)
  buffer = +"".b
  bitmap.each_slice(8) do |bits|
    byte = 0
    # Pad the last group so that every byte is built from 8 bits.
    while bits.size < 8
      bits << 0
    end
    # Walk from the last bit to the first so that the first bit ends
    # up in the least significant position.
    bits.reverse_each do |bit|
      byte = (byte << 1) + bit
    end
    buffer << [byte].pack("C")
  end
  IO::Buffer.for(buffer)
end

# Writes union type IDs into a new buffer, one signed 8-bit value
# (:S8) per entry.
def read_types(types)
  buffer_type = :S8
  size = IO::Buffer.size_of(buffer_type)
  buffer = IO::Buffer.new(size * types.size)
  types.each_with_index do |type, i|
    offset = size * i
    buffer.set_value(buffer_type, offset, type)
  end
  buffer
end

# Writes offsets into a new buffer using type.offset_buffer_type.
#
# Returns nil when offsets is nil. For the Large* types each offset is
# parsed with Integer(offset, 10), so it must be a decimal string.
def read_offsets(offsets, type)
  return nil if offsets.nil?

  case type
  when LargeListType, LargeBinaryType, LargeUTF8Type
    offsets = offsets.collect {|offset| Integer(offset, 10)}
  end
  size = IO::Buffer.size_of(type.offset_buffer_type)
  buffer = IO::Buffer.new(size * offsets.size)
  offsets.each_with_index do |offset, i|
    value_offset = size * i
    buffer.set_value(type.offset_buffer_type, value_offset, offset)
  end
  buffer
end

# Decodes a hex string ("0AFF...") into a binary string, two hex
# digits per byte. Integer(hex, 16) raises ArgumentError for non-hex
# digits.
def read_hex_value(value)
  values = value.scan(/.{2}/).collect do |hex|
    Integer(hex, 16)
  end
  values.pack("C*")
end

# Converts the JSON "DATA" list of a column into a values buffer for
# the given type.
#
# Raises RuntimeError when the type is not handled here.
def read_values(data, type)
  case type
  when BooleanType
    read_bitmap(data.collect {|boolean| boolean ? 1 : 0})
  when DayTimeIntervalType
    # Each value is a {"days", "milliseconds"} pair stored as two
    # consecutive values of type.buffer_type.
    buffer_types = [type.buffer_type] * 2
    size = IO::Buffer.size_of(buffer_types)
    buffer = IO::Buffer.new(size * data.size)
    data.each_with_index do |value, i|
      offset = size * i
      components = value.fetch_values("days", "milliseconds")
      buffer.set_values(buffer_types, offset, components)
    end
    buffer
  when MonthDayNanoIntervalType
    size = IO::Buffer.size_of(type.buffer_types)
    buffer = IO::Buffer.new(size * data.size)
    data.each_with_index do |value, i|
      offset = size * i
      components = value.fetch_values("months", "days", "nanoseconds")
      buffer.set_values(type.buffer_types, offset, components)
    end
    buffer
  when NumberType,
       TemporalType
    size = IO::Buffer.size_of(type.buffer_type)
    buffer = IO::Buffer.new(size * data.size)
    data.each_with_index do |value, i|
      offset = size * i
      # If the type is 64bit such as `Int64Type`, `value` is a
      # string not integer to round-trip data through JSON.
      value = Integer(value, 10) if value.is_a?(String)
      buffer.set_value(type.buffer_type, offset, value)
    end
    buffer
  when DecimalType
    byte_width = type.byte_width
    bit_width = byte_width * 8
    buffer = IO::Buffer.new(byte_width * data.size)
    data.each_with_index do |value, i|
      offset = byte_width * i
      value = BigDecimal(value)
      value *= 10 ** value.scale if value.scale > 0
      bits = "%0#{bit_width}b" % value.to_i
      if value.negative?
        # `bits` starts with "..1".
        #
        # If `value` is the minimum negative value, `bits` may
        # be larger than `bit_width` because of the start `..`
        # (2 characters).
        bits = bits.delete_prefix("..").rjust(bit_width, "1")
      end
      # Write the 64-bit chunks starting from the last (least
      # significant) one.
      bits.scan(/[01]{64}/).reverse_each do |chunk|
        buffer.set_value(:u64, offset, Integer(chunk, 2))
        offset += 8
      end
    end
    buffer
  when UTF8Type, LargeUTF8Type
    IO::Buffer.for(data.join)
  when VariableSizeBinaryType, FixedSizeBinaryType
    IO::Buffer.for(data.collect {|value| read_hex_value(value)}.join)
  else
    raise "Unsupported values: #{data.inspect}: #{type.inspect}"
  end
end

# Reads every child column of `column` with the type of the matching
# child field of `type`.
def read_child_arrays(column, type)
  column["children"]
    .zip(type.children)
    .collect do |child_column, child_field|
    read_array(child_column, child_field.type)
  end
end

# Builds an array of the given type from one JSON column.
#
# Reads "count" plus whichever of "VALIDITY", "OFFSET", "DATA",
# "TYPE_ID" and "children" the type needs, then calls
# type.build_array.
#
# Raises RuntimeError when the type is not handled here.
def read_array(column, type)
  length = column["count"]
  case type
  when NullType
    type.build_array(length)
  when BooleanType,
       NumberType,
       TemporalType,
       FixedSizeBinaryType
    validity_buffer = read_bitmap(column["VALIDITY"])
    values_buffer = read_values(column["DATA"], type)
    type.build_array(length, validity_buffer, values_buffer)
  when VariableSizeBinaryType
    validity_buffer = read_bitmap(column["VALIDITY"])
    offsets_buffer = read_offsets(column["OFFSET"], type)
    values_buffer = read_values(column["DATA"], type)
    type.build_array(length,
                     validity_buffer,
                     offsets_buffer,
                     values_buffer)
  when VariableSizeListType
    validity_buffer = read_bitmap(column["VALIDITY"])
    offsets_buffer = read_offsets(column["OFFSET"], type)
    child = read_array(column["children"][0], type.child.type)
    type.build_array(length,
                     validity_buffer,
                     offsets_buffer,
                     child)
  when FixedSizeListType
    validity_buffer = read_bitmap(column["VALIDITY"])
    child = read_array(column["children"][0], type.child.type)
    type.build_array(length, validity_buffer, child)
  when StructType
    validity_buffer = read_bitmap(column["VALIDITY"])
    children = read_child_arrays(column, type)
    type.build_array(length, validity_buffer, children)
  when DenseUnionType
    types_buffer = read_types(column["TYPE_ID"])
    offsets_buffer = read_offsets(column["OFFSET"], type)
    children = read_child_arrays(column, type)
    type.build_array(length,
                     types_buffer,
                     offsets_buffer,
                     children)
  when SparseUnionType
    types_buffer = read_types(column["TYPE_ID"])
    children = read_child_arrays(column, type)
    type.build_array(length, types_buffer, children)
  when DictionaryType
    validity_buffer = read_bitmap(column["VALIDITY"])
    indices_buffer = read_values(column["DATA"], type.index_type)
    dictionary = read_dictionary(type.id, type.value_type)
    type.build_array(length,
                     validity_buffer,
                     indices_buffer,
                     [dictionary])
  else
    # Report the type here. No `field` is in scope in this method, so
    # referring to one would raise NameError instead of this message.
    raise "Unsupported array: #{column.inspect}: #{type.inspect}"
  end
end

# Builds a RecordBatch from one entry of the JSON "batches" list,
# reading each column with the type of the matching field in @schema.
def read_record_batch(record_batch)
  n_rows = record_batch["count"]
  columns = record_batch["columns"]
              .zip(@schema.fields)
              .collect do |column, field|
    read_array(column, field.type)
  end
  RecordBatch.new(@schema, n_rows, columns)
end
module ArrowFormat
  module Integration
    # Settings for one integration run. All values are read from
    # environment variables when the object is created.
    class Options
      # Returns the shared instance, creating it on first call.
      def self.singleton
        @singleton ||= new
      end

      # Raw values of the COMMAND, ARROW, ARROWS and JSON environment
      # variables (nil when unset).
      attr_reader :command, :arrow, :arrows, :json

      def initialize
        @command, @arrow, @arrows, @json =
          ENV.values_at("COMMAND", "ARROW", "ARROWS", "JSON")
        # Each validation stays enabled unless its QUIRK_NO_* variable
        # is exactly "true".
        @validate_date64 = !quirk_enabled?("QUIRK_NO_DATE64_VALIDATE")
        @validate_decimal = !quirk_enabled?("QUIRK_NO_DECIMAL_VALIDATE")
        @validate_time = !quirk_enabled?("QUIRK_NO_TIMES_VALIDATE")
      end

      # False only when QUIRK_NO_DATE64_VALIDATE is "true".
      def validate_date64?
        @validate_date64
      end

      # False only when QUIRK_NO_DECIMAL_VALIDATE is "true".
      def validate_decimal?
        @validate_decimal
      end

      # False only when QUIRK_NO_TIMES_VALIDATE is "true".
      def validate_time?
        @validate_time
      end

      private

      # Whether the named environment variable is exactly "true".
      def quirk_enabled?(name)
        ENV[name] == "true"
      end
    end
  end
end
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "json" +require "test-unit" + +ENV["TEST_UNIT_MAX_DIFF_TARGET_STRING_SIZE"] ||= "1_000_000" + +class ArrowFormatIntegrationTest < Test::Unit::TestCase + module ToHashable + refine ArrowFormat::Type do + def to_h + { + "name" => normalized_name, + } + end + + private + def normalized_name + name.downcase + end + end + + refine ArrowFormat::BooleanType do + private + def normalized_name + "bool" + end + end + + refine ArrowFormat::IntType do + def to_h + super.merge("bitWidth" => @bit_width, + "isSigned" => @signed) + end + + private + def normalized_name + "int" + end + end + + refine ArrowFormat::FloatingPointType do + def to_h + super.merge("precision" => normalized_precision) + end + + private + def normalized_name + "floatingpoint" + end + + def normalized_precision + @precision.to_s.upcase + end + end + + refine ArrowFormat::DateType do + def to_h + super.merge("unit" => normalized_unit) + end + + private + def normalized_name + "date" + end + + def normalized_unit + @unit.to_s.upcase + end + end + + refine ArrowFormat::TimeType do + def to_h + super.merge("bitWidth" => @bit_width, + "unit" => normalized_unit) + end + + private + def normalized_name + "time" + end + + def normalized_unit + @unit.to_s.upcase + end + end + + refine ArrowFormat::TimestampType do + def to_h + hash = super + hash["unit"] = normalized_unit + hash["timezone"] = @time_zone if @time_zone + hash + end + + private + def normalized_name + "timestamp" + end + + def normalized_unit + @unit.to_s.upcase + end + end + + refine ArrowFormat::IntervalType do + def to_h + 
super.merge("unit" => normalized_unit) + end + + private + def normalized_name + "interval" + end + + def normalized_unit + @unit.to_s.upcase + end + end + + refine ArrowFormat::DurationType do + def to_h + super.merge("unit" => normalized_unit) + end + + private + def normalized_unit + @unit.to_s.upcase + end + end + + refine ArrowFormat::FixedSizeBinaryType do + def to_h + super.merge("byteWidth" => @byte_width) + end + end + + refine ArrowFormat::FixedSizeListType do + def to_h + super.merge("listSize" => @size) + end + end + + refine ArrowFormat::DecimalType do + def to_h + hash = super + hash.delete("byteWidth") + hash["bitWidth"] = @byte_width * 8 + hash["precision"] = @precision + hash["scale"] = @scale + hash + end + + private + def normalized_name + "decimal" + end + end + + refine ArrowFormat::UnionType do + def to_h + super.merge("mode" => normalized_mode, + "typeIds" => normalized_type_ids) + end + + private + def normalized_name + "union" + end + + def normalized_mode + @mode.to_s.upcase + end + + def normalized_type_ids + @type_ids + end + end + + refine ArrowFormat::MapType do + def to_h + super.merge("keysSorted" => @keys_sorted) + end + end + + module MetadataNormalizable + private + def normalized_metadata + metadata_list = @metadata.collect do |key, value| + {"key" => key, "value" => value} + end + metadata_list.sort_by do |metadatum| + metadatum["key"] + end + end + end + + refine ArrowFormat::Field do + import_methods MetadataNormalizable + + def to_h + hash = { + "children" => normalized_children, + "name" => @name, + "nullable" => @nullable, + } + if @type.is_a?(ArrowFormat::DictionaryType) + hash["type"] = @type.value_type.to_h + hash["dictionary"] = { + "id" => @type.id, + "indexType" => @type.index_type.to_h, + "isOrdered" => @type.ordered?, + } + else + hash["type"] = @type.to_h + end + hash["metadata"] = normalized_metadata if @metadata + hash + end + + private + def normalized_children + if @type.respond_to?(:children) + 
@type.children.collect(&:to_h) + elsif @type.respond_to?(:child) + [@type.child.to_h] + else + [] + end + end + end + + refine ArrowFormat::Schema do + import_methods MetadataNormalizable + + def to_h + hash = { + "fields" => @fields.collect(&:to_h), + } + hash["metadata"] = normalized_metadata if @metadata + hash + end + end + + refine ArrowFormat::Array do + def to_h + hash = { + "count" => size, + } + to_h_data(hash) + hash + end + + private + def to_h_data(hash) + hash["DATA"] = normalized_data + hash["VALIDITY"] = normalized_validity + end + + def normalized_data + to_a + end + + def normalized_validity + if @validity_buffer + validity_bitmap.collect {|valid| valid ? 1 : 0} + else + [1] * @size + end + end + + def stringify_data(data) + data.collect do |value| + if value.nil? + nil + else + value.to_s + end + end + end + + def hexify(value) + value.each_byte.collect {|byte| "%02X" % byte}.join("") + end + + def normalized_children + @children.zip(@type.children).collect do |child, field| + normalize_child(child, field) + end + end + + def normalize_child(child, field) + child.to_h.merge("name" => field.name) + end + end + + refine ArrowFormat::NullArray do + private + def to_h_data(hash) + end + end + + refine ArrowFormat::Int64Array do + private + def normalized_data + stringify_data(super) + end + end + + refine ArrowFormat::UInt64Array do + private + def normalized_data + stringify_data(super) + end + end + + refine ArrowFormat::Float32Array do + private + def normalized_data + super.collect do |value| + if value.nil? 
# Date64 values are compared as strings: stringify_data calls to_s on
# every non-nil value.
refine ArrowFormat::Date64Array do
  private
  def normalized_data
    stringify_data(super)
  end
end

# Time64 values are compared as strings.
refine ArrowFormat::Time64Array do
  private
  def normalized_data
    stringify_data(super)
  end
end

# Timestamp values are compared as strings.
refine ArrowFormat::TimestampArray do
  private
  def normalized_data
    stringify_data(super)
  end
end

# Each non-nil value is a (day, time) pair; expose it with the
# "days"/"milliseconds" keys.
refine ArrowFormat::DayTimeIntervalArray do
  private
  def normalized_data
    super.collect do |value|
      if value.nil?
        nil
      else
        day, time = value
        {"days" => day, "milliseconds" => time}
      end
    end
  end
end

# Each non-nil value is a (month, day, nano) triple; expose it with
# the "months"/"days"/"nanoseconds" keys.
#
# This refinement is declared once here; a second identical copy was
# removed.
refine ArrowFormat::MonthDayNanoIntervalArray do
  private
  def normalized_data
    super.collect do |value|
      if value.nil?
        nil
      else
        month, day, nano = value
        {"months" => month, "days" => day, "nanoseconds" => nano}
      end
    end
  end
end

# Duration values are compared as strings.
refine ArrowFormat::DurationArray do
  private
  def normalized_data
    stringify_data(super)
  end
end
+ nil + else + normalize_value(value) + end + end + end + + def normalize_value(value) + hexify(value) + end + + def normalized_offsets + offsets + end + end + + refine ArrowFormat::LargeBinaryArray do + private + def normalized_offsets + offsets.collect(&:to_s) + end + end + + refine ArrowFormat::VariableSizeUTF8Array do + private + def normalize_value(value) + value + end + end + + refine ArrowFormat::LargeUTF8Array do + private + def normalized_offsets + offsets.collect(&:to_s) + end + end + + refine ArrowFormat::FixedSizeBinaryArray do + private + def normalized_data + super.collect do |value| + if value.nil? + nil + else + normalize_value(value) + end + end + end + + def normalize_value(value) + hexify(value) + end + end + + refine ArrowFormat::DecimalArray do + private + def normalize_value(value) + (value * (10 ** (@type.scale))).to_s("f").delete_suffix(".0") + end + end + + refine ArrowFormat::VariableSizeListArray do + private + def to_h_data(hash) + hash["OFFSET"] = normalized_offsets + hash["VALIDITY"] = normalized_validity + hash["children"] = [normalize_child(@child, @type.child)] + end + + def normalized_offsets + offsets + end + end + + refine ArrowFormat::FixedSizeListArray do + private + def to_h_data(hash) + hash["VALIDITY"] = normalized_validity + hash["children"] = [normalize_child(@child, @type.child)] + end + end + + refine ArrowFormat::LargeListArray do + private + def normalized_offsets + offsets.collect(&:to_s) + end + end + + refine ArrowFormat::StructArray do + private + def to_h_data(hash) + hash["VALIDITY"] = normalized_validity + hash["children"] = normalized_children + end + end + + refine ArrowFormat::UnionArray do + private + def to_h_data(hash) + hash["TYPE_ID"] = each_type.to_a + hash["children"] = normalized_children + end + end + + refine ArrowFormat::DenseUnionArray do + private + def to_h_data(hash) + super + hash["OFFSET"] = each_offset.to_a + end + end + + refine ArrowFormat::DictionaryArray do + private + def 
# Loads the shared integration options before each test.
def setup
  @options = ArrowFormat::Integration::Options.singleton
end

# Renames the children of a map node (a field or an array hash) to the
# static "entries"/"key"/"value" names.
def normalize_map_entries!(node)
  entries = node["children"][0]
  entries["name"] = "entries"
  entries["children"][0]["name"] = "key"
  entries["children"][1]["name"] = "value"
end

# Normalizes one expected field hash in place, then its children.
#
# * "metadata" entries are sorted by "key".
# * A decimal type gets a default "bitWidth" of 128 when decimal
#   validation is disabled in @options.
# * The children of a map are renamed to "entries"/"key"/"value".
#
# Child fields are normalized recursively so nested fields get the
# same treatment as top-level ones.
def normalize_field!(field)
  metadata = field["metadata"]
  if metadata
    field["metadata"] = metadata.sort_by do |metadatum|
      metadatum["key"]
    end
  end

  case field["type"]["name"]
  when "decimal"
    field["type"]["bitWidth"] ||= 128 unless @options.validate_decimal?
  when "map"
    normalize_map_entries!(field)
  end

  (field["children"] || []).each do |child|
    normalize_field!(child)
  end
end

# Normalizes every field of an expected schema hash in place.
def normalize_schema!(schema)
  schema["fields"].each do |field|
    normalize_field!(field)
  end
end

# Normalizes one expected array hash in place, then its children.
#
# * The children of a map are renamed to "entries"/"key"/"value".
# * "VALIDITY" is dropped from a union.
# * When both "DATA" and "VALIDITY" exist, every "DATA" value whose
#   validity bit is not 1 is replaced with nil.
def normalize_array!(array, field)
  case field["type"]["name"]
  when "map"
    normalize_map_entries!(array)
  when "union"
    # V4 data has VALIDITY.
    array.delete("VALIDITY")
  end

  data = array["DATA"]
  validity = array["VALIDITY"]
  if data and validity
    array["DATA"] = data.zip(validity).collect do |value, valid_bit|
      valid_bit == 1 ? value : nil
    end
  end

  child_arrays = array["children"]
  if child_arrays
    child_fields = field["children"]
    child_arrays.zip(child_fields) do |child_array, child_field|
      normalize_array!(child_array, child_field)
    end
  end
end

# Normalizes every column of an expected record batch hash in place,
# pairing each column with the schema field at the same position.
def normalize_record_batch!(record_batch, schema)
  record_batch["columns"].zip(schema["fields"]) do |column, field|
    normalize_array!(column, field)
  end
end

# Compares the Arrow file named by @options.arrow against the JSON
# file named by @options.json.
#
# All expected batches are normalized and compared, not only those the
# reader yields, so a file with missing or extra batches fails.
def test_validate
  expected = JSON.parse(File.read(@options.json))
  expected_schema = expected["schema"]
  normalize_schema!(expected_schema)
  expected_record_batches = expected["batches"] || []
  expected_record_batches.each do |expected_record_batch|
    normalize_record_batch!(expected_record_batch, expected_schema)
  end
  File.open(@options.arrow, "rb") do |input|
    reader = ArrowFormat::FileReader.new(input)
    actual_record_batches = reader.each.collect do |record_batch|
      record_batch.to_h
    end
    assert_equal({
                   schema: expected_schema,
                   record_batches: expected_record_batches,
                 },
                 {
                   schema: reader.schema.to_h,
                   record_batches: actual_record_batches,
                 })
  end
end
FB::Precision::DOUBLE type = Float64Type.singleton + else + raise ReadError.new("Unsupported type: #{fb_type.inspect}") end when FB::Date case fb_type.unit @@ -64,6 +69,8 @@ def read_field(fb_field) type = Date32Type.singleton when FB::DateUnit::MILLISECOND type = Date64Type.singleton + else + raise ReadError.new("Unsupported type: #{fb_type.inspect}") end when FB::Time case fb_type.bit_width @@ -73,6 +80,8 @@ def read_field(fb_field) type = Time32Type.new(:second) when FB::TimeUnit::MILLISECOND type = Time32Type.new(:millisecond) + else + raise ReadError.new("Unsupported type: #{fb_type.inspect}") end when 64 case fb_type.unit @@ -80,6 +89,8 @@ def read_field(fb_field) type = Time64Type.new(:microsecond) when FB::TimeUnit::NANOSECOND type = Time64Type.new(:nanosecond) + else + raise ReadError.new("Unsupported type: #{fb_type.inspect}") end end when FB::Timestamp @@ -93,6 +104,8 @@ def read_field(fb_field) type = DayTimeIntervalType.singleton when FB::IntervalUnit::MONTH_DAY_NANO type = MonthDayNanoIntervalType.singleton + else + raise ReadError.new("Unsupported type: #{fb_type.inspect}") end when FB::Duration unit = fb_type.unit.name.downcase.to_sym @@ -105,7 +118,15 @@ def read_field(fb_field) type = FixedSizeListType.new(read_field(fb_field.children[0]), fb_type.list_size) when FB::Struct - children = fb_field.children.collect {|child| read_field(child)} + if map_entries + fb_children = fb_field.children + children = [ + read_field(fb_children[0], map_key: true), + read_field(fb_children[1], map_value: true), + ] + else + children = fb_field.children.collect {|child| read_field(child)} + end type = StructType.new(children) when FB::Union children = fb_field.children.collect {|child| read_field(child)} @@ -115,9 +136,12 @@ def read_field(fb_field) type = DenseUnionType.new(children, type_ids) when FB::UnionMode::SPARSE type = SparseUnionType.new(children, type_ids) + else + raise ReadError.new("Unsupported type: #{fb_type.inspect}") end when FB::Map - type = 
MapType.new(read_field(fb_field.children[0])) + type = MapType.new(read_field(fb_field.children[0], map_entries: true), + fb_type.keys_sorted?) when FB::Binary type = BinaryType.singleton when FB::LargeBinary @@ -134,21 +158,42 @@ def read_field(fb_field) type = Decimal128Type.new(fb_type.precision, fb_type.scale) when 256 type = Decimal256Type.new(fb_type.precision, fb_type.scale) + else + raise ReadError.new("Unsupported type: #{fb_type.inspect}") end + else + raise ReadError.new("Unsupported type: #{fb_type.inspect}") end dictionary = fb_field.dictionary if dictionary dictionary_id = dictionary.id index_type = read_type_int(dictionary.index_type) - type = DictionaryType.new(index_type, type, dictionary.ordered?) + value_type = type + type = DictionaryType.new(dictionary_id, + index_type, + value_type, + dictionary.ordered?) + end + + # Map type uses static "entries"/"key"/"value" as field names + # instead of field names in FlatBuffers. It's based on the + # specification: + # + # The names of the child fields may be respectively "entries", + # "key", and "value", but this is not enforced. 
+ if map_entries + name = "entries" + elsif map_key + name = "key" + elsif map_value + name = "value" else - dictionary_id = nil + name = fb_field.name end - Field.new(fb_field.name, + Field.new(name, type, nullable: fb_field.nullable?, - dictionary_id: dictionary_id, metadata: read_custom_metadata(fb_field.custom_metadata)) end @@ -181,17 +226,17 @@ def read_type_int(fb_type) end end - def read_record_batch(fb_record_batch, schema, body) + def read_record_batch(version, fb_record_batch, schema, body) n_rows = fb_record_batch.length nodes = fb_record_batch.nodes buffers = fb_record_batch.buffers columns = schema.fields.collect do |field| - read_column(field, nodes, buffers, body) + read_column(version, field, nodes, buffers, body) end RecordBatch.new(schema, n_rows, columns) end - def read_column(field, nodes, buffers, body) + def read_column(version, field, nodes, buffers, body) node = nodes.shift length = node.length @@ -209,51 +254,63 @@ def read_column(field, nodes, buffers, body) NumberType, TemporalType values_buffer = buffers.shift - values = body.slice(values_buffer.offset, values_buffer.length) + values = body&.slice(values_buffer.offset, values_buffer.length) field.type.build_array(length, validity, values) when VariableSizeBinaryType offsets_buffer = buffers.shift values_buffer = buffers.shift - offsets = body.slice(offsets_buffer.offset, offsets_buffer.length) - values = body.slice(values_buffer.offset, values_buffer.length) + offsets = body&.slice(offsets_buffer.offset, offsets_buffer.length) + values = body&.slice(values_buffer.offset, values_buffer.length) field.type.build_array(length, validity, offsets, values) when FixedSizeBinaryType values_buffer = buffers.shift - values = body.slice(values_buffer.offset, values_buffer.length) + values = body&.slice(values_buffer.offset, values_buffer.length) field.type.build_array(length, validity, values) when VariableSizeListType offsets_buffer = buffers.shift - offsets = body.slice(offsets_buffer.offset, 
offsets_buffer.length) - child = read_column(field.type.child, nodes, buffers, body) + offsets = body&.slice(offsets_buffer.offset, offsets_buffer.length) + child = read_column(version, field.type.child, nodes, buffers, body) field.type.build_array(length, validity, offsets, child) when FixedSizeListType - child = read_column(field.type.child, nodes, buffers, body) + child = read_column(version, field.type.child, nodes, buffers, body) field.type.build_array(length, validity, child) when StructType children = field.type.children.collect do |child| - read_column(child, nodes, buffers, body) + read_column(version, child, nodes, buffers, body) end field.type.build_array(length, validity, children) when DenseUnionType - # dense union type doesn't have validity. - types = validity + if version == FB::MetadataVersion::V4 + # Dense union type has validity with V4. + types_buffer = buffers.shift + types = body&.slice(types_buffer.offset, types_buffer.length) + else + # Dense union type doesn't have validity. + types = validity + end offsets_buffer = buffers.shift - offsets = body.slice(offsets_buffer.offset, offsets_buffer.length) + offsets = body&.slice(offsets_buffer.offset, offsets_buffer.length) children = field.type.children.collect do |child| - read_column(child, nodes, buffers, body) + read_column(version, child, nodes, buffers, body) end field.type.build_array(length, types, offsets, children) when SparseUnionType - # sparse union type doesn't have validity. - types = validity + if version == FB::MetadataVersion::V4 + # Sparse union type has validity with V4. + types_buffer = buffers.shift + types = body&.slice(types_buffer.offset, types_buffer.length) + else + # Sparse union type doesn't have validity. 
+ types = validity + end children = field.type.children.collect do |child| - read_column(child, nodes, buffers, body) + read_column(version, child, nodes, buffers, body) end field.type.build_array(length, types, children) when DictionaryType indices_buffer = buffers.shift - indices = body.slice(indices_buffer.offset, indices_buffer.length) - dictionaries = find_dictionaries(field.dictionary_id) + indices = body&.slice(indices_buffer.offset, indices_buffer.length) + dictionaries = find_dictionaries(field.type.id) field.type.build_array(length, validity, indices, dictionaries) end end diff --git a/ruby/red-arrow-format/lib/arrow-format/record-batch.rb b/ruby/red-arrow-format/lib/arrow-format/record-batch.rb index a641c87da71e..938f02d762a9 100644 --- a/ruby/red-arrow-format/lib/arrow-format/record-batch.rb +++ b/ruby/red-arrow-format/lib/arrow-format/record-batch.rb @@ -22,6 +22,8 @@ class RecordBatch attr_reader :schema attr_reader :n_rows + alias_method :size, :n_rows + alias_method :length, :n_rows attr_reader :columns def initialize(schema, n_rows, columns) @schema = schema @@ -29,6 +31,10 @@ def initialize(schema, n_rows, columns) @columns = columns end + def empty? + @n_rows.zero? 
+ end + def to_h hash = {} @schema.fields.zip(@columns) do |field, column| diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb index 5657ca4a1be4..13e7ad724302 100644 --- a/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb +++ b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb @@ -100,11 +100,23 @@ def consume(chunk) private def consume_initial(target) continuation = target.get_value(CONTINUATION_TYPE, 0) - unless continuation == CONTINUATION_INT32 + if continuation == CONTINUATION_INT32 + @state = :metadata_length + elsif continuation < 0 raise ReadError.new("Invalid continuation token: " + continuation.inspect) + else + # For backward compatibility of data produced prior to version + # 0.15.0. It doesn't have continuation token. Ignore it and + # re-read it as metadata length. + metadata_length = continuation + if metadata_length == 0 + @state = :eos + else + @metadata_length = metadata_length + @state = :metadata + end end - @state = :metadata_length end def consume_metadata_length(target) @@ -204,7 +216,7 @@ def process_schema_message(message, body) @dictionary_fields = {} @schema.fields.each do |field| next unless field.type.is_a?(DictionaryType) - @dictionary_fields[field.dictionary_id] = field + @dictionary_fields[field.type.id] = field end if @dictionaries.size < @dictionary_fields.size @state = :initial_dictionaries @@ -223,7 +235,10 @@ def process_dictionary_batch_message(message, body) field = @dictionary_fields[header.id] value_type = field.type.value_type schema = Schema.new([Field.new("dummy", value_type)]) - record_batch = read_record_batch(header.data, schema, body) + record_batch = read_record_batch(message.version, + header.data, + schema, + body) if header.delta? 
@dictionaries[header.id] << record_batch.columns[0] else @@ -237,7 +252,7 @@ def find_dictionaries(id) def process_record_batch_message(message, body) header = message.header - @on_read.call(read_record_batch(header, @schema, body)) + @on_read.call(read_record_batch(message.version, header, @schema, body)) end end end diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb index f11972c67a20..f81cfe8913a4 100644 --- a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb +++ b/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb @@ -21,29 +21,49 @@ module ArrowFormat class StreamingReader include Enumerable - attr_reader :schema def initialize(input) @input = input - @schema = nil + @on_read = nil + @pull_reader = StreamingPullReader.new do |record_batch| + @on_read.call(record_batch) if @on_read + end + @buffer = "".b + ensure_schema + end + + def schema + @pull_reader.schema end - def each + def each(&block) return to_enum(__method__) unless block_given? - reader = StreamingPullReader.new do |record_batch| - @schema ||= reader.schema - yield(record_batch) + @on_read = block + begin + loop do + break unless consume + end + ensure + @on_read = nil end + end - buffer = "".b - loop do - next_size = reader.next_required_size - break if next_size.zero? + private + def consume + next_size = @pull_reader.next_required_size + return false if next_size.zero? + + next_chunk = @input.read(next_size, @buffer) + return false if next_chunk.nil? - next_chunk = @input.read(next_size, buffer) - break if next_chunk.nil? 
+ @pull_reader.consume(IO::Buffer.for(next_chunk)) + true + end - reader.consume(IO::Buffer.for(next_chunk)) + def ensure_schema + loop do + break unless consume + break if @pull_reader.schema end end end diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb index 0621dcbb8938..18eb2dda3abc 100644 --- a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb +++ b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb @@ -40,9 +40,9 @@ def start(schema) def write_record_batch(record_batch) record_batch.schema.fields.each_with_index do |field, i| - next if field.dictionary_id.nil? + next unless field.type.is_a?(DictionaryType) dictionary_array = record_batch.columns[i] - write_dictionary(field.dictionary_id, dictionary_array) + write_dictionary(field.type.id, dictionary_array) end write_record_batch_based_message(record_batch, diff --git a/ruby/red-arrow-format/lib/arrow-format/type.rb b/ruby/red-arrow-format/lib/arrow-format/type.rb index fb153450b0f6..17674af30c7e 100644 --- a/ruby/red-arrow-format/lib/arrow-format/type.rb +++ b/ruby/red-arrow-format/lib/arrow-format/type.rb @@ -912,7 +912,7 @@ def to_flatbuffers end class MapType < VariableSizeListType - def initialize(child) + def initialize(child, keys_sorted) if child.nullable? raise TypeError.new("Map entry field must not be nullable: " + child.inspect) @@ -930,12 +930,17 @@ def initialize(child) type.children[0].inspect) end super(child) + @keys_sorted = keys_sorted end def name "Map" end + def keys_sorted? 
+ @keys_sorted + end + def offset_buffer_type :s32 # TODO: big endian support end @@ -993,6 +998,10 @@ def name "DenseUnion" end + def offset_buffer_type + :s32 + end + def build_array(size, types_buffer, offsets_buffer, children) DenseUnionArray.new(self, size, types_buffer, offsets_buffer, children) end @@ -1013,10 +1022,12 @@ def build_array(size, types_buffer, children) end class DictionaryType < Type + attr_reader :id attr_reader :index_type attr_reader :value_type - def initialize(index_type, value_type, ordered) + def initialize(id, index_type, value_type, ordered) super() + @id = id @index_type = index_type @value_type = value_type @ordered = ordered @@ -1038,22 +1049,21 @@ def build_array(size, validity_buffer, indices_buffer, dictionaries) dictionaries) end - def build_fb_field(fb_field, field) + def build_fb_field(fb_field) fb_dictionary_encoding = FB::DictionaryEncoding::Data.new - fb_dictionary_encoding.id = field.dictionary_id + fb_dictionary_encoding.id = @id fb_int = FB::Int::Data.new fb_int.bit_width = @index_type.bit_width fb_int.signed = @index_type.signed? 
fb_dictionary_encoding.index_type = fb_int fb_dictionary_encoding.ordered = @ordered - fb_dictionary_encoding.dictionary_kind = - FB::DictionaryKind::DENSE_ARRAY + fb_dictionary_encoding.dictionary_kind = FB::DictionaryKind::DENSE_ARRAY fb_field.type = @value_type.to_flatbuffers fb_field.dictionary = fb_dictionary_encoding end def to_s - "#{super}" end end diff --git a/ruby/red-arrow-format/red-arrow-format.gemspec b/ruby/red-arrow-format/red-arrow-format.gemspec index 52340b9748f7..9148a21f50d3 100644 --- a/ruby/red-arrow-format/red-arrow-format.gemspec +++ b/ruby/red-arrow-format/red-arrow-format.gemspec @@ -44,9 +44,10 @@ Gem::Specification.new do |spec| spec.files = ["README.md", "Rakefile", "Gemfile", "#{spec.name}.gemspec"] spec.files += ["LICENSE.txt", "NOTICE.txt"] spec.files += Dir.glob("lib/**/*.rb") + spec.files -= Dir.glob("lib/arrow-format/integration/**/*.rb") spec.files += Dir.glob("doc/text/*") - spec.add_runtime_dependency("red-flatbuffers", ">=0.0.6") + spec.add_runtime_dependency("red-flatbuffers", ">=0.0.8") github_url = "https://github.com/apache/arrow" spec.metadata = { diff --git a/ruby/red-arrow-format/test/test-reader.rb b/ruby/red-arrow-format/test/test-reader.rb index 06360f62acdc..d59a93ce184b 100644 --- a/ruby/red-arrow-format/test/test-reader.rb +++ b/ruby/red-arrow-format/test/test-reader.rb @@ -670,7 +670,7 @@ def test_dictionary array = string_array.dictionary_encode type, values = roundtrip(array) assert_equal([ - "Dictionary", + "Dictionary", ["a", "b", "c", nil, "a"], ], [type.to_s, values]) diff --git a/ruby/red-arrow-format/test/test-writer.rb b/ruby/red-arrow-format/test/test-writer.rb index f36a1a252eec..72776f01ab8e 100644 --- a/ruby/red-arrow-format/test/test-writer.rb +++ b/ruby/red-arrow-format/test/test-writer.rb @@ -86,7 +86,8 @@ def convert_type(red_arrow_type) when Arrow::FixedSizeBinaryDataType ArrowFormat::FixedSizeBinaryType.new(red_arrow_type.byte_width) when Arrow::MapDataType - 
ArrowFormat::MapType.new(convert_field(red_arrow_type.field)) + ArrowFormat::MapType.new(convert_field(red_arrow_type.field), + red_arrow_type.keys_sorted?) when Arrow::ListDataType ArrowFormat::ListType.new(convert_field(red_arrow_type.field)) when Arrow::LargeListDataType @@ -110,10 +111,14 @@ def convert_type(red_arrow_type) end ArrowFormat::SparseUnionType.new(fields, red_arrow_type.type_codes) when Arrow::DictionaryDataType + @dictionary_id ||= 0 + dictionary_id = @dictionary_id + @dictionary_id += 1 index_type = convert_type(red_arrow_type.index_data_type) - type = convert_type(red_arrow_type.value_data_type) - ArrowFormat::DictionaryType.new(index_type, - type, + value_type = convert_type(red_arrow_type.value_data_type) + ArrowFormat::DictionaryType.new(dictionary_id, + index_type, + value_type, red_arrow_type.ordered?) else raise "Unsupported type: #{red_arrow_type.inspect}" @@ -122,17 +127,9 @@ def convert_type(red_arrow_type) def convert_field(red_arrow_field) type = convert_type(red_arrow_field.data_type) - if type.is_a?(ArrowFormat::DictionaryType) - @dictionary_id ||= 0 - dictionary_id = @dictionary_id - @dictionary_id += 1 - else - dictionary_id = nil - end ArrowFormat::Field.new(red_arrow_field.name, type, nullable: red_arrow_field.nullable?, - dictionary_id: dictionary_id, metadata: red_arrow_field.metadata) end @@ -930,13 +927,13 @@ def test_dictionary module WriterDictionaryDeltaTests def build_schema(value_type) index_type = ArrowFormat::Int32Type.singleton + dictionary_id = 1 ordered = false - type = ArrowFormat::DictionaryType.new(index_type, + type = ArrowFormat::DictionaryType.new(dictionary_id, + index_type, value_type, ordered) - field = ArrowFormat::Field.new("value", - type, - dictionary_id: 1) + field = ArrowFormat::Field.new("value", type) ArrowFormat::Schema.new([field]) end diff --git a/ruby/red-arrow/lib/arrow/table-formatter.rb b/ruby/red-arrow/lib/arrow/table-formatter.rb index b93faf09cbd0..b4e257413c9c 100644 --- 
a/ruby/red-arrow/lib/arrow/table-formatter.rb +++ b/ruby/red-arrow/lib/arrow/table-formatter.rb @@ -80,7 +80,11 @@ def format_value(value, width=0) when nil "%*s" % [width, FORMATTED_NULL] else - "%-*s" % [width, value.to_s] + value = value.to_s + if value.encoding == Encoding::ASCII_8BIT + value = value.each_byte.collect {|byte| "%X" % byte}.join + end + "%-*s" % [width, value] end end