Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,69 @@ jobs:
env:
RUSTDOCFLAGS: "-D warnings"
run: cargo doc --no-deps --workspace

# S3 integration tests using MinIO (only when S3-related files change)
test-s3-integration:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
with:
fetch-depth: 0

- name: Check for S3-related changes
id: changes
run: |
BASE_SHA=${{ github.event.pull_request.base.sha || github.event.before || 'HEAD~1' }}
if git diff --name-only "$BASE_SHA" HEAD | grep -qE \
'^spatialbench-cli/src/(s3_writer|runner|output_plan|main)\.rs$|^spatialbench-cli/tests/s3_integration\.rs$|^\.github/workflows/rust\.yml$'; then
echo "s3=true" >> "$GITHUB_OUTPUT"
else
echo "s3=false" >> "$GITHUB_OUTPUT"
fi

- name: Start MinIO
if: steps.changes.outputs.s3 == 'true'
run: |
docker run -d --name minio \
-p 9000:9000 \
-e MINIO_ROOT_USER=minioadmin \
-e MINIO_ROOT_PASSWORD=minioadmin \
minio/minio:latest server /data
# Wait for MinIO to be ready
for i in $(seq 1 30); do
if curl -sf http://localhost:9000/minio/health/live; then
echo "MinIO is ready"
exit 0
fi
sleep 1
done
echo "MinIO failed to start"
docker logs minio
exit 1

- name: Create MinIO test bucket
if: steps.changes.outputs.s3 == 'true'
run: |
curl -sL https://dl.min.io/client/mc/release/linux-amd64/mc -o /usr/local/bin/mc
chmod +x /usr/local/bin/mc
mc alias set local http://localhost:9000 minioadmin minioadmin
mc mb local/spatialbench-test

- uses: dtolnay/rust-toolchain@stable
if: steps.changes.outputs.s3 == 'true'

- uses: Swatinem/rust-cache@v2
if: steps.changes.outputs.s3 == 'true'
with:
prefix-key: "rust-test-s3-v1"

- name: Run S3 integration tests
if: steps.changes.outputs.s3 == 'true'
env:
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin
AWS_ENDPOINT: http://localhost:9000
AWS_REGION: us-east-1
AWS_ALLOW_HTTP: "true"
S3_TEST_BUCKET: spatialbench-test
run: cargo test -p spatialbench-cli --test s3_integration -- --ignored
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,26 @@ spatialbench-cli --scale-factor 1 --mb-per-file 256 --output-dir sf1-parquet
spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir sf10-parquet
```

#### Generate Data Directly to S3

You can generate data directly to Amazon S3 or S3-compatible storage by providing an S3 URI as the output directory:

```bash
# Set AWS credentials
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_REGION="us-west-2" # Must match your bucket's region

# Generate to S3
spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir s3://my-bucket/spatialbench/sf10

# For S3-compatible services (MinIO, etc.)
export AWS_ENDPOINT="http://localhost:9000"
spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data
```

The S3 writer uses streaming multipart upload, buffering data in 32 MB chunks before uploading parts. This ensures memory-efficient generation even for large datasets. All output formats (Parquet, CSV, TBL) are supported, and the generated files are byte-for-byte identical to those produced by local generation.

#### Custom Spider Configuration

You can override these defaults at runtime by passing a YAML file via the `--config` flag:
Expand Down
8 changes: 8 additions & 0 deletions docs/datasets-generators.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ You can generate the tables for Scale Factor 1 with the following command:
spatialbench-cli -s 1 --format=parquet --output-dir sf1-parquet
```

You can also generate data directly to Amazon S3 by providing an S3 URI:

```
spatialbench-cli -s 1 --format=parquet --output-dir s3://my-bucket/sf1-parquet
```

