summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarin Ivanov <[email protected]>2022-08-15 03:17:26 +0300
committerMarin Ivanov <[email protected]>2022-08-15 03:17:26 +0300
commit2f1bb584258c53d74f97b04a6045f0e0b838f061 (patch)
tree21665e0e5699b55cb28ff413523e4d477998a545
parentb135dcf644394394e3250188dea39ff4e91f0e06 (diff)
Add object writing and directory listing
-rw-r--r--file.go90
-rw-r--r--fileinfo.go3
-rw-r--r--fs.go81
-rw-r--r--fs_test.go61
4 files changed, 186 insertions, 49 deletions
diff --git a/file.go b/file.go
index 2498bd8..0a7a3e5 100644
--- a/file.go
+++ b/file.go
@@ -5,25 +5,42 @@ import (
"io"
"os"
"path"
+
+ "github.com/minio/minio-go/v7"
)
type File struct {
fs *Fs
key string
- r io.ReadCloser
+ r io.ReadSeekCloser
w io.WriteCloser
+ werrc <-chan error
cancel context.CancelFunc
}
// implements io.Closer
func (f *File) Close() error {
- f.cancel()
+ defer f.cancel()
+ if f.r != nil {
+ return f.r.Close()
+ }
+ if f.w != nil {
+ f.w.Close()
+ return <-f.werrc
+ }
return nil
}
// implements io.Reader
func (f *File) Read(b []byte) (int, error) {
- return f.r.Read(b)
+ if f.r == nil {
+ return 0, ErrUnsupported
+ }
+ n, err := f.r.Read(b)
+ if err != nil {
+ return n, transformError(err)
+ }
+ return n, nil
}
// implements io.ReaderAt
@@ -33,7 +50,15 @@ func (f *File) ReadAt(p []byte, off int64) (int, error) {
// implements io.Writer
func (f *File) Write(p []byte) (int, error) {
- return f.w.Write(p)
+ if f.w == nil {
+ return 0, ErrUnsupported
+ }
+
+ n, err := f.w.Write(p)
+ if err != nil {
+ return n, transformError(err)
+ }
+ return n, nil
}
// implements io.WriterAt
@@ -45,16 +70,65 @@ func (f *File) Name() string {
return path.Base(f.key)
}
+func (f *File) readdir(count int, onlyDir bool) ([]os.FileInfo, error) {
+ if !isDirPath(f.key) {
+ return nil, ErrUnsupported
+ }
+ if count == 0 {
+ return []os.FileInfo{}, nil
+ }
+ ctx, cancel := f.fs.contextWithTimeout()
+ defer cancel()
+ objectsCh := f.fs.client.ListObjects(ctx, f.fs.bucket, minio.ListObjectsOptions{Prefix: f.key})
+ n := 0
+ lastDir := f.key
+ results := make([]os.FileInfo, 0)
+ for objectInfo := range objectsCh {
+ base := path.Base(objectInfo.Key)
+ dir := path.Dir(objectInfo.Key) + "/"
+ if lastDir != dir {
+ lastDir = dir
+ results = append(results, &FileInfo{
+ name: base,
+ isDir: true,
+ })
+ if onlyDir {
+ n += 1
+ }
+ }
+ if !onlyDir {
+ results = append(results, transformObjectInfo(objectInfo))
+ n += 1
+ }
+ if n > count {
+ break
+ }
+ }
+
+ return results, nil
+}
+
func (f *File) Readdir(count int) ([]os.FileInfo, error) {
- panic("not implemented")
+ return f.readdir(count, false)
}
func (f *File) Readdirnames(n int) ([]string, error) {
- panic("not implemented")
+ results, err := f.readdir(n, true)
+ if err != nil {
+ return nil, err
+ }
+ list := make([]string, 0, len(results))
+ for _, info := range results {
+ list = append(list, info.Name())
+ }
+ return list, nil
}
func (f *File) Seek(offset int64, whence int) (int64, error) {
- panic("not implemented")
+ if f.r == nil {
+ return 0, ErrUnsupported
+ }
+ return f.r.Seek(offset, whence)
}
func (f *File) Stat() (os.FileInfo, error) {
@@ -70,5 +144,5 @@ func (f *File) Truncate(size int64) error {
}
func (f *File) WriteString(s string) (ret int, err error) {
- panic("not implemented")
+ return f.Write([]byte(s))
}
diff --git a/fileinfo.go b/fileinfo.go
index 079f56a..c5a7f1d 100644
--- a/fileinfo.go
+++ b/fileinfo.go
@@ -12,6 +12,7 @@ type FileInfo struct {
name string
size int64
mtime time.Time
+ isDir bool
objectInfo *minio.ObjectInfo
}
@@ -38,7 +39,7 @@ func (fi *FileInfo) ModTime() time.Time {
// abbreviation for Mode().IsDir()
func (fi *FileInfo) IsDir() bool {
- return false // S3 has no directories
+ return fi.isDir
}
// underlying data source (can return nil)
diff --git a/fs.go b/fs.go
index 2b2e876..2ff9162 100644
--- a/fs.go
+++ b/fs.go
@@ -1,6 +1,7 @@
package s3
import (
+ "bytes"
"context"
"errors"
"fmt"
@@ -15,9 +16,15 @@ import (
"github.com/spf13/afero"
)
+const (
+ readWriteOnlyFlag = os.O_RDONLY | os.O_WRONLY // 0b1
+ supportedFlags = os.O_CREATE | os.O_TRUNC | readWriteOnlyFlag
+)
+
var (
ErrUnsupported = errors.New("unsupported operation")
ErrInvalidProtocol = errors.New("invalid protocol")
+ ErrInvalidOpenFlag = errors.New("invalid open flag, expects either O_WRONLY or O_RDONLY")
)
type Fs struct {
@@ -75,8 +82,8 @@ func (fs *Fs) Name() string {
// Create creates a file in the filesystem, returning the file and an
// error, if any happens.
-func (fs *Fs) Create(name string) (File, error) {
- panic("not implemented")
+func (fs *Fs) Create(name string) (afero.File, error) {
+ return fs.OpenFile(name, os.O_WRONLY, 0)
}
// Mkdir creates a directory in the filesystem, return an error if any
@@ -93,17 +100,36 @@ func (fs *Fs) MkdirAll(path string, perm os.FileMode) error {
// Open opens a file, returning it or an error, if any happens.
func (fs *Fs) Open(name string) (afero.File, error) {
- ctx, cancel := fs.contextWithTimeout()
- obj, err := fs.client.GetObject(ctx, fs.bucket, name, minio.GetObjectOptions{})
- if err != nil {
- return nil, err
- }
- return fs.newObjectReader(name, obj, cancel), nil
+ return fs.OpenFile(name, os.O_RDONLY, 0)
}
// OpenFile opens a file using the given flags and the given mode.
-func (fs *Fs) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
- panic("not implemented")
+func (fs *Fs) OpenFile(name string, flag int, perm os.FileMode) (afero.File, error) {
+ if flag&(^supportedFlags) != 0 {
+ return nil, ErrInvalidOpenFlag
+ }
+
+ f := &File{fs: fs, key: name}
+ if isDirPath(name) {
+ if flag&os.O_WRONLY != 0 {
+ return nil, ErrInvalidOpenFlag
+ }
+ return f, nil
+ }
+
+ var err error
+ if flag&os.O_WRONLY != 0 {
+ f.w, f.cancel, f.werrc = fs.newObjectWriter(name)
+ if err != nil {
+ return nil, err
+ }
+ } else { // os.O_RDONLY
+ f.r, f.cancel, err = fs.newObjectReader(name)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return f, nil
}
// Remove removes a file identified by name, returning an error, if any
@@ -187,13 +213,32 @@ func (fs *Fs) contextWithTimeout() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), fs.Timeout)
}
-func (fs *Fs) newObjectReader(key string, r io.ReadCloser, cancel context.CancelFunc) *File {
- return &File{
- fs: fs,
- key: key,
- r: r,
- cancel: cancel,
+func (fs *Fs) newObjectReader(key string) (io.ReadSeekCloser, context.CancelFunc, error) {
+ ctx, cancel := context.WithCancel(context.Background())
+ obj, err := fs.client.GetObject(ctx, fs.bucket, key, minio.GetObjectOptions{})
+ if err != nil {
+ cancel()
+ return nil, nil, transformError(err)
}
+ return obj, cancel, nil
+}
+
+func (fs *Fs) newObjectWriter(key string) (io.WriteCloser, context.CancelFunc, <-chan error) {
+ ctx, cancel := context.WithCancel(context.Background())
+ r, w := io.Pipe()
+
+ errc := make(chan error, 1)
+ go func() {
+ b, err := io.ReadAll(r)
+ if err != nil {
+ errc <- err
+ return
+ }
+ _, err = fs.client.PutObject(ctx, fs.bucket, key, bytes.NewReader(b), int64(len(b)), minio.PutObjectOptions{})
+ errc <- transformError(err)
+ }()
+
+ return w, cancel, errc
}
func transformError(err error) error {
@@ -210,3 +255,7 @@ func transformError(err error) error {
}
return err
}
+
+func isDirPath(p string) bool {
+ return strings.HasSuffix(p, "/")
+}
diff --git a/fs_test.go b/fs_test.go
index f993849..3f04015 100644
--- a/fs_test.go
+++ b/fs_test.go
@@ -1,7 +1,6 @@
package s3
import (
- "io"
"testing"
"time"
@@ -11,86 +10,100 @@ import (
func TestFsListBuckets(t *testing.T) {
is := is.New(t)
- fs, err := newTestFs()
+ afs, err := newTestAfs()
is.NoErr(err)
- buckets, err := testListBuckets(fs)
+ buckets, err := testListBuckets(afs.Fs.(*Fs))
is.NoErr(err)
is.Equal(len(buckets), 1)
}
func TestFsMkdir(t *testing.T) {
is := is.New(t)
- fs, err := newTestFs()
+ afs, err := newTestAfs()
is.NoErr(err)
- err = fs.Mkdir("test", 0o644)
+ err = afs.Mkdir("test", 0o644)
is.Equal(err, ErrUnsupported)
}
func TestFsMkdirAll(t *testing.T) {
is := is.New(t)
- fs, err := newTestFs()
+ afs, err := newTestAfs()
is.NoErr(err)
- err = fs.MkdirAll("test1/test2", 0o644)
+ err = afs.MkdirAll("test1/test2", 0o644)
is.Equal(err, ErrUnsupported)
}
func TestFsChmod(t *testing.T) {
is := is.New(t)
- fs, err := newTestFs()
+ afs, err := newTestAfs()
is.NoErr(err)
- err = fs.Chmod("test", 0o644)
+ err = afs.Chmod("test", 0o644)
is.Equal(err, ErrUnsupported)
}
func TestFsChown(t *testing.T) {
is := is.New(t)
- fs, err := newTestFs()
+ afs, err := newTestAfs()
is.NoErr(err)
- err = fs.Chown("test", 1000, 1000)
+ err = afs.Chown("test", 1000, 1000)
is.Equal(err, ErrUnsupported)
}
func TestFsChtimes(t *testing.T) {
is := is.New(t)
- fs, err := newTestFs()
+ afs, err := newTestAfs()
is.NoErr(err)
now := time.Now()
- err = fs.Chtimes("test", now, now)
+ err = afs.Chtimes("test", now, now)
is.Equal(err, ErrUnsupported)
}
func TestFsStat(t *testing.T) {
is := is.New(t)
- fs, err := newTestFs()
+ afs, err := newTestAfs()
is.NoErr(err)
- info, err := fs.Stat("dir/file")
+ info, err := afs.Stat("dir/file")
is.NoErr(err)
is.Equal(info.Name(), "file")
}
func TestFsStatNoExist(t *testing.T) {
is := is.New(t)
- fs, err := newTestFs()
+ afs, err := newTestAfs()
is.NoErr(err)
- _, err = fs.Stat("dir/non-existent")
+ _, err = afs.Stat("dir/non-existent")
is.Equal(err, afero.ErrFileNotFound)
}
func TestFsOpenRead(t *testing.T) {
is := is.New(t)
- fs, err := newTestFs()
+ afs, err := newTestAfs()
is.NoErr(err)
- f, err := fs.Open("dir/file")
- is.NoErr(err)
- defer f.Close()
- b, err := io.ReadAll(f)
+ b, err := afs.ReadFile("dir/file")
is.NoErr(err)
is.Equal(b, []byte("test"))
}
-func newTestFs() (*Fs, error) {
+func TestFsReadNoExist(t *testing.T) {
+ is := is.New(t)
+ afs, err := newTestAfs()
+ is.NoErr(err)
+ _, err = afs.ReadFile("non-existent")
+ is.Equal(err, afero.ErrFileNotFound)
+}
+
+func TestFsOpenWrite(t *testing.T) {
+ is := is.New(t)
+ afs, err := newTestAfs()
+ is.NoErr(err)
+ err = afs.WriteFile("dir/file_test", []byte("testdata"), 0)
+ is.NoErr(err)
+}
+
+func newTestAfs() (*afero.Afero, error) {
fs, err := NewFsFromURL("http://testuser:[email protected]:9000/test-bucket")
- return fs, err
+ afs := &afero.Afero{Fs: fs}
+ return afs, err
}
func testListBuckets(fs *Fs) ([]string, error) {