
Commit e6ee293

Add table operations test (seaweedfs#8241)

* Add Trino blog operations test

* Update test/s3tables/catalog_trino/trino_blog_operations_test.go

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* feat: add table bucket path helpers and filer operations

  - Add table object root and table location mapping directories
  - Implement ensureDirectory, upsertFile, deleteEntryIfExists helpers
  - Support table location bucket mapping for S3 access

* feat: manage table bucket object roots on creation/deletion

  - Create .objects directory for table buckets on creation
  - Clean up table object bucket paths on deletion
  - Enable S3 operations on table bucket object roots

* feat: add table location mapping for Iceberg REST

  - Track table location bucket mappings when tables are created/updated/deleted
  - Enable location-based routing for S3 operations on table data

* feat: route S3 operations to table bucket object roots

  - Route table-s3 bucket names to mapped table paths
  - Route table buckets to object root directories
  - Support table location bucket mapping lookup

* feat: emit table-s3 locations from Iceberg REST

  - Generate unique table-s3 bucket names with UUID suffix
  - Store table metadata under table bucket paths
  - Return table-s3 locations for Trino compatibility

* fix: handle missing directories in S3 list operations

  - Propagate ErrNotFound from ListEntries for non-existent directories
  - Treat missing directories as empty results for list operations
  - Fixes Trino non-empty location checks on table creation

* test: improve Trino CSV parsing for single-value results

  - Sanitize Trino output to skip jline warnings
  - Handle single-value CSV results without header rows
  - Strip quotes from numeric values in tests

* refactor: use bucket path helpers throughout S3 API

  - Replace direct bucket path operations with helper functions
  - Leverage centralized table bucket routing logic
  - Improve maintainability with consistent path resolution

* fix: add table bucket cache and improve filer error handling

  - Cache table bucket lookups to reduce filer overhead on repeated checks
  - Use filer_pb.CreateEntry and filer_pb.UpdateEntry helpers to check resp.Error
  - Fix delete order in handler_bucket_get_list_delete: delete table object before directory
  - Make location mapping errors best-effort: log and continue, don't fail the API
  - Update table location mappings to delete stale prior bucket mappings on update
  - Add 1-second sleep before the timestamp time travel query to ensure timestamps are in the past
  - Fix CSV parsing: examine all lines, not skip the first; handle single-value rows

* fix: properly handle stale metadata location mapping cleanup

  - Capture oldMetadataLocation before mutation in handleUpdateTable
  - Update updateTableLocationMapping to accept both old and new locations
  - Use the passed-in oldMetadataLocation to detect location changes
  - Delete the stale mapping only when the location actually changes
  - Pass empty string for oldLocation in handleCreateTable (new tables have no prior mapping)
  - Improve logging to show old -> new location transitions

* refactor: cleanup imports and cache design

  - Remove unused 'sync' import from bucket_paths.go
  - Use the filer_pb.UpdateEntry helper in setExtendedAttribute and deleteExtendedAttribute for consistent error handling
  - Add a dedicated tableBucketCache map[string]bool to BucketRegistry instead of mixing concerns with metadataCache
  - Improve cache separation: the table buckets cache is now separate from the bucket metadata cache

* fix: improve cache invalidation and add transient error handling (see the Go sketch after this message)

  Cache invalidation (critical fix):
  - Add tableLocationCache to BucketRegistry for location mapping lookups
  - Clear tableBucketCache and tableLocationCache in RemoveBucketMetadata
  - Prevents stale cache entries when buckets are deleted/recreated

  Transient error handling:
  - Only cache table bucket lookups when conclusive (found or ErrNotFound)
  - Skip caching on transient errors (network, permission, etc.)
  - Prevents marking real table buckets as non-table due to transient failures

  Performance optimization:
  - Cache tableLocationDir results to avoid repeated filer RPCs on hot paths
  - tableLocationDir now checks the cache before making expensive filer lookups
  - Cache stores an empty string for 'not found' to avoid redundant lookups

  Code clarity:
  - Add a comment to deleteDirectory explaining that the DeleteEntry response lacks an Error field

* go fmt

* fix: mirror transient error handling in tableLocationDir and optimize bucketDir

  Transient error handling:
  - tableLocationDir now only caches definitive results
  - Mirrors isTableBucket behavior to prevent treating transient errors as permanent misses
  - Improves reliability on flaky systems or during recovery

  Performance optimization:
  - bucketDir avoids a redundant isTableBucket call via bucketRoot
  - Directly use s3a.option.BucketsPath for regular buckets
  - Saves one cache lookup for every non-table bucket operation

* fix: revert bucketDir optimization to preserve bucketRoot logic

  The optimization to directly use BucketsPath bypassed bucketRoot's logic and
  caused issues with S3 list operations on delimiter+prefix cases. Revert to
  using path.Join(s3a.bucketRoot(bucket), bucket), which properly handles all
  bucket types and ensures consistent path resolution across the codebase. The
  slight performance cost of an extra cache lookup is worth the correctness and
  consistency benefits.

* feat: move table buckets under /buckets

  Add a table-bucket marker attribute, reuse the bucket metadata cache for
  table bucket detection, and update list/validation/UI/test paths to treat
  table buckets as /buckets entries.

* Fix S3 Tables code review issues

  - handler_bucket_create.go: Fix the bucket existence check to properly validate entryResp.Entry before setting the s3BucketExists flag (a nil Entry should not indicate an existing bucket)
  - bucket_paths.go: Add a clarifying comment to bucketRoot() explaining the unified buckets root path for all bucket types
  - file_browser_data.go: Optimize by extracting the table bucket check early to avoid a redundant WithFilerClient call

* Fix list prefix delimiter handling

* Handle list errors conservatively

* Fix Trino FOR TIMESTAMP query - use past timestamp

  Iceberg requires the timestamp to be strictly in the past. Use
  current_timestamp - interval '1' second instead of current_timestamp.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
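
The transient-error rule called out above is the subtle part of the caching work: a failed lookup must not be recorded as "not a table bucket", or a real table bucket could be misrouted until the cache is cleared. A minimal Go sketch of the pattern, with illustrative names and signatures (the actual BucketRegistry API in the SeaweedFS code differs):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNotFound stands in for the filer's "entry does not exist" error.
var errNotFound = errors.New("filer: not found")

// bucketRegistry mirrors the tableBucketCache idea from the commit message;
// the struct and method shapes here are assumptions for illustration.
type bucketRegistry struct {
	mu               sync.Mutex
	tableBucketCache map[string]bool
}

// isTableBucket caches only conclusive answers: a successful lookup, or a
// definitive errNotFound. Transient failures are returned without caching.
func (r *bucketRegistry) isTableBucket(bucket string, lookup func(string) (bool, error)) (bool, error) {
	r.mu.Lock()
	cached, ok := r.tableBucketCache[bucket]
	r.mu.Unlock()
	if ok {
		return cached, nil
	}

	isTable, err := lookup(bucket)
	switch {
	case err == nil:
		// Conclusive: the entry exists and we know whether it is a table bucket.
	case errors.Is(err, errNotFound):
		isTable = false // conclusive: the bucket does not exist
	default:
		// Transient (network, permission, ...): do not cache, so the next
		// call retries the filer instead of trusting a bad answer.
		return false, err
	}

	r.mu.Lock()
	r.tableBucketCache[bucket] = isTable
	r.mu.Unlock()
	return isTable, nil
}

// removeBucketMetadata mirrors the invalidation fix: deleting or recreating
// a bucket must clear its cached table-bucket status.
func (r *bucketRegistry) removeBucketMetadata(bucket string) {
	r.mu.Lock()
	delete(r.tableBucketCache, bucket)
	r.mu.Unlock()
}

func main() {
	r := &bucketRegistry{tableBucketCache: map[string]bool{}}
	isTable, err := r.isTableBucket("analytics", func(string) (bool, error) { return true, nil })
	fmt.Println(isTable, err) // true <nil>
	r.removeBucketMetadata("analytics")
}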
1 parent c284e51 commit e6ee293


41 files changed: +906 additions, -278 deletions
test/s3tables/catalog_trino/trino_blog_operations_test.go

Lines changed: 172 additions & 0 deletions

@@ -0,0 +1,172 @@
package catalog_trino

import (
	"fmt"
	"strconv"
	"strings"
	"testing"
	"time"
)

func TestTrinoBlogOperations(t *testing.T) {
	env := setupTrinoTest(t)
	defer env.Cleanup(t)

	schemaName := "blog_ns_" + randomString(6)
	customersTable := "customers_" + randomString(6)
	trinoCustomersTable := "trino_customers_" + randomString(6)

	runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS iceberg.%s", schemaName))
	defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP SCHEMA IF EXISTS iceberg.%s", schemaName))
	defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, trinoCustomersTable))
	defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, customersTable))

	createCustomersSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS iceberg.%s.%s (
		customer_sk INT,
		customer_id VARCHAR,
		salutation VARCHAR,
		first_name VARCHAR,
		last_name VARCHAR,
		preferred_cust_flag VARCHAR,
		birth_day INT,
		birth_month INT,
		birth_year INT,
		birth_country VARCHAR,
		login VARCHAR
	) WITH (
		format = 'PARQUET',
		sorted_by = ARRAY['customer_id']
	)`, schemaName, customersTable)
	runTrinoSQL(t, env.trinoContainer, createCustomersSQL)

	insertCustomersSQL := fmt.Sprintf(`INSERT INTO iceberg.%s.%s VALUES
		(1, 'AAAAA', 'Mrs', 'Amanda', 'Olson', 'Y', 8, 4, 1984, 'US', 'aolson'),
		(2, 'AAAAB', 'Mr', 'Leonard', 'Eads', 'N', 22, 6, 2001, 'US', 'leads'),
		(3, 'BAAAA', 'Mr', 'David', 'White', 'Y', 16, 2, 1999, 'US', 'dwhite'),
		(4, 'BBAAA', 'Mr', 'Melvin', 'Lee', 'N', 30, 3, 1973, 'US', 'mlee'),
		(5, 'AACAA', 'Mr', 'Donald', 'Holt', 'N', 2, 6, 1982, 'CA', 'dholt'),
		(6, 'ABAAA', 'Mrs', 'Jacqueline', 'Harvey', 'N', 5, 12, 1988, 'US', 'jharvey'),
		(7, 'BBAAA', 'Ms', 'Debbie', 'Ward', 'N', 6, 1, 2006, 'MX', 'dward'),
		(8, 'ACAAA', 'Mr', 'Tim', 'Strong', 'N', 15, 7, 1976, 'US', 'tstrong')
	`, schemaName, customersTable)
	runTrinoSQL(t, env.trinoContainer, insertCustomersSQL)

	countOutput := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, customersTable))
	rowCount := mustParseCSVInt64(t, countOutput)
	if rowCount != 8 {
		t.Fatalf("expected 8 rows in customers table, got %d", rowCount)
	}

	output := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT first_name FROM iceberg.%s.%s WHERE customer_sk = 1", schemaName, customersTable))
	if !strings.Contains(output, "Amanda") {
		t.Fatalf("expected sample query to include Amanda, got: %s", output)
	}

	ctasSQL := fmt.Sprintf(`CREATE TABLE iceberg.%s.%s
	WITH (
		format = 'PARQUET'
	)
	AS SELECT * FROM iceberg.%s.%s`, schemaName, trinoCustomersTable, schemaName, customersTable)
	runTrinoSQL(t, env.trinoContainer, ctasSQL)

	countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, trinoCustomersTable))
	rowCount = mustParseCSVInt64(t, countOutput)
	if rowCount != 8 {
		t.Fatalf("expected 8 rows in CTAS table, got %d", rowCount)
	}

	runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("ALTER TABLE iceberg.%s.%s ADD COLUMN updated_at TIMESTAMP", schemaName, trinoCustomersTable))
	output = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DESCRIBE iceberg.%s.%s", schemaName, trinoCustomersTable))
	if !strings.Contains(output, "updated_at") {
		t.Fatalf("expected updated_at column in describe output, got: %s", output)
	}

	runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("UPDATE iceberg.%s.%s SET updated_at = current_timestamp", schemaName, trinoCustomersTable))

	// Sleep to ensure timestamps are in the past for time travel queries
	time.Sleep(1 * time.Second)

	snapshotOutput := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf(`SELECT snapshot_id FROM iceberg.%s."%s$snapshots" ORDER BY committed_at DESC LIMIT 1`, schemaName, trinoCustomersTable))
	snapshotID := mustParseCSVInt64(t, snapshotOutput)
	if snapshotID == 0 {
		t.Fatalf("expected snapshot ID from snapshots table, got 0")
	}

	filesOutput := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf(`SELECT file_path FROM iceberg.%s."%s$files" LIMIT 1`, schemaName, trinoCustomersTable))
	if !hasCSVDataRow(filesOutput) {
		t.Fatalf("expected files metadata rows, got: %s", filesOutput)
	}

	historyOutput := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf(`SELECT made_current_at FROM iceberg.%s."%s$history" LIMIT 1`, schemaName, trinoCustomersTable))
	if !hasCSVDataRow(historyOutput) {
		t.Fatalf("expected history metadata rows, got: %s", historyOutput)
	}

	countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s FOR VERSION AS OF %d", schemaName, trinoCustomersTable, snapshotID))
	versionCount := mustParseCSVInt64(t, countOutput)
	if versionCount != 8 {
		t.Fatalf("expected 8 rows for version time travel, got %d", versionCount)
	}

	// Use current_timestamp - interval '1' second to ensure it's in the past (Iceberg requirement)
	countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s FOR TIMESTAMP AS OF (current_timestamp - interval '1' second)", schemaName, trinoCustomersTable))
	timestampCount := mustParseCSVInt64(t, countOutput)
	if timestampCount != 8 {
		t.Fatalf("expected 8 rows for timestamp time travel, got %d", timestampCount)
	}

	runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DELETE FROM iceberg.%s.%s WHERE customer_sk = 8", schemaName, trinoCustomersTable))
	countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, trinoCustomersTable))
	rowCount = mustParseCSVInt64(t, countOutput)
	if rowCount != 7 {
		t.Fatalf("expected 7 rows after delete, got %d", rowCount)
	}

	runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("ALTER TABLE iceberg.%s.%s EXECUTE rollback_to_snapshot(%d)", schemaName, trinoCustomersTable, snapshotID))
	countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, trinoCustomersTable))
	rowCount = mustParseCSVInt64(t, countOutput)
	if rowCount != 8 {
		t.Fatalf("expected 8 rows after rollback, got %d", rowCount)
	}
}

func hasCSVDataRow(output string) bool {
	lines := strings.Split(strings.TrimSpace(output), "\n")
	if len(lines) == 0 {
		return false
	}
	for _, line := range lines {
		if strings.TrimSpace(line) != "" {
			return true
		}
	}
	return false
}

func mustParseCSVInt64(t *testing.T, output string) int64 {
	t.Helper()
	value := mustFirstCSVValue(t, output)
	parsed, err := strconv.ParseInt(value, 10, 64)
	if err != nil {
		t.Fatalf("failed to parse int from output %q: %v", output, err)
	}
	return parsed
}

func mustFirstCSVValue(t *testing.T, output string) string {
	t.Helper()
	lines := strings.Split(strings.TrimSpace(output), "\n")
	if len(lines) == 0 {
		t.Fatalf("expected CSV output with data row, got: %q", output)
	}
	for _, line := range lines {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		parts := strings.Split(line, ",")
		return strings.Trim(parts[0], "\"")
	}
	t.Fatalf("no CSV data rows found in output: %q", output)
	return ""
}
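
The helpers above exist because Trino's CSV output for a single aggregate is one quoted value with no header row, so the parser must scan every non-empty line and strip quotes. A quick hypothetical check in the same package (not part of the commit):

package catalog_trino

import "testing"

// Hypothetical example: a lone quoted count, as Trino emits for
// SELECT count(*), parses to its unquoted value.
func TestMustFirstCSVValueExample(t *testing.T) {
	if got := mustFirstCSVValue(t, "\"8\"\n"); got != "8" {
		t.Fatalf("expected quoted value to be stripped, got %q", got)
	}
}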

test/s3tables/catalog_trino/trino_catalog_test.go

Lines changed: 27 additions & 2 deletions

@@ -319,8 +319,9 @@ func (env *TestEnvironment) writeTrinoConfig(t *testing.T, warehouseBucket strin
 	config := fmt.Sprintf(`connector.name=iceberg
 iceberg.catalog.type=rest
 iceberg.rest-catalog.uri=http://host.docker.internal:%d
-iceberg.rest-catalog.warehouse=s3://%s/
+iceberg.rest-catalog.warehouse=s3tablescatalog/%s
 iceberg.file-format=PARQUET
+iceberg.unique-table-location=true
 
 # S3 storage config
 fs.native-s3.enabled=true
@@ -415,7 +416,7 @@ func runTrinoSQL(t *testing.T, containerName, sql string) string {
 		logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput()
 		t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s\nTrino logs:\n%s", err, sql, string(output), string(logs))
 	}
-	return string(output)
+	return sanitizeTrinoOutput(string(output))
 }
 
 func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
@@ -439,6 +440,30 @@ func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
 	t.Logf("Created table bucket: %s", bucketName)
 }
 
+func sanitizeTrinoOutput(output string) string {
+	lines := strings.Split(strings.TrimSpace(output), "\n")
+	filtered := make([]string, 0, len(lines))
+	for _, line := range lines {
+		if strings.Contains(line, "org.jline.utils.Log") {
+			continue
+		}
+		if strings.Contains(line, "Unable to create a system terminal") {
+			continue
+		}
+		if strings.HasPrefix(line, "WARNING:") {
+			continue
+		}
+		if strings.TrimSpace(line) == "" {
+			continue
+		}
+		filtered = append(filtered, line)
+	}
+	if len(filtered) == 0 {
+		return ""
+	}
+	return strings.Join(filtered, "\n") + "\n"
+}
+
 func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) {
 	t.Helper()
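
For context, a hypothetical test of the sanitizer's contract; the warning lines are made-up samples of the jline and terminal noise the filters target, and it assumes access to the package-internal sanitizeTrinoOutput:

package catalog_trino

import "testing"

func TestSanitizeTrinoOutputExample(t *testing.T) {
	raw := "WARNING: Using incubator modules\n" +
		"Jun 01 WARN org.jline.utils.Log - unable to load terminal\n" +
		"Unable to create a system terminal, creating a dumb terminal\n" +
		"\n" +
		"\"8\"\n"
	if got := sanitizeTrinoOutput(raw); got != "\"8\"\n" {
		t.Fatalf("expected only the data row to survive, got %q", got)
	}
}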

weed/admin/dash/file_browser_data.go

Lines changed: 22 additions & 16 deletions

@@ -6,7 +6,9 @@ import (
 	"strings"
 	"time"
 
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
 )
 
 // FileEntry represents a file or directory entry in the file browser
@@ -218,25 +220,33 @@ func (s *AdminServer) GetFileBrowser(dir string, lastFileName string, pageSize i
 		}
 	}
 
-	// Check if this is a bucket path
+	// Check if this is a bucket path and determine if it's a table bucket
 	isBucketPath := false
 	bucketName := ""
+	isTableBucketPath := false
+	tableBucketName := ""
 	if strings.HasPrefix(dir, "/buckets/") {
 		isBucketPath = true
 		pathParts := strings.Split(strings.Trim(dir, "/"), "/")
 		if len(pathParts) >= 2 {
 			bucketName = pathParts[1]
-		}
-	}
-
-	// Check if this is a table bucket path
-	isTableBucketPath := false
-	tableBucketName := ""
-	if strings.HasPrefix(dir, "/table-buckets/") {
-		isTableBucketPath = true
-		pathParts := strings.Split(strings.Trim(dir, "/"), "/")
-		if len(pathParts) >= 2 {
-			tableBucketName = pathParts[1]
+			// Check table bucket status early to avoid second WithFilerClient call
+			if err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+				resp, err := filer_pb.LookupEntry(context.Background(), client, &filer_pb.LookupDirectoryEntryRequest{
+					Directory: "/buckets",
+					Name:      bucketName,
+				})
+				if err != nil {
+					return err
+				}
+				if s3tables.IsTableBucketEntry(resp.Entry) {
+					isTableBucketPath = true
+					tableBucketName = bucketName
+				}
+				return nil
+			}); err != nil {
+				glog.V(1).Infof("file browser table bucket lookup failed for %s: %v", bucketName, err)
+			}
 		}
 	}
 
@@ -287,12 +297,8 @@ func (s *AdminServer) generateBreadcrumbs(dir string) []BreadcrumbItem {
 		displayName := part
 		if len(breadcrumbs) == 1 && part == "buckets" {
 			displayName = "Object Store Buckets"
-		} else if len(breadcrumbs) == 1 && part == "table-buckets" {
-			displayName = "Table Buckets"
 		} else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/buckets/") {
 			displayName = "📦 " + part // Add bucket icon to bucket name
-		} else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/table-buckets/") {
-			displayName = "🧊 " + part
 		}
 
 		breadcrumbs = append(breadcrumbs, BreadcrumbItem{

weed/admin/dash/file_browser_data_test.go

Lines changed: 0 additions & 24 deletions

@@ -51,15 +51,6 @@ func TestGenerateBreadcrumbs(t *testing.T) {
 			{Name: "📦 mybucket", Path: "/buckets/mybucket"},
 		},
 	},
-	{
-		name: "table bucket path",
-		path: "/table-buckets/mytablebucket",
-		expected: []BreadcrumbItem{
-			{Name: "Root", Path: "/"},
-			{Name: "Table Buckets", Path: "/table-buckets"},
-			{Name: "🧊 mytablebucket", Path: "/table-buckets/mytablebucket"},
-		},
-	},
 	{
 		name: "bucket nested path",
 		path: "/buckets/mybucket/folder",
@@ -70,16 +61,6 @@
 			{Name: "Root", Path: "/"},
 			{Name: "folder", Path: "/buckets/mybucket/folder"},
 		},
 	},
-	{
-		name: "table bucket nested path",
-		path: "/table-buckets/mytablebucket/folder",
-		expected: []BreadcrumbItem{
-			{Name: "Root", Path: "/"},
-			{Name: "Table Buckets", Path: "/table-buckets"},
-			{Name: "🧊 mytablebucket", Path: "/table-buckets/mytablebucket"},
-			{Name: "folder", Path: "/table-buckets/mytablebucket/folder"},
-		},
-	},
 	{
 		name: "path with trailing slash",
 		path: "/folder/",
@@ -195,11 +176,6 @@ func TestParentPathCalculationLogic(t *testing.T) {
 			currentDir: "/buckets/mybucket",
 			expected:   "/buckets",
 		},
-		{
-			name:       "table bucket directory",
-			currentDir: "/table-buckets/mytablebucket",
-			expected:   "/table-buckets",
-		},
 	}
 
 	for _, tt := range tests {


weed/admin/dash/s3tables_management.go

Lines changed: 3 additions & 0 deletions

@@ -100,6 +100,9 @@ func (s *AdminServer) GetS3TablesBucketsData(ctx context.Context) (S3TablesBucke
 		if strings.HasPrefix(entry.Entry.Name, ".") {
 			continue
 		}
+		if !s3tables.IsTableBucketEntry(entry.Entry) {
+			continue
+		}
 		metaBytes, ok := entry.Entry.Extended[s3tables.ExtendedKeyMetadata]
 		if !ok {
 			continue
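
s3tables.IsTableBucketEntry is the marker-attribute check introduced by the "move table buckets under /buckets" change; its implementation is not part of the diff shown here. A plausible shape, assuming the marker is an extended-attribute key on the bucket's filer entry (the key name below is hypothetical):

package s3tables

import "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"

// extendedKeyTableBucket is an assumed marker key; the real constant in
// weed/s3api/s3tables may use a different name.
const extendedKeyTableBucket = "s3tables-table-bucket"

// A sketch of the check: a /buckets entry is treated as a table bucket
// iff its extended attributes carry the marker.
func isTableBucketEntrySketch(entry *filer_pb.Entry) bool {
	if entry == nil || entry.Extended == nil {
		return false
	}
	_, ok := entry.Extended[extendedKeyTableBucket]
	return ok
}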