See the [Quickstart](quickstart.md#generate-data-directly-to-s3) for details on configuring AWS credentials.

Here are the contents of the `sf1-parquet` directory:

* `building.parquet`
Expand Down
20 changes: 20 additions & 0 deletions docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,26 @@ spatialbench-cli --scale-factor 10 --mb-per-file 512
spatialbench-cli --scale-factor 1 --output-dir data/sf1
```

### Generate Data Directly to S3

You can generate data directly to Amazon S3 or S3-compatible storage by providing an S3 URI as the output directory:

```shell
# Set AWS credentials
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_REGION="us-west-2" # Must match your bucket's region

# Generate to S3
spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir s3://my-bucket/spatialbench/sf10

# For S3-compatible services (MinIO, etc.)
export AWS_ENDPOINT="http://localhost:9000"
spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data
```

The S3 writer uses streaming multipart upload, buffering data in 32 MB chunks before uploading parts. All standard AWS environment variables are supported, including `AWS_SESSION_TOKEN` for temporary credentials.

## Configuring Spatial Distributions

SpatialBench uses a spatial data generator to generate synthetic points and polygons using realistic spatial distributions.
Expand Down
3 changes: 2 additions & 1 deletion spatialbench-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ serde = { version = "1.0.219", features = ["derive"] }
anyhow = "1.0.99"
serde_yaml = "0.9.33"
datafusion = "50.2"
object_store = { version = "0.12.4", features = ["http"] }
object_store = { version = "0.12.4", features = ["http", "aws"] }
arrow-array = "56"
arrow-schema = "56"
url = "2.5.7"
bytes = "1.10.1"

[dev-dependencies]
assert_cmd = "2.0"
Expand Down
14 changes: 8 additions & 6 deletions spatialbench-cli/src/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ pub trait Source: Send {
/// Something that can write the contents of a buffer somewhere
///
/// For example, this is implemented for a file writer.
pub trait Sink: Send {
pub trait Sink: Send + Sized {
/// Write all data from the buffer to the sink
fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error>;

/// Complete and flush any remaining data from the sink
fn flush(self) -> Result<(), io::Error>;
/// Complete and flush any remaining data from the sink, returning it
/// so the caller can perform additional finalization (e.g. async S3 upload).
fn flush(self) -> Result<Self, io::Error>;
}

/// Generates data in parallel from a series of [`Source`] and writes to a [`Sink`]
Expand All @@ -69,7 +70,7 @@ pub async fn generate_in_chunks<G, I, S>(
mut sink: S,
sources: I,
num_threads: usize,
) -> Result<(), io::Error>
) -> Result<S, io::Error>
where
G: Source + 'static,
I: Iterator<Item = G>,
Expand All @@ -86,7 +87,7 @@ where

// write the header
let Some(first) = sources.peek() else {
return Ok(()); // no sources
return Ok(sink); // no sources
};
let header = first.header(Vec::new());
tx.send(header)
Expand Down Expand Up @@ -131,7 +132,8 @@ where
sink.sink(&buffer)?;
captured_recycler.return_buffer(buffer);
}
// No more input, flush the sink and return
// No more input, flush the sink and return it so the caller can
// perform additional finalization (e.g. async S3 upload).
sink.flush()
});

Expand Down
31 changes: 22 additions & 9 deletions spatialbench-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod output_plan;
mod parquet;
mod plan;
mod runner;
mod s3_writer;
mod spatial_config_file;
mod statistics;
mod tbl;
Expand Down Expand Up @@ -252,8 +253,9 @@ impl Cli {
debug!("Logging configured from environment variables");
}

// Create output directory if it doesn't exist and we are not writing to stdout.
if !self.stdout {
// Create output directory if it doesn't exist and we are not writing to stdout
// or to S3 (where local directories are meaningless).
if !self.stdout && !self.output_dir.to_string_lossy().starts_with("s3://") {
fs::create_dir_all(&self.output_dir)?;
}

Expand Down Expand Up @@ -386,21 +388,26 @@ impl Cli {
}
}

impl IntoSize for BufWriter<Stdout> {
fn into_size(self) -> Result<usize, io::Error> {
// we can't get the size of stdout, so just return 0
impl AsyncFinalize for BufWriter<Stdout> {
async fn finalize(self) -> Result<usize, io::Error> {
Ok(0)
}
}

impl IntoSize for BufWriter<File> {
fn into_size(self) -> Result<usize, io::Error> {
impl AsyncFinalize for BufWriter<File> {
async fn finalize(self) -> Result<usize, io::Error> {
let file = self.into_inner()?;
let metadata = file.metadata()?;
Ok(metadata.len() as usize)
}
}

impl AsyncFinalize for s3_writer::S3Writer {
async fn finalize(self) -> Result<usize, io::Error> {
self.finish().await
}
}

/// Wrapper around a buffer writer that counts the number of buffers and bytes written
struct WriterSink<W: Write> {
statistics: WriteStatistics,
Expand All @@ -414,6 +421,11 @@ impl<W: Write> WriterSink<W> {
statistics: WriteStatistics::new("buffers"),
}
}

/// Consume the sink and return the inner writer for further finalization.
fn into_inner(self) -> W {
self.inner
}
}

impl<W: Write + Send> Sink for WriterSink<W> {
Expand All @@ -423,7 +435,8 @@ impl<W: Write + Send> Sink for WriterSink<W> {
self.inner.write_all(buffer)
}

fn flush(mut self) -> Result<(), io::Error> {
self.inner.flush()
fn flush(mut self) -> Result<Self, io::Error> {
self.inner.flush()?;
Ok(self)
}
}
69 changes: 58 additions & 11 deletions spatialbench-cli/src/output_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,33 @@
//! * [`OutputPlanGenerator`]: plans the output files to be generated

use crate::plan::GenerationPlan;
use crate::s3_writer::{build_s3_client, parse_s3_uri};
use crate::{OutputFormat, Table};
use log::debug;
use object_store::ObjectStore;
use parquet::basic::Compression;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::io;
use std::path::PathBuf;
use std::sync::Arc;

/// Where a partition will be output
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone)]
pub enum OutputLocation {
/// Output to a file
File(PathBuf),
/// Output to stdout
Stdout,
/// Output to S3 with a shared client
S3 {
/// The full S3 URI for this object (e.g. `s3://bucket/path/to/file.parquet`)
uri: String,
/// The object path within the bucket (e.g. `path/to/file.parquet`)
path: String,
/// Shared S3 client for the bucket
client: Arc<dyn ObjectStore>,
},
}

