diff options
Diffstat (limited to 'fs.go')
| -rw-r--r-- | fs.go | 81 |
1 files changed, 65 insertions, 16 deletions
@@ -1,6 +1,7 @@ package s3 import ( + "bytes" "context" "errors" "fmt" @@ -15,9 +16,15 @@ import ( "github.com/spf13/afero" ) +const ( + readWriteOnlyFlag = os.O_RDONLY | os.O_WRONLY // 0b1 + supportedFlags = os.O_CREATE | os.O_TRUNC | readWriteOnlyFlag +) + var ( ErrUnsupported = errors.New("unsupported operation") ErrInvalidProtocol = errors.New("invalid protocol") + ErrInvalidOpenFlag = errors.New("invalid open flag, expects either O_WRONLY or O_RDONLY") ) type Fs struct { @@ -75,8 +82,8 @@ func (fs *Fs) Name() string { // Create creates a file in the filesystem, returning the file and an // error, if any happens. -func (fs *Fs) Create(name string) (File, error) { - panic("not implemented") +func (fs *Fs) Create(name string) (afero.File, error) { + return fs.OpenFile(name, os.O_WRONLY, 0) } // Mkdir creates a directory in the filesystem, return an error if any @@ -93,17 +100,36 @@ func (fs *Fs) MkdirAll(path string, perm os.FileMode) error { // Open opens a file, returning it or an error, if any happens. func (fs *Fs) Open(name string) (afero.File, error) { - ctx, cancel := fs.contextWithTimeout() - obj, err := fs.client.GetObject(ctx, fs.bucket, name, minio.GetObjectOptions{}) - if err != nil { - return nil, err - } - return fs.newObjectReader(name, obj, cancel), nil + return fs.OpenFile(name, os.O_RDONLY, 0) } // OpenFile opens a file using the given flags and the given mode. -func (fs *Fs) OpenFile(name string, flag int, perm os.FileMode) (File, error) { - panic("not implemented") +func (fs *Fs) OpenFile(name string, flag int, perm os.FileMode) (afero.File, error) { + if flag&(^supportedFlags) != 0 { + return nil, ErrInvalidOpenFlag + } + + f := &File{fs: fs, key: name} + if isDirPath(name) { + if flag&os.O_WRONLY != 0 { + return nil, ErrInvalidOpenFlag + } + return f, nil + } + + var err error + if flag&os.O_WRONLY != 0 { + f.w, f.cancel, f.werrc = fs.newObjectWriter(name) + if err != nil { + return nil, err + } + } else { // os.O_RDONLY + f.r, f.cancel, err = fs.newObjectReader(name) + if err != nil { + return nil, err + } + } + return f, nil } // Remove removes a file identified by name, returning an error, if any @@ -187,13 +213,32 @@ func (fs *Fs) contextWithTimeout() (context.Context, context.CancelFunc) { return context.WithTimeout(context.Background(), fs.Timeout) } -func (fs *Fs) newObjectReader(key string, r io.ReadCloser, cancel context.CancelFunc) *File { - return &File{ - fs: fs, - key: key, - r: r, - cancel: cancel, +func (fs *Fs) newObjectReader(key string) (io.ReadSeekCloser, context.CancelFunc, error) { + ctx, cancel := context.WithCancel(context.Background()) + obj, err := fs.client.GetObject(ctx, fs.bucket, key, minio.GetObjectOptions{}) + if err != nil { + cancel() + return nil, nil, transformError(err) } + return obj, cancel, nil +} + +func (fs *Fs) newObjectWriter(key string) (io.WriteCloser, context.CancelFunc, <-chan error) { + ctx, cancel := context.WithCancel(context.Background()) + r, w := io.Pipe() + + errc := make(chan error, 1) + go func() { + b, err := io.ReadAll(r) + if err != nil { + errc <- err + return + } + _, err = fs.client.PutObject(ctx, fs.bucket, key, bytes.NewReader(b), int64(len(b)), minio.PutObjectOptions{}) + errc <- transformError(err) + }() + + return w, cancel, errc } func transformError(err error) error { @@ -210,3 +255,7 @@ func transformError(err error) error { } return err } + +func isDirPath(p string) bool { + return strings.HasSuffix(p, "/") +} |
