Fix dbfs error handling (#36844)

Add tests for opening non-existing files.
This commit is contained in:
wxiaoguang
2026-03-07 00:28:46 +08:00
committed by GitHub
parent f3bdcc58af
commit 2ce71629c3
5 changed files with 91 additions and 49 deletions

View File

@@ -33,21 +33,22 @@ const (
// It doesn't respect the file format in the filename like ".zst", since it's difficult to reopen a closed compressed file and append new content.
// Why doesn't it store logs in object storage directly? Because it's not efficient to append content to object storage.
func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) {
flag := os.O_WRONLY
flag, openFileFor := os.O_WRONLY, "write-only"
if offset == 0 {
// Create file only if offset is 0, or it could result in content holes if the file doesn't exist.
flag |= os.O_CREATE
// Only allow creating the file if offset is 0 (the first write), see #25560.
// Otherwise, it might result in content holes if the file has been deleted after being transferred (actions.TransferLogs).
flag, openFileFor = os.O_WRONLY|os.O_CREATE, "write-create"
}
name := DBFSPrefix + filename
f, err := dbfs.OpenFile(ctx, name, flag)
if err != nil {
return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err)
return nil, fmt.Errorf("dbfs.OpenFile %q for %s: %w", name, openFileFor, err)
}
defer f.Close()
stat, err := f.Stat()
if err != nil {
return nil, fmt.Errorf("dbfs Stat %q: %w", name, err)
return nil, fmt.Errorf("dbfs.Stat %q: %w", name, err)
}
if stat.Size() < offset {
// If the size is less than offset, refuse to write, or it could result in content holes.
@@ -56,7 +57,7 @@ func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runne
}
if _, err := f.Seek(offset, io.SeekStart); err != nil {
return nil, fmt.Errorf("dbfs Seek %q: %w", name, err)
return nil, fmt.Errorf("dbfs.Seek %q: %w", name, err)
}
writer := bufio.NewWriterSize(f, defaultBufSize)
@@ -121,16 +122,17 @@ const (
// TransferLogs transfers logs from DBFS to object storage.
// It happens when the file is complete and no more logs will be appended.
// It respects the file format in the filename like ".zst", and compresses the content if needed.
// The task log file must be marked as "log_in_storage=true" after the transfer.
func TransferLogs(ctx context.Context, filename string) (func(), error) {
name := DBFSPrefix + filename
remove := func() {
if err := dbfs.Remove(ctx, name); err != nil {
log.Warn("dbfs remove %q: %v", name, err)
log.Warn("dbfs.Remove %q: %v", name, err)
}
}
f, err := dbfs.Open(ctx, name)
if err != nil {
return nil, fmt.Errorf("dbfs open %q: %w", name, err)
return nil, fmt.Errorf("dbfs.Open %q: %w", name, err)
}
defer f.Close()
@@ -164,7 +166,7 @@ func RemoveLogs(ctx context.Context, inStorage bool, filename string) error {
name := DBFSPrefix + filename
err := dbfs.Remove(ctx, name)
if err != nil {
return fmt.Errorf("dbfs remove %q: %w", name, err)
return fmt.Errorf("dbfs.Remove %q: %w", name, err)
}
return nil
}
@@ -180,7 +182,7 @@ func OpenLogs(ctx context.Context, inStorage bool, filename string) (io.ReadSeek
name := DBFSPrefix + filename
f, err := dbfs.Open(ctx, name)
if err != nil {
return nil, fmt.Errorf("dbfs open %q: %w", name, err)
return nil, fmt.Errorf("dbfs.Open %q: %w", name, err)
}
return f, nil
}