impl Display for OutputLocation {
Expand All @@ -48,12 +60,13 @@ impl Display for OutputLocation {
write!(f, "{}", file.to_string_lossy())
}
OutputLocation::Stdout => write!(f, "Stdout"),
OutputLocation::S3 { uri, .. } => write!(f, "{}", uri),
}
}
}

/// Describes an output partition (file) that will be generated
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone)]
pub struct OutputPlan {
/// The table
table: Table,
Expand Down Expand Up @@ -151,6 +164,8 @@ pub struct OutputPlanGenerator {
/// Output directories that have been created so far
/// (used to avoid creating the same directory multiple times)
created_directories: HashSet<PathBuf>,
/// Shared S3 client, lazily created on first S3 output location
s3_client: Option<Arc<dyn ObjectStore>>,
}

impl OutputPlanGenerator {
Expand All @@ -171,6 +186,7 @@ impl OutputPlanGenerator {
output_dir,
output_plans: Vec::new(),
created_directories: HashSet::new(),
s3_client: None,
}
}

Expand Down Expand Up @@ -282,17 +298,48 @@ impl OutputPlanGenerator {
OutputFormat::Parquet => "parquet",
};

let mut output_path = self.output_dir.clone();
if let Some(part) = part {
// If a partition is specified, create a subdirectory for it
output_path.push(table.to_string());
self.ensure_directory_exists(&output_path)?;
output_path.push(format!("{table}.{part}.{extension}"));
// Check if output_dir is an S3 URI
let output_dir_str = self.output_dir.to_string_lossy();
if output_dir_str.starts_with("s3://") {
// Handle S3 path
let base_uri = output_dir_str.trim_end_matches('/');
let s3_uri = if let Some(part) = part {
format!("{base_uri}/{table}/{table}.{part}.{extension}")
} else {
format!("{base_uri}/{table}.{extension}")
};

// Lazily build the S3 client on first use, then reuse it
let client = if let Some(ref client) = self.s3_client {
Arc::clone(client)
} else {
let (bucket, _) = parse_s3_uri(&s3_uri)?;
let client = build_s3_client(&bucket)?;
self.s3_client = Some(Arc::clone(&client));
client
};

let (_, path) = parse_s3_uri(&s3_uri)?;

Ok(OutputLocation::S3 {
uri: s3_uri,
path,
client,
})
} else {
// No partition specified, output to a single file
output_path.push(format!("{table}.{extension}"));
// Handle local filesystem path
let mut output_path = self.output_dir.clone();
if let Some(part) = part {
// If a partition is specified, create a subdirectory for it
output_path.push(table.to_string());
self.ensure_directory_exists(&output_path)?;
output_path.push(format!("{table}.{part}.{extension}"));
} else {
// No partition specified, output to a single file
output_path.push(format!("{table}.{extension}"));
}
Ok(OutputLocation::File(output_path))
}
Ok(OutputLocation::File(output_path))
}
}

Expand Down
Loading