
aws: compression: arrow: Set up proper block size #11640

Merged

edsiper merged 1 commit into master from cosmo0920-handle-block-size-on-parse_json on Mar 31, 2026

Conversation

@cosmo0920
Contributor

@cosmo0920 cosmo0920 commented Mar 31, 2026

Closes #11578.


Enter [N/A] in the box if an item is not applicable to your change.

Testing

Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
service:
  parsers_file: ../conf/parsers.conf
pipeline:
  inputs:
    - name: tail
      path: ./data/*.log
      tag: logs
      parser: json
      buffer_chunk_size: 2M
      buffer_max_size: 4M
      skip_long_lines: off
      read_from_head: On

  outputs:
    - name: s3
      match: "*"
      bucket: <my-bucket>
      region: <my-region>
      use_put_object: true
      compression: parquet
      s3_key_format: /$TAG/%Y-%m-%d/$UUID.parquet
  • Debug log output from testing the change
Fluent Bit v5.0.2
* Copyright (C) 2015-2026 The Fluent Bit Authors
* Fluent Bit is a CNCF graduated project under the Fluent organization
* https://fluentbit.io

______ _                  _    ______ _ _           _____  _____ 
|  ___| |                | |   | ___ (_) |         |  ___||  _  |
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   _|___ \ | |/' |
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \|  /| |
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V //\__/ /\ |_/ /
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)\___/


[2026/03/31 17:43:56.530] [ info] Configuration:
[2026/03/31 17:43:56.530] [ info]  flush time     | 1.000000 seconds
[2026/03/31 17:43:56.530] [ info]  grace          | 5 seconds
[2026/03/31 17:43:56.530] [ info]  daemon         | 0
[2026/03/31 17:43:56.530] [ info] ___________
[2026/03/31 17:43:56.530] [ info]  inputs:
[2026/03/31 17:43:56.530] [ info]      tail
[2026/03/31 17:43:56.530] [ info] ___________
[2026/03/31 17:43:56.530] [ info]  filters:
[2026/03/31 17:43:56.530] [ info] ___________
[2026/03/31 17:43:56.530] [ info]  outputs:
[2026/03/31 17:43:56.530] [ info]      s3.0
[2026/03/31 17:43:56.530] [ info] ___________
[2026/03/31 17:43:56.530] [ info]  collectors:
[2026/03/31 17:43:56.530] [ info] [fluent bit] version=5.0.2, commit=dd56a3d980, pid=75106
[2026/03/31 17:43:56.530] [debug] [engine] coroutine stack size: 36864 bytes (36.0K)
[2026/03/31 17:43:56.530] [ info] [storage] ver=1.4.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2026/03/31 17:43:56.530] [ info] [simd    ] NEON
[2026/03/31 17:43:56.530] [ info] [cmetrics] version=2.1.1
[2026/03/31 17:43:56.530] [ info] [ctraces ] version=0.7.1
[2026/03/31 17:43:56.530] [ info] [input:tail:tail.0] initializing
[2026/03/31 17:43:56.530] [ info] [input:tail:tail.0] storage_strategy='memory' (memory only)
[2026/03/31 17:43:56.530] [debug] [tail:tail.0] created event channels: read=21 write=22
[2026/03/31 17:43:56.530] [debug] [input:tail:tail.0] flb_tail_fs_stat_init() initializing stat tail input
[2026/03/31 17:43:56.530] [debug] [input:tail:tail.0] scanning path ./data/*.log
[2026/03/31 17:43:56.530] [debug] [input:tail:tail.0] inode=8337999 with offset=0 appended as ./data/huge.log
[2026/03/31 17:43:56.530] [debug] [input:tail:tail.0] scan_glob add(): ./data/huge.log, inode 8337999
[2026/03/31 17:43:56.530] [debug] [input:tail:tail.0] 1 new files found on path './data/*.log'
[2026/03/31 17:43:56.530] [debug] [s3:s3.0] created event channels: read=28 write=29
<snip>
[2026/03/31 17:43:56.655] [debug] [aws_credentials] Initialized EC2 Provider in standard chain
[2026/03/31 17:43:56.655] [debug] [aws_credentials] Sync called on the EC2 provider
[2026/03/31 17:43:56.655] [debug] [aws_credentials] Init called on the env provider
[2026/03/31 17:43:56.655] [ info] [output:s3:s3.0] Sending locally buffered data from previous executions to S3; buffer=/tmp/fluent-bit/s3/<my-bucket>
[2026/03/31 17:43:56.663] [ info] [output:s3:s3.0] Pre-compression chunk size is 1536052, After compression, chunk is 1536578 bytes
[2026/03/31 17:43:56.694] [debug] [upstream] KA connection #34 to s3.ap-northeast-1.amazonaws.com:443 is connected
[2026/03/31 17:43:56.694] [debug] [http_client] not using http_proxy for header
[2026/03/31 17:43:56.694] [debug] [aws_credentials] Requesting credentials from the env provider..
[2026/03/31 17:43:56.947] [debug] [upstream] KA connection #34 to s3.ap-northeast-1.amazonaws.com:443 is now available
[2026/03/31 17:43:56.948] [debug] [output:s3:s3.0] PutObject http status=200
[2026/03/31 17:43:56.948] [ info] [output:s3:s3.0] Successfully uploaded object /logs/2026-03-31/PsROgIK2.parquet
[2026/03/31 17:43:56.948] [debug] [aws_credentials] upstream_set called on the EC2 provider
[2026/03/31 17:43:56.948] [ info] [output:s3:s3.0] initializing worker
[2026/03/31 17:43:56.948] [ info] [sp] stream processor started
[2026/03/31 17:43:56.949] [ info] [output:s3:s3.0] worker #0 started
[2026/03/31 17:43:56.949] [ info] [engine] Shutdown Grace Period=5, Shutdown Input Grace Period=2
[2026/03/31 17:43:56.952] [debug] [input:tail:tail.0] [static files] processed 1.5M
[2026/03/31 17:43:56.952] [debug] [input:tail:tail.0] inode=8337999 file=./data/huge.log promote to TAIL_EVENT
[2026/03/31 17:43:56.952] [debug] [input:tail:tail.0] [static files] processed 0b, done
[2026/03/31 17:43:57.953] [debug] [task] created task=0xa8d158000 id=0 OK
[2026/03/31 17:43:57.953] [debug] [output:s3:s3.0] task_id=0 assigned to thread #0
[2026/03/31 17:43:57.954] [debug] [output:s3:s3.0] Creating upload timer with frequency 60s
[2026/03/31 17:43:57.965] [debug] [out flush] cb_destroy coro_id=0
[2026/03/31 17:43:57.965] [debug] [task] destroy task=0xa8d158000 (task_id=0)
^C[2026/03/31 17:44:26] [engine] caught signal (SIGINT)
[2026/03/31 17:44:26.727] [ info] [input] pausing tail.0
[2026/03/31 17:44:26.727] [ info] [output:s3:s3.0] thread worker #0 stopping...
[2026/03/31 17:44:26.727] [ info] [output:s3:s3.0] terminating worker
[2026/03/31 17:44:26.727] [ info] [output:s3:s3.0] thread worker #0 stopped
[2026/03/31 17:44:26.727] [ info] [output:s3:s3.0] Sending all locally buffered data to S3
[2026/03/31 17:44:26.739] [ info] [output:s3:s3.0] Pre-compression chunk size is 1536052, After compression, chunk is 1536578 bytes
  • Attached Valgrind output that shows no leaks or memory corruption was found

This is the leaks command result on macOS:

Process 76557 is not debuggable. Due to security restrictions, leaks can only show or save contents of readonly memory of restricted processes.

Process:         fluent-bit [76557]
Path:            /Users/USER/*/fluent-bit
Load Address:    0x102b78000
Identifier:      fluent-bit
Version:         0
Code Type:       ARM64
Platform:        macOS
Parent Process:  leaks [76556]
Target Type:     live task

