package s3 import ( "bytes" "context" "errors" "fmt" "io" neturl "net/url" "os" "strings" "time" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/spf13/afero" ) const ( readWriteOnlyFlag = os.O_RDONLY | os.O_WRONLY // 0b1 supportedFlags = os.O_CREATE | os.O_TRUNC | readWriteOnlyFlag ) var ( ErrUnsupported = errors.New("unsupported operation") ErrInvalidProtocol = errors.New("invalid protocol") ErrInvalidOpenFlag = errors.New("invalid open flag, expects either O_WRONLY or O_RDONLY") ErrNotDirectory = errors.New("not a directory") ) type Fs struct { client *minio.Client bucket string // Configurables Timeout time.Duration } func NewFs(endpoint, bucket, accessKeyID, secretAccessKey string, useSSL bool) (*Fs, error) { // Initialize minio client object. client, err := minio.New(endpoint, &minio.Options{ Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), Secure: useSSL, }) if err != nil { return nil, err } return &Fs{ client: client, bucket: bucket, Timeout: 10 * time.Second, }, nil } func NewFsFromURL(url string) (*Fs, error) { endpoint, bucket, accessKeyID, secretAccessKey, useSSL, err := parseUrl(url) if err != nil { return nil, fmt.Errorf("failed to parse url: %w", err) } return NewFs(endpoint, bucket, accessKeyID, secretAccessKey, useSSL) } // The name of this FileSystem func (fs *Fs) Name() string { return "s3" } // Create creates a file in the filesystem, returning the file and an // error, if any happens. func (fs *Fs) Create(name string) (afero.File, error) { return fs.OpenFile(name, os.O_WRONLY, 0) } // Mkdir creates a directory in the filesystem, return an error if any // happens. func (fs *Fs) Mkdir(name string, perm os.FileMode) error { return ErrUnsupported } // MkdirAll creates a directory path and all parents that does not exist // yet. func (fs *Fs) MkdirAll(path string, perm os.FileMode) error { return ErrUnsupported } // Open opens a file, returning it or an error, if any happens. func (fs *Fs) Open(name string) (afero.File, error) { return fs.OpenFile(name, os.O_RDONLY, 0) } // OpenFile opens a file using the given flags and the given mode. func (fs *Fs) OpenFile(name string, flag int, perm os.FileMode) (afero.File, error) { if flag&(^supportedFlags) != 0 { return nil, ErrInvalidOpenFlag } f := &File{fs: fs, key: name} if isDirPath(name) { if flag&os.O_WRONLY != 0 { return nil, ErrInvalidOpenFlag } return f, nil } var err error if flag&os.O_WRONLY != 0 { f.w, f.cancel, f.werrc = fs.newObjectWriter(name) if err != nil { return nil, err } } else { // os.O_RDONLY f.r, f.cancel, err = fs.newObjectReader(name) if err != nil { return nil, err } } return f, nil } // Remove removes a file identified by name, returning an error, if any // happens. func (fs *Fs) Remove(name string) error { ctx, cancel := fs.contextWithTimeout() defer cancel() err := fs.client.RemoveObject(ctx, fs.bucket, name, minio.RemoveObjectOptions{}) if err != nil { return fromErrorResponse(err) } return nil } // RemoveAll removes a directory path and any children it contains. It // does not fail if the path does not exist (return nil). func (fs *Fs) RemoveAll(path string) error { ctx, cancel := fs.contextWithTimeout() defer cancel() if !isDirPath(path) { return ErrNotDirectory } objectsCh := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{Prefix: path, Recursive: true}) errc := fs.client.RemoveObjects(ctx, fs.bucket, objectsCh, minio.RemoveObjectsOptions{}) for err := range errc { if err.Err != nil { return fromErrorResponse(err.Err) } } return nil } // Rename renames a file. func (fs *Fs) Rename(oldname, newname string) error { ctx, cancel := fs.contextWithTimeout() defer cancel() _, err := fs.client.CopyObject(ctx, minio.CopyDestOptions{Bucket: fs.bucket, Object: newname}, minio.CopySrcOptions{Bucket: fs.bucket, Object: oldname}) if err != nil { return fromErrorResponse(err) } err = fs.client.RemoveObject(ctx, fs.bucket, oldname, minio.RemoveObjectOptions{}) if err != nil { return fromErrorResponse(err) } return nil } // Stat returns a FileInfo describing the named file, or an error, if any // happens. func (fs *Fs) Stat(name string) (os.FileInfo, error) { ctx, cancel := fs.contextWithTimeout() defer cancel() info, err := fs.client.StatObject(ctx, fs.bucket, name, minio.GetObjectOptions{}) if err != nil { return nil, fromErrorResponse(err) } return fromObjectInfo(info), nil } // Chmod changes the mode of the named file to mode. func (fs *Fs) Chmod(name string, mode os.FileMode) error { return ErrUnsupported } // Chown changes the uid and gid of the named file. func (fs *Fs) Chown(name string, uid, gid int) error { return ErrUnsupported } // Chtimes changes the access and modification times of the named file func (fs *Fs) Chtimes(name string, atime time.Time, mtime time.Time) error { return ErrUnsupported } func (fs *Fs) contextWithTimeout() (context.Context, context.CancelFunc) { return context.WithTimeout(context.Background(), fs.Timeout) } func (fs *Fs) newObjectReader(key string) (io.ReadSeekCloser, context.CancelFunc, error) { ctx, cancel := context.WithCancel(context.Background()) obj, err := fs.client.GetObject(ctx, fs.bucket, key, minio.GetObjectOptions{}) if err != nil { cancel() return nil, nil, fromErrorResponse(err) } return obj, cancel, nil } func (fs *Fs) newObjectWriter(key string) (io.WriteCloser, context.CancelFunc, <-chan error) { ctx, cancel := context.WithCancel(context.Background()) r, w := io.Pipe() errc := make(chan error, 1) go func() { maxAtomicSize := int64(16 * (1 << 20)) lr := io.LimitReader(r, maxAtomicSize) b, err := io.ReadAll(lr) if err != nil { errc <- err return } if ctx.Err() != nil { errc <- ctx.Err() return } var rd io.Reader var size int64 if int64(len(b)) == maxAtomicSize { rd = io.MultiReader(bytes.NewReader(b), r) size = -1 } else { rd = bytes.NewReader(b) size = int64(len(b)) } _, err = fs.client.PutObject(ctx, fs.bucket, key, rd, size, minio.PutObjectOptions{}) errc <- fromErrorResponse(err) }() return w, cancel, errc } func (fs *Fs) readdir(directory string, count int, onlyDir bool) ([]os.FileInfo, error) { if !isDirPath(directory) { return nil, ErrNotDirectory } if count == 0 { return []os.FileInfo{}, nil } ctx, cancel := fs.contextWithTimeout() defer cancel() objectsCh := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{Prefix: directory}) n := uint(0) maxcount := uint(count) results := make([]os.FileInfo, 0) for objectInfo := range objectsCh { if onlyDir && !isDirPath(objectInfo.Key) { continue } results = append(results, fromObjectInfo(objectInfo)) n += 1 if n > maxcount { break } } return results, nil } func isDirPath(p string) bool { return p == "" || strings.HasSuffix(p, "/") } func parseUrl(url string) (endpoint, bucket, accessKeyID, secretAccessKey string, useSSL bool, err error) { var r *neturl.URL if r, err = neturl.ParseRequestURI(url); err != nil { return } switch r.Scheme { case "http": case "https": useSSL = true default: err = ErrInvalidProtocol return } endpoint = r.Host bucket = r.Path[1:] accessKeyID = r.User.Username() secretAccessKey, _ = r.User.Password() return }