package s3 import ( "bytes" "context" "errors" "fmt" "io" neturl "net/url" "os" "strings" "time" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/spf13/afero" ) const ( readWriteOnlyFlag = os.O_RDONLY | os.O_WRONLY // 0b1 supportedFlags = os.O_CREATE | os.O_TRUNC | readWriteOnlyFlag ) var ( ErrUnsupported = errors.New("unsupported operation") ErrInvalidProtocol = errors.New("invalid protocol") ErrInvalidOpenFlag = errors.New("invalid open flag, expects either O_WRONLY or O_RDONLY") ErrNotDirectory = errors.New("not a directory") ) type Fs struct { client *minio.Client bucket string // Configurables Timeout time.Duration } func NewFs(endpoint, bucket, accessKeyID, secretAccessKey string, useSSL bool) (*Fs, error) { // Initialize minio client object. client, err := minio.New(endpoint, &minio.Options{ Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), Secure: useSSL, }) if err != nil { return nil, err } return &Fs{ client: client, bucket: bucket, Timeout: 10 * time.Second, }, nil } func NewFsFromURL(url string) (*Fs, error) { r, err := neturl.ParseRequestURI(url) if err != nil { return nil, fmt.Errorf("failed to parse url: %w", err) } useSSL := false switch r.Scheme { case "http": case "https": useSSL = true default: return nil, ErrInvalidProtocol } endpoint := r.Host bucket := r.Path[1:] accessKeyId := r.User.Username() secretAccessKey, _ := r.User.Password() return NewFs(endpoint, bucket, accessKeyId, secretAccessKey, useSSL) } // The name of this FileSystem func (fs *Fs) Name() string { return "s3" } // Create creates a file in the filesystem, returning the file and an // error, if any happens. func (fs *Fs) Create(name string) (afero.File, error) { return fs.OpenFile(name, os.O_WRONLY, 0) } // Mkdir creates a directory in the filesystem, return an error if any // happens. func (fs *Fs) Mkdir(name string, perm os.FileMode) error { return ErrUnsupported } // MkdirAll creates a directory path and all parents that does not exist // yet. func (fs *Fs) MkdirAll(path string, perm os.FileMode) error { return ErrUnsupported } // Open opens a file, returning it or an error, if any happens. func (fs *Fs) Open(name string) (afero.File, error) { return fs.OpenFile(name, os.O_RDONLY, 0) } // OpenFile opens a file using the given flags and the given mode. func (fs *Fs) OpenFile(name string, flag int, perm os.FileMode) (afero.File, error) { if flag&(^supportedFlags) != 0 { return nil, ErrInvalidOpenFlag } f := &File{fs: fs, key: name} if isDirPath(name) { if flag&os.O_WRONLY != 0 { return nil, ErrInvalidOpenFlag } return f, nil } var err error if flag&os.O_WRONLY != 0 { f.w, f.cancel, f.werrc = fs.newObjectWriter(name) if err != nil { return nil, err } } else { // os.O_RDONLY f.r, f.cancel, err = fs.newObjectReader(name) if err != nil { return nil, err } } return f, nil } // Remove removes a file identified by name, returning an error, if any // happens. func (fs *Fs) Remove(name string) error { ctx, cancel := fs.contextWithTimeout() defer cancel() err := fs.client.RemoveObject(ctx, fs.bucket, name, minio.RemoveObjectOptions{}) if err != nil { return fromErrorResponse(err) } return nil } // RemoveAll removes a directory path and any children it contains. It // does not fail if the path does not exist (return nil). func (fs *Fs) RemoveAll(path string) error { ctx, cancel := fs.contextWithTimeout() defer cancel() if !isDirPath(path) { return ErrNotDirectory } objectsCh := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{Prefix: path, Recursive: true}) errc := fs.client.RemoveObjects(ctx, fs.bucket, objectsCh, minio.RemoveObjectsOptions{}) for err := range errc { if err.Err != nil { return fromErrorResponse(err.Err) } } return nil } // Rename renames a file. func (fs *Fs) Rename(oldname, newname string) error { ctx, cancel := fs.contextWithTimeout() defer cancel() _, err := fs.client.CopyObject(ctx, minio.CopyDestOptions{Bucket: fs.bucket, Object: newname}, minio.CopySrcOptions{Bucket: fs.bucket, Object: oldname}) if err != nil { return fromErrorResponse(err) } err = fs.client.RemoveObject(ctx, fs.bucket, oldname, minio.RemoveObjectOptions{}) if err != nil { return fromErrorResponse(err) } return nil } // Stat returns a FileInfo describing the named file, or an error, if any // happens. func (fs *Fs) Stat(name string) (os.FileInfo, error) { ctx, cancel := fs.contextWithTimeout() defer cancel() info, err := fs.client.StatObject(ctx, fs.bucket, name, minio.GetObjectOptions{}) if err != nil { return nil, fromErrorResponse(err) } return fromObjectInfo(info), nil } // Chmod changes the mode of the named file to mode. func (fs *Fs) Chmod(name string, mode os.FileMode) error { return ErrUnsupported } // Chown changes the uid and gid of the named file. func (fs *Fs) Chown(name string, uid, gid int) error { return ErrUnsupported } // Chtimes changes the access and modification times of the named file func (fs *Fs) Chtimes(name string, atime time.Time, mtime time.Time) error { return ErrUnsupported } func (fs *Fs) contextWithTimeout() (context.Context, context.CancelFunc) { return context.WithTimeout(context.Background(), fs.Timeout) } func (fs *Fs) newObjectReader(key string) (io.ReadSeekCloser, context.CancelFunc, error) { ctx, cancel := context.WithCancel(context.Background()) obj, err := fs.client.GetObject(ctx, fs.bucket, key, minio.GetObjectOptions{}) if err != nil { cancel() return nil, nil, fromErrorResponse(err) } return obj, cancel, nil } func (fs *Fs) newObjectWriter(key string) (io.WriteCloser, context.CancelFunc, <-chan error) { ctx, cancel := context.WithCancel(context.Background()) r, w := io.Pipe() errc := make(chan error, 1) go func() { b, err := io.ReadAll(r) if err != nil { errc <- err return } _, err = fs.client.PutObject(ctx, fs.bucket, key, bytes.NewReader(b), int64(len(b)), minio.PutObjectOptions{}) errc <- fromErrorResponse(err) }() return w, cancel, errc } func (fs *Fs) readdir(directory string, count int, onlyDir bool) ([]os.FileInfo, error) { if !isDirPath(directory) { return nil, ErrNotDirectory } if count == 0 { return []os.FileInfo{}, nil } ctx, cancel := fs.contextWithTimeout() defer cancel() objectsCh := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{Prefix: directory}) n := uint(0) maxcount := uint(count) results := make([]os.FileInfo, 0) for objectInfo := range objectsCh { if onlyDir && !isDirPath(objectInfo.Key) { continue } results = append(results, fromObjectInfo(objectInfo)) n += 1 if n > maxcount { break } } return results, nil } func isDirPath(p string) bool { return strings.HasSuffix(p, "/") }