Date/Time:       2026-03-31 17:50:01.884 +0900
Launch Time:     2026-03-31 17:49:46.258 +0900
OS Version:      macOS 26.3.1 (25D2128)
Report Version:  7
Analysis Tool:   /Applications/Xcode.app/Contents/Developer/usr/bin/leaks
Analysis Tool Version:  Xcode 26.4 (17E192)

Physical footprint:         51.6M
Physical footprint (peak):  51.6M
Idle exit:                  untracked
----

leaks Report Version: 4.0, multi-line stacks
Process 76557: 52082 nodes malloced for 12486 KB
Process 76557: 0 leaks for 0 total leaked bytes.

[2026/03/31 17:50:02] [engine] caught signal (SIGCONT)
[2026/03/31 17:50:02] [engine] caught signal (SIGHUP)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

Release Notes

  • Performance Improvements
    • Enhanced JSON-to-Arrow data conversion with adaptive block-size optimization. The parsing engine now dynamically adjusts block sizes within an 8-64 MiB range to improve parsing efficiency based on input size.

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
@coderabbitai

coderabbitai bot commented Mar 31, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d441b5f0-9440-4a54-8437-66cbb1a3ffaa

📥 Commits

Reviewing files that changed from the base of the PR and between 680f14c and dd56a3d.

📒 Files selected for processing (1)
  • src/aws/compression/arrow/compress.c

📝 Walkthrough

Added a choose_block_size() helper function that computes an adaptive block-size value (8 MiB to 64 MiB range) for Apache Arrow's JSON reader configuration. Updated parse_json() to use this function when setting JSON reader options, addressing failures when processing records exceeding 1 MB.

Changes

Cohort / File(s): Arrow JSON Reader Block Size Configuration — src/aws/compression/arrow/compress.c
Summary: Added a choose_block_size(size_t size) helper function that calculates adaptive block sizes from 8 MiB to 64 MiB based on input size. Modified parse_json() to invoke this helper and configure the Arrow JSON reader's block-size property, replacing the default 1 MB limitation.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Suggested reviewers

  • edsiper
  • fujimotos
  • leonardo-albertovich
  • niedbalski
  • patrick-stephens

Poem

🐰 A block size so tiny caused records to fail,
When large JSON treasures exceeded the pale,
But with adaptive sizing, from eight up to sixty-four,
The Arrow now handles what it couldn't before! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Docstring Coverage ⚠️ Warning: Docstring coverage is 50.00%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

Description Check ✅ Passed: Check skipped - CodeRabbit's high-level summary is enabled.
Title check ✅ Passed: The title accurately describes the main change: setting a proper block size for the Arrow JSON reader in the AWS S3 Parquet compression path.
Linked Issues check ✅ Passed: The code change directly addresses the issue by implementing adaptive block size selection (8-64 MiB) to handle JSON records larger than Arrow's default 1 MB block size.
Out of Scope Changes check ✅ Passed: The changes are focused solely on the block size configuration for JSON parsing and contain no unrelated modifications.



@cosmo0920 cosmo0920 marked this pull request as ready for review March 31, 2026 08:52
@cosmo0920 cosmo0920 requested a review from a team as a code owner March 31, 2026 08:52
@edsiper edsiper merged commit f5df0a9 into master Mar 31, 2026
56 checks passed
@edsiper edsiper deleted the cosmo0920-handle-block-size-on-parse_json branch March 31, 2026 12:52

Development

Successfully merging this pull request may close these issues.

S3 Parquet compression fails for records larger than 1MB due to Arrow JSON reader default block size

2 participants