diff options
| author | Marin Ivanov <[email protected]> | 2022-08-15 03:17:26 +0300 |
|---|---|---|
| committer | Marin Ivanov <[email protected]> | 2022-08-15 03:17:26 +0300 |
| commit | 2f1bb584258c53d74f97b04a6045f0e0b838f061 (patch) | |
| tree | 21665e0e5699b55cb28ff413523e4d477998a545 | |
| parent | b135dcf644394394e3250188dea39ff4e91f0e06 (diff) | |
Add object writing and directory listing
| -rw-r--r-- | file.go | 90 | ||||
| -rw-r--r-- | fileinfo.go | 3 | ||||
| -rw-r--r-- | fs.go | 81 | ||||
| -rw-r--r-- | fs_test.go | 61 |
4 files changed, 186 insertions, 49 deletions
@@ -5,25 +5,42 @@ import ( "io" "os" "path" + + "github.com/minio/minio-go/v7" ) type File struct { fs *Fs key string - r io.ReadCloser + r io.ReadSeekCloser w io.WriteCloser + werrc <-chan error cancel context.CancelFunc } // implements io.Closer func (f *File) Close() error { - f.cancel() + defer f.cancel() + if f.r != nil { + return f.r.Close() + } + if f.w != nil { + f.w.Close() + return <-f.werrc + } return nil } // implements io.Reader func (f *File) Read(b []byte) (int, error) { - return f.r.Read(b) + if f.r == nil { + return 0, ErrUnsupported + } + n, err := f.r.Read(b) + if err != nil { + return n, transformError(err) + } + return n, nil } // implements io.ReaderAt @@ -33,7 +50,15 @@ func (f *File) ReadAt(p []byte, off int64) (int, error) { // implements io.Writer func (f *File) Write(p []byte) (int, error) { - return f.w.Write(p) + if f.w == nil { + return 0, ErrUnsupported + } + + n, err := f.w.Write(p) + if err != nil { + return n, transformError(err) + } + return n, nil } // implements io.WriterAt @@ -45,16 +70,65 @@ func (f *File) Name() string { return path.Base(f.key) } +func (f *File) readdir(count int, onlyDir bool) ([]os.FileInfo, error) { + if !isDirPath(f.key) { + return nil, ErrUnsupported + } + if count == 0 { + return []os.FileInfo{}, nil + } + ctx, cancel := f.fs.contextWithTimeout() + defer cancel() + objectsCh := f.fs.client.ListObjects(ctx, f.fs.bucket, minio.ListObjectsOptions{Prefix: f.key}) + n := 0 + lastDir := f.key + results := make([]os.FileInfo, 0) + for objectInfo := range objectsCh { + base := path.Base(objectInfo.Key) + dir := path.Dir(objectInfo.Key) + "/" + if lastDir != dir { + lastDir = dir + results = append(results, &FileInfo{ + name: base, + isDir: true, + }) + if onlyDir { + n += 1 + } + } + if !onlyDir { + results = append(results, transformObjectInfo(objectInfo)) + n += 1 + } + if n > count { + break + } + } + + return results, nil +} + func (f *File) Readdir(count int) ([]os.FileInfo, error) { - panic("not implemented") + return f.readdir(count, false) } func (f *File) Readdirnames(n int) ([]string, error) { - panic("not implemented") + results, err := f.readdir(n, true) + if err != nil { + return nil, err + } + list := make([]string, 0, len(results)) + for _, info := range results { + list = append(list, info.Name()) + } + return list, nil } func (f *File) Seek(offset int64, whence int) (int64, error) { - panic("not implemented") + if f.r == nil { + return 0, ErrUnsupported + } + return f.r.Seek(offset, whence) } func (f *File) Stat() (os.FileInfo, error) { @@ -70,5 +144,5 @@ func (f *File) Truncate(size int64) error { } func (f *File) WriteString(s string) (ret int, err error) { - panic("not implemented") + return f.Write([]byte(s)) } diff --git a/fileinfo.go b/fileinfo.go index 079f56a..c5a7f1d 100644 --- a/fileinfo.go +++ b/fileinfo.go @@ -12,6 +12,7 @@ type FileInfo struct { name string size int64 mtime time.Time + isDir bool objectInfo *minio.ObjectInfo } @@ -38,7 +39,7 @@ func (fi *FileInfo) ModTime() time.Time { // abbreviation for Mode().IsDir() func (fi *FileInfo) IsDir() bool { - return false // S3 has no directories + return fi.isDir } // underlying data source (can return nil) @@ -1,6 +1,7 @@ package s3 import ( + "bytes" "context" "errors" "fmt" @@ -15,9 +16,15 @@ import ( "github.com/spf13/afero" ) +const ( + readWriteOnlyFlag = os.O_RDONLY | os.O_WRONLY // 0b1 + supportedFlags = os.O_CREATE | os.O_TRUNC | readWriteOnlyFlag +) + var ( ErrUnsupported = errors.New("unsupported operation") ErrInvalidProtocol = errors.New("invalid protocol") + ErrInvalidOpenFlag = errors.New("invalid open flag, expects either O_WRONLY or O_RDONLY") ) type Fs struct { @@ -75,8 +82,8 @@ func (fs *Fs) Name() string { // Create creates a file in the filesystem, returning the file and an // error, if any happens. -func (fs *Fs) Create(name string) (File, error) { - panic("not implemented") +func (fs *Fs) Create(name string) (afero.File, error) { + return fs.OpenFile(name, os.O_WRONLY, 0) } // Mkdir creates a directory in the filesystem, return an error if any @@ -93,17 +100,36 @@ func (fs *Fs) MkdirAll(path string, perm os.FileMode) error { // Open opens a file, returning it or an error, if any happens. func (fs *Fs) Open(name string) (afero.File, error) { - ctx, cancel := fs.contextWithTimeout() - obj, err := fs.client.GetObject(ctx, fs.bucket, name, minio.GetObjectOptions{}) - if err != nil { - return nil, err - } - return fs.newObjectReader(name, obj, cancel), nil + return fs.OpenFile(name, os.O_RDONLY, 0) } // OpenFile opens a file using the given flags and the given mode. -func (fs *Fs) OpenFile(name string, flag int, perm os.FileMode) (File, error) { - panic("not implemented") +func (fs *Fs) OpenFile(name string, flag int, perm os.FileMode) (afero.File, error) { + if flag&(^supportedFlags) != 0 { + return nil, ErrInvalidOpenFlag + } + + f := &File{fs: fs, key: name} + if isDirPath(name) { + if flag&os.O_WRONLY != 0 { + return nil, ErrInvalidOpenFlag + } + return f, nil + } + + var err error + if flag&os.O_WRONLY != 0 { + f.w, f.cancel, f.werrc = fs.newObjectWriter(name) + if err != nil { + return nil, err + } + } else { // os.O_RDONLY + f.r, f.cancel, err = fs.newObjectReader(name) + if err != nil { + return nil, err + } + } + return f, nil } // Remove removes a file identified by name, returning an error, if any @@ -187,13 +213,32 @@ func (fs *Fs) contextWithTimeout() (context.Context, context.CancelFunc) { return context.WithTimeout(context.Background(), fs.Timeout) } -func (fs *Fs) newObjectReader(key string, r io.ReadCloser, cancel context.CancelFunc) *File { - return &File{ - fs: fs, - key: key, - r: r, - cancel: cancel, +func (fs *Fs) newObjectReader(key string) (io.ReadSeekCloser, context.CancelFunc, error) { + ctx, cancel := context.WithCancel(context.Background()) + obj, err := fs.client.GetObject(ctx, fs.bucket, key, minio.GetObjectOptions{}) + if err != nil { + cancel() + return nil, nil, transformError(err) } + return obj, cancel, nil +} + +func (fs *Fs) newObjectWriter(key string) (io.WriteCloser, context.CancelFunc, <-chan error) { + ctx, cancel := context.WithCancel(context.Background()) + r, w := io.Pipe() + + errc := make(chan error, 1) + go func() { + b, err := io.ReadAll(r) + if err != nil { + errc <- err + return + } + _, err = fs.client.PutObject(ctx, fs.bucket, key, bytes.NewReader(b), int64(len(b)), minio.PutObjectOptions{}) + errc <- transformError(err) + }() + + return w, cancel, errc } func transformError(err error) error { @@ -210,3 +255,7 @@ func transformError(err error) error { } return err } + +func isDirPath(p string) bool { + return strings.HasSuffix(p, "/") +} @@ -1,7 +1,6 @@ package s3 import ( - "io" "testing" "time" @@ -11,86 +10,100 @@ import ( func TestFsListBuckets(t *testing.T) { is := is.New(t) - fs, err := newTestFs() + afs, err := newTestAfs() is.NoErr(err) - buckets, err := testListBuckets(fs) + buckets, err := testListBuckets(afs.Fs.(*Fs)) is.NoErr(err) is.Equal(len(buckets), 1) } func TestFsMkdir(t *testing.T) { is := is.New(t) - fs, err := newTestFs() + afs, err := newTestAfs() is.NoErr(err) - err = fs.Mkdir("test", 0o644) + err = afs.Mkdir("test", 0o644) is.Equal(err, ErrUnsupported) } func TestFsMkdirAll(t *testing.T) { is := is.New(t) - fs, err := newTestFs() + afs, err := newTestAfs() is.NoErr(err) - err = fs.MkdirAll("test1/test2", 0o644) + err = afs.MkdirAll("test1/test2", 0o644) is.Equal(err, ErrUnsupported) } func TestFsChmod(t *testing.T) { is := is.New(t) - fs, err := newTestFs() + afs, err := newTestAfs() is.NoErr(err) - err = fs.Chmod("test", 0o644) + err = afs.Chmod("test", 0o644) is.Equal(err, ErrUnsupported) } func TestFsChown(t *testing.T) { is := is.New(t) - fs, err := newTestFs() + afs, err := newTestAfs() is.NoErr(err) - err = fs.Chown("test", 1000, 1000) + err = afs.Chown("test", 1000, 1000) is.Equal(err, ErrUnsupported) } func TestFsChtimes(t *testing.T) { is := is.New(t) - fs, err := newTestFs() + afs, err := newTestAfs() is.NoErr(err) now := time.Now() - err = fs.Chtimes("test", now, now) + err = afs.Chtimes("test", now, now) is.Equal(err, ErrUnsupported) } func TestFsStat(t *testing.T) { is := is.New(t) - fs, err := newTestFs() + afs, err := newTestAfs() is.NoErr(err) - info, err := fs.Stat("dir/file") + info, err := afs.Stat("dir/file") is.NoErr(err) is.Equal(info.Name(), "file") } func TestFsStatNoExist(t *testing.T) { is := is.New(t) - fs, err := newTestFs() + afs, err := newTestAfs() is.NoErr(err) - _, err = fs.Stat("dir/non-existent") + _, err = afs.Stat("dir/non-existent") is.Equal(err, afero.ErrFileNotFound) } func TestFsOpenRead(t *testing.T) { is := is.New(t) - fs, err := newTestFs() + afs, err := newTestAfs() is.NoErr(err) - f, err := fs.Open("dir/file") - is.NoErr(err) - defer f.Close() - b, err := io.ReadAll(f) + b, err := afs.ReadFile("dir/file") is.NoErr(err) is.Equal(b, []byte("test")) } -func newTestFs() (*Fs, error) { +func TestFsReadNoExist(t *testing.T) { + is := is.New(t) + afs, err := newTestAfs() + is.NoErr(err) + _, err = afs.ReadFile("non-existent") + is.Equal(err, afero.ErrFileNotFound) +} + +func TestFsOpenWrite(t *testing.T) { + is := is.New(t) + afs, err := newTestAfs() + is.NoErr(err) + err = afs.WriteFile("dir/file_test", []byte("testdata"), 0) + is.NoErr(err) +} + +func newTestAfs() (*afero.Afero, error) { fs, err := NewFsFromURL("http://testuser:[email protected]:9000/test-bucket") - return fs, err + afs := &afero.Afero{Fs: fs} + return afs, err } func testListBuckets(fs *Fs) ([]string, error) { |
