diff --git a/CHANGES.md b/CHANGES.md index 319520f94309..92538b8a307f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,6 +80,7 @@ ## Bugfixes +* Fixed GCS filesystem glob matching to correctly handle `/` in object names and support `**` for recursive matching (Go) ([#38059](https://github.com/apache/beam/issues/38059)). * Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)). ## Security Fixes diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go index 73e686381053..679a6169a3aa 100644 --- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go +++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go @@ -21,7 +21,8 @@ import ( "context" "fmt" "io" - "path/filepath" + "regexp" + "strings" "time" "cloud.google.com/go/storage" @@ -38,6 +39,76 @@ const ( projectBillingHook = "beam:go:hook:filesystem:billingproject" ) +// globToRegex translates a glob pattern to a regular expression. +// It differs from filepath.Match in that: +// - / is treated as a regular character (not a separator), since GCS object +// names are flat with / being just another character +// - ** matches any sequence of characters including / (zero or more) +// - **/ matches zero or more path segments (e.g., "" or "dir/" or "dir/subdir/") +// - * matches any sequence of characters except / (zero or more) +// - ? matches any single character except / +// +// This matches the behavior of the Python and Java SDKs. +func globToRegex(pattern string) (*regexp.Regexp, error) { + var result strings.Builder + result.WriteString("^") + + for i := 0; i < len(pattern); i++ { + c := pattern[i] + switch c { + case '*': + // Check for ** (double asterisk) + if i+1 < len(pattern) && pattern[i+1] == '*' { + // Check if followed by / (e.g., "**/" matches zero or more path segments) + if i+2 < len(pattern) && pattern[i+2] == '/' { + // **/ matches "" or "something/" or "a/b/c/" + result.WriteString("(.*/)?") + i += 2 // Skip the second * and the / + } else { + // ** at end or before non-slash matches any characters + result.WriteString(".*") + i++ // Skip the second * + } + } else { + result.WriteString("[^/]*") + } + case '?': + result.WriteString("[^/]") + case '[': + // Character class - find the closing bracket + j := i + 1 + if j < len(pattern) && pattern[j] == '!' { + j++ + } + if j < len(pattern) && pattern[j] == ']' { + j++ + } + for j < len(pattern) && pattern[j] != ']' { + j++ + } + if j >= len(pattern) { + return nil, fmt.Errorf("syntax error: unclosed '[' in pattern %q", pattern) + } else { + // Copy the character class, converting ! to ^ for negation + result.WriteByte('[') + content := pattern[i+1 : j] + if len(content) > 0 && content[0] == '!' { + result.WriteByte('^') + content = content[1:] + } + result.WriteString(content) + result.WriteByte(']') + i = j + } + default: + result.WriteString(regexp.QuoteMeta(string(c))) + } + } + + result.WriteString("$") // match end + return regexp.Compile(result.String()) +} + var billingProject string = "" func init() { @@ -107,6 +178,15 @@ func (f *fs) List(ctx context.Context, glob string) ([]string, error) { return nil, err } + // Compile the glob pattern to a regex. We use a custom glob-to-regex + // translation that treats / as a regular character (not a separator), + // since GCS object names are flat. This also supports ** for recursive + // matching, similar to the Java and Python SDKs. + re, err := globToRegex(object) + if err != nil { + return nil, fmt.Errorf("invalid glob pattern %q: %w", object, err) + } + var candidates []string // We handle globs by list all candidates and matching them here. @@ -125,11 +205,7 @@ func (f *fs) List(ctx context.Context, glob string) ([]string, error) { return nil, err } - match, err := filepath.Match(object, obj.Name) - if err != nil { - return nil, err - } - if match { + if re.MatchString(obj.Name) { candidates = append(candidates, obj.Name) } } diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go b/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go index 66dee6bb23f6..cd6aab2a2364 100644 --- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go +++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go @@ -19,6 +19,7 @@ import ( "context" "io" "sort" + "strings" "testing" "time" @@ -271,6 +272,151 @@ func TestGCS_copy(t *testing.T) { } } +func TestGlobToRegex(t *testing.T) { + tests := []struct { + pattern string + name string + want bool + }{ + // Single * should NOT match / in object names + {"*.txt", "file.txt", true}, + {"*.txt", "dir/file.txt", false}, + {"prefix*", "prefix123", true}, + {"prefix*", "prefix/subdir", false}, + + // ** should match any characters including / + {"**", "file.txt", true}, + {"**", "dir/file.txt", true}, + {"**", "dir/subdir/file.txt", true}, + {"prefix/**", "prefix/file.txt", true}, + {"prefix/**", "prefix/subdir/file.txt", true}, + {"**/file.txt", "file.txt", true}, + {"**/file.txt", "dir/file.txt", true}, + {"**/file.txt", "dir/subdir/file.txt", true}, + + // Mixed patterns + {"dir/*.txt", "dir/file.txt", true}, + {"dir/*.txt", "dir/subdir/file.txt", false}, + {"dir/**/*.txt", "dir/file.txt", true}, + {"dir/**/*.txt", "dir/subdir/file.txt", true}, + {"dir/**/file.txt", "dir/file.txt", true}, + {"dir/**/file.txt", "dir/a/b/c/file.txt", true}, + + // ? should match any single character except / + {"file?.txt", "file1.txt", true}, + {"file?.txt", "file12.txt", false}, + {"file?.txt", "file/.txt", false}, // ? should not cross / + {"dir?file.txt", "dir/file.txt", false}, + + // Character classes + {"file[0-9].txt", "file1.txt", true}, + {"file[0-9].txt", "filea.txt", false}, + {"file[!0-9].txt", "filea.txt", true}, + {"file[!0-9].txt", "file1.txt", false}, + + // Exact match (no wildcards) + {"exact.txt", "exact.txt", true}, + {"exact.txt", "notexact.txt", false}, + + // Regex special characters should be escaped + {"file.txt", "file.txt", true}, + {"file.txt", "fileXtxt", false}, + {"file(1).txt", "file(1).txt", true}, + } + + for _, tt := range tests { + t.Run(tt.pattern+"_"+tt.name, func(t *testing.T) { + re, err := globToRegex(tt.pattern) + if err != nil { + t.Fatalf("globToRegex(%q) error = %v", tt.pattern, err) + } + got := re.MatchString(tt.name) + if got != tt.want { + t.Errorf("globToRegex(%q).MatchString(%q) = %v, want %v", tt.pattern, tt.name, got, tt.want) + } + }) + } +} + +func TestGlobToRegex_errors(t *testing.T) { + tests := []struct { + pattern string + wantErr string + }{ + {"file[abc.txt", "unclosed '['"}, + {"[invalid", "unclosed '['"}, + } + + for _, tt := range tests { + t.Run(tt.pattern, func(t *testing.T) { + _, err := globToRegex(tt.pattern) + if err == nil { + t.Errorf("globToRegex(%q) expected error containing %q, got nil", tt.pattern, tt.wantErr) + } else if !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("globToRegex(%q) error = %v, want error containing %q", tt.pattern, err, tt.wantErr) + } + }) + } +} + +func TestGCS_listWithSlashesInObjectNames(t *testing.T) { + ctx := context.Background() + bucket := "beamgogcsfilesystemtest" + dirPath := "gs://" + bucket + + // Create server with objects that have / in their names + server := fakestorage.NewServer([]fakestorage.Object{ + {ObjectAttrs: fakestorage.ObjectAttrs{BucketName: bucket, Name: "file.txt"}, Content: []byte("")}, + {ObjectAttrs: fakestorage.ObjectAttrs{BucketName: bucket, Name: "dir/file.txt"}, Content: []byte("")}, + {ObjectAttrs: fakestorage.ObjectAttrs{BucketName: bucket, Name: "dir/subdir/file.txt"}, Content: []byte("")}, + {ObjectAttrs: fakestorage.ObjectAttrs{BucketName: bucket, Name: "other.txt"}, Content: []byte("")}, + }) + t.Cleanup(server.Stop) + c := &fs{client: server.Client()} + + tests := []struct { + glob string + want []string + }{ + // Single * should only match top-level files + {dirPath + "/*.txt", []string{dirPath + "/file.txt", dirPath + "/other.txt"}}, + // ** should match all files recursively + {dirPath + "/**", []string{ + dirPath + "/file.txt", + dirPath + "/dir/file.txt", + dirPath + "/dir/subdir/file.txt", + dirPath + "/other.txt", + }}, + // dir/* should only match immediate children + {dirPath + "/dir/*", []string{dirPath + "/dir/file.txt"}}, + // dir/** should match all descendants + {dirPath + "/dir/**", []string{ + dirPath + "/dir/file.txt", + dirPath + "/dir/subdir/file.txt", + }}, + // Deeply nested ** matching (core scenario from issue #38059) + {dirPath + "/dir/subdir/**", []string{ + dirPath + "/dir/subdir/file.txt", + }}, + } + + for _, tt := range tests { + t.Run(tt.glob, func(t *testing.T) { + got, err := c.List(ctx, tt.glob) + if err != nil { + t.Fatalf("List(%q) error = %v", tt.glob, err) + } + + sort.Strings(got) + sort.Strings(tt.want) + + if !cmp.Equal(got, tt.want) { + t.Errorf("List(%q) = %v, want %v", tt.glob, got, tt.want) + } + }) + } +} + func createFakeGCSServer(tb testing.TB) *fakestorage.Server { tb.Helper()