MinIO Source Code Analysis

Posted by yvhqbat



1. MinIO Event Notification Module Source Code Analysis

1. The notification system: NotificationSys

// NotificationSys - notification system.
type NotificationSys struct {
	sync.RWMutex
	targetList                 *event.TargetList
	targetResCh                chan event.TargetIDResult
	bucketRulesMap             map[string]event.RulesMap
	bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
	peerClients                []*peerRESTClient
}

// Send - sends event data to all matching targets.
func (sys *NotificationSys) Send(args eventArgs) {
	sys.RLock()
	targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
	sys.RUnlock()

	if len(targetIDSet) == 0 {
		return
	}

	sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)
}

2. target.Target

Bucket events can be published to notification targets such as Redis, MySQL, PostgreSQL, Kafka, and Elasticsearch.

When there are multiple targets, a sync.WaitGroup is used to send to them concurrently.

// Target - event target interface
type Target interface {
	ID() TargetID
	IsActive() (bool, error)
	Save(Event) error
	Send(string) error
	Close() error
	HasQueueStore() bool
}

// TargetList - holds list of targets indexed by target ID.
type TargetList struct {
	sync.RWMutex
	targets map[TargetID]Target
}

// Send - sends events to targets identified by target IDs.
func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) {
	go func() {
		var wg sync.WaitGroup
		for id := range targetIDset {
			list.RLock()
			target, ok := list.targets[id]
			list.RUnlock()
			if ok {
				wg.Add(1)
				go func(id TargetID, target Target) {
					defer wg.Done()
					tgtRes := TargetIDResult{ID: id}
					if err := target.Save(event); err != nil {
						tgtRes.Err = err
					}
					resCh <- tgtRes
				}(id, target)
			} else {
				resCh <- TargetIDResult{ID: id}
			}
		}
		wg.Wait()
	}()
}

3. target.RedisTarget

The Redis notification target supports two formats: namespace and access.

With the namespace format, MinIO mirrors the objects in a bucket as entries in a Redis hash. Each entry corresponds to an object in the bucket: its key is "bucket-name/object-name" and its value is JSON-formatted event data about that MinIO object. When an object is updated or deleted, the corresponding hash entry is updated or deleted as well.

With the access format, MinIO appends events to a Redis list using RPUSH. Each element of the list is a JSON-formatted list with two elements: the first is a timestamp string, and the second is a JSON object containing the event data for the operation performed on the bucket. In this format, list elements are never updated or deleted.
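As a quick way to see what the namespace format produces, the following standalone sketch (not part of MinIO) dumps the hash maintained by the Redis target. It assumes a local Redis instance, the github.com/gomodule/redigo client, and that the target key was configured as "bucketevents"; adjust these to your setup.

package main

import (
	"fmt"
	"log"

	"github.com/gomodule/redigo/redis"
)

func main() {
	conn, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// In namespace format each hash field is "<bucket>/<object>" and each
	// value is the JSON event record for that object.
	entries, err := redis.StringMap(conn.Do("HGETALL", "bucketevents"))
	if err != nil {
		log.Fatal(err)
	}
	for key, eventJSON := range entries {
		fmt.Printf("%s => %s\n", key, eventJSON)
	}
}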

// RedisTarget - Redis target.
type RedisTarget struct {
	id         event.TargetID
	args       RedisArgs
	pool       *redis.Pool
	store      Store
	firstPing  bool
	loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}


// Save - saves the event to the store if the queue store is configured; stored
// events are replayed when the Redis connection becomes active again.
func (target *RedisTarget) Save(eventData event.Event) error {
	if target.store != nil {
		return target.store.Put(eventData)
	}
	_, err := target.IsActive()
	if err != nil {
		return err
	}
	return target.send(eventData)
}


// send - sends an event to the redis.
func (target *RedisTarget) send(eventData event.Event) error {
	conn := target.pool.Get()
	defer func() {
		cErr := conn.Close()
		target.loggerOnce(context.Background(), cErr, target.ID())
	}()

	if target.args.Format == event.NamespaceFormat {
		objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
		if err != nil {
			return err
		}
		key := eventData.S3.Bucket.Name + "/" + objectName

		if eventData.EventName == event.ObjectRemovedDelete {
			_, err = conn.Do("HDEL", target.args.Key, key)
		} else {
			var data []byte
			if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
				return err
			}

			_, err = conn.Do("HSET", target.args.Key, key, data)
		}
		if err != nil {
			return err
		}
	}

	if target.args.Format == event.AccessFormat {
		data, err := json.Marshal([]RedisAccessEvent{{Event: []event.Event{eventData}, EventTime: eventData.EventTime}})
		if err != nil {
			return err
		}
		if _, err := conn.Do("RPUSH", target.args.Key, data); err != nil {
			return err
		}
	}

	return nil
}

4. target.MySQLTarget

// MySQLTarget - MySQL target.
type MySQLTarget struct {
	id         event.TargetID
	args       MySQLArgs
	updateStmt *sql.Stmt
	deleteStmt *sql.Stmt
	insertStmt *sql.Stmt
	db         *sql.DB
	store      Store
	firstPing  bool
	loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}

// Save - saves the events to the store which will be replayed when the SQL connection is active.
func (target *MySQLTarget) Save(eventData event.Event) error {
	if target.store != nil {
		return target.store.Put(eventData)
	}
	_, err := target.IsActive()
	if err != nil {
		return err
	}
	return target.send(eventData)
}


// send - sends an event to the mysql.
func (target *MySQLTarget) send(eventData event.Event) error {
	if target.args.Format == event.NamespaceFormat {
		objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
		if err != nil {
			return err
		}
		key := eventData.S3.Bucket.Name + "/" + objectName

		if eventData.EventName == event.ObjectRemovedDelete {
			_, err = target.deleteStmt.Exec(key)
		} else {
			var data []byte
			if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
				return err
			}

			_, err = target.updateStmt.Exec(key, data)
		}

		return err
	}

	if target.args.Format == event.AccessFormat {
		eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime)
		if err != nil {
			return err
		}

		data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
		if err != nil {
			return err
		}

		_, err = target.insertStmt.Exec(eventTime, data)

		return err
	}

	return nil
}

5. target.KafkaTarget

// KafkaTarget - Kafka target.
type KafkaTarget struct {
	id         event.TargetID
	args       KafkaArgs
	producer   sarama.SyncProducer
	config     *sarama.Config
	store      Store
	loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}

// Save - saves the events to the store which will be replayed when the Kafka connection is active.
func (target *KafkaTarget) Save(eventData event.Event) error {
	if target.store != nil {
		return target.store.Put(eventData)
	}
	_, err := target.IsActive()
	if err != nil {
		return err
	}
	return target.send(eventData)
}


// send - sends an event to the kafka.
func (target *KafkaTarget) send(eventData event.Event) error {
	objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
	if err != nil {
		return err
	}
	key := eventData.S3.Bucket.Name + "/" + objectName

	data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
	if err != nil {
		return err
	}

	msg := sarama.ProducerMessage{
		Topic: target.args.Topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(data),
	}

	_, _, err = target.producer.SendMessage(&msg)

	return err
}

6. target.Store

// Store - To persist the events.
type Store interface {
	Put(event event.Event) error
	Get(key string) (event.Event, error)
	List() ([]string, error)
	Del(key string) error
	Open() error
}

// QueueStore - Filestore for persisting events.
type QueueStore struct {
	sync.RWMutex
	currentEntries uint64
	entryLimit     uint64
	directory      string
}

// NewQueueStore - Creates an instance for QueueStore.
func NewQueueStore(directory string, limit uint64) Store {
	if limit == 0 {
		limit = defaultLimit
		_, maxRLimit, err := sys.GetMaxOpenFileLimit()
		if err == nil {
			// Limit the maximum number of entries
			// to maximum open file limit
			if maxRLimit < limit {
				limit = maxRLimit
			}
		}
	}

	return &QueueStore{
		directory:  directory,
		entryLimit: limit,
	}
}


// write - writes event to the directory.
func (store *QueueStore) write(key string, e event.Event) error {
	// Marshalls the event.
	eventData, err := json.Marshal(e)
	if err != nil {
		return err
	}

	path := filepath.Join(store.directory, key+eventExt)
	if err := ioutil.WriteFile(path, eventData, os.FileMode(0770)); err != nil {
		return err
	}

	// Increment the event count.
	store.currentEntries++

	return nil
}


// Put - puts a event to the store.
func (store *QueueStore) Put(e event.Event) error {
	store.Lock()
	defer store.Unlock()
	if store.currentEntries >= store.entryLimit {
		return errLimitExceeded
	}
	key, err := getNewUUID()
	if err != nil {
		return err
	}
	return store.write(key, e)
}
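Events queued in the store are meant to be replayed once the target becomes reachable again. A conceptual replay loop against the Store and Target interfaces shown above might look like the following; this is only an illustration, and MinIO's actual replay logic in the store package differs in its retry and error handling.

// replayQueuedEvents is an illustrative sketch, not MinIO code: it walks the
// keys persisted in the store and asks the target to send each one.
// Target.Send(key) is expected to read the persisted event for this key from
// its own store, deliver it, and remove it on success.
func replayQueuedEvents(store Store, target Target) error {
	keys, err := store.List()
	if err != nil {
		return err
	}
	for _, key := range keys {
		if err := target.Send(key); err != nil {
			return err
		}
	}
	return nil
}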

7. Event types: event.Name

The event types supported by MinIO:

// Name - event type enum.
// Refer http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#notification-how-to-event-types-and-destinations
type Name int

// Values of Name
const (
	ObjectAccessedAll Name = 1 + iota
	ObjectAccessedGet
	ObjectAccessedGetRetention
	ObjectAccessedGetLegalHold
	ObjectAccessedHead
	ObjectCreatedAll
	ObjectCreatedCompleteMultipartUpload
	ObjectCreatedCopy
	ObjectCreatedPost
	ObjectCreatedPut
	ObjectCreatedPutRetention
	ObjectCreatedPutLegalHold
	ObjectRemovedAll
	ObjectRemovedDelete
)
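On the client side, these event types are what a bucket notification configuration subscribes to. Below is a hedged sketch using the minio-go v7 SDK; it assumes a notification target (here a Redis target with ID "1") has already been configured on the server, and helper names may differ between SDK versions.

package main

import (
	"context"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/notification"
)

func main() {
	ctx := context.Background()

	client, err := minio.New("127.0.0.1:9000", &minio.Options{
		Creds:  credentials.NewStaticV4("ak_123456", "sk_123456", ""),
		Secure: false,
	})
	if err != nil {
		log.Fatal(err)
	}

	// ARN of a target configured on the server, e.g. notify_redis with ID "1".
	queueArn := notification.NewArn("minio", "sqs", "", "1", "redis")

	queueConfig := notification.NewConfig(queueArn)
	queueConfig.AddEvents(notification.ObjectCreatedAll, notification.ObjectRemovedAll)

	config := notification.Configuration{}
	config.AddQueue(queueConfig)

	if err := client.SetBucketNotification(ctx, "mybucket", config); err != nil {
		log.Fatal(err)
	}
}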

8. Events: event.Event

The notification messages MinIO sends to publish events are JSON-formatted.

// Identity represents access key who caused the event.
type Identity struct {
	PrincipalID string `json:"principalId"`
}

// Bucket represents bucket metadata of the event.
type Bucket struct {
	Name          string   `json:"name"`
	OwnerIdentity Identity `json:"ownerIdentity"`
	ARN           string   `json:"arn"`
}

// Object represents object metadata of the event.
type Object struct {
	Key          string            `json:"key"`
	Size         int64             `json:"size,omitempty"`
	ETag         string            `json:"eTag,omitempty"`
	ContentType  string            `json:"contentType,omitempty"`
	UserMetadata map[string]string `json:"userMetadata,omitempty"`
	VersionID    string            `json:"versionId,omitempty"`
	Sequencer    string            `json:"sequencer"`
}

// Metadata represents event metadata.
type Metadata struct {
	SchemaVersion   string `json:"s3SchemaVersion"`
	ConfigurationID string `json:"configurationId"`
	Bucket          Bucket `json:"bucket"`
	Object          Object `json:"object"`
}

// Source represents client information who triggered the event.
type Source struct {
	Host      string `json:"host"`
	Port      string `json:"port"`
	UserAgent string `json:"userAgent"`
}

// Event represents event notification information defined in
// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html.
type Event struct {
	EventVersion      string            `json:"eventVersion"`
	EventSource       string            `json:"eventSource"`
	AwsRegion         string            `json:"awsRegion"`
	EventTime         string            `json:"eventTime"`
	EventName         Name              `json:"eventName"`
	UserIdentity      Identity          `json:"userIdentity"`
	RequestParameters map[string]string `json:"requestParameters"`
	ResponseElements  map[string]string `json:"responseElements"`
	S3                Metadata          `json:"s3"`
	Source            Source            `json:"source"`
}
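To see the wire format, the small sketch below builds an Event and marshals it. It assumes the pkg/event package path of this MinIO version and that event.Name marshals to its S3 string form (e.g. "s3:ObjectCreated:Put"); all field values are made up for illustration.

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/minio/minio/pkg/event"
)

func main() {
	ev := event.Event{
		EventVersion: "2.0",
		EventSource:  "minio:s3",
		EventTime:    "2020-01-01T00:00:00.000Z",
		EventName:    event.ObjectCreatedPut,
		UserIdentity: event.Identity{PrincipalID: "minio"},
		S3: event.Metadata{
			SchemaVersion:   "1.0",
			ConfigurationID: "Config",
			Bucket: event.Bucket{
				Name: "mybucket",
				ARN:  "arn:aws:s3:::mybucket",
			},
			Object: event.Object{Key: "hello.txt", Size: 11},
		},
		Source: event.Source{Host: "127.0.0.1", UserAgent: "example-client"},
	}

	data, err := json.MarshalIndent(ev, "", "  ")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(data))
}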

9. Event log: event.Log

// Log represents event information for some event targets.
type Log struct {
	EventName Name
	Key       string
	Records   []Event
}

2. MinIO Gateway Source Code Analysis

Start the minio gateway:

[root@node8217 install]# cat start_minio.sh
#!/bin/bash

export MINIO_ACCESS_KEY=ak_123456
export MINIO_SECRET_KEY=sk_123456
#export MINIO_PROMETHEUS_AUTH_TYPE="public"
#export MINIO_REGION_NAME="wuhan"

#./minio server --address :9009 /BigData/minio &
./minio gateway s3 http://127.0.0.1:9009 &

Once started, the minio gateway can be used in the same way as a minio server.
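For example, a regular S3 client works against the gateway unchanged. The sketch below uses the minio-go v7 SDK with the credentials from the start script above; the endpoint, bucket and object names are placeholders.

package main

import (
	"context"
	"log"
	"strings"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	ctx := context.Background()

	// The gateway listens on the default :9000 unless --address is given.
	client, err := minio.New("127.0.0.1:9000", &minio.Options{
		Creds:  credentials.NewStaticV4("ak_123456", "sk_123456", ""),
		Secure: false,
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := client.MakeBucket(ctx, "testbucket", minio.MakeBucketOptions{}); err != nil {
		log.Fatal(err)
	}

	body := strings.NewReader("hello world")
	if _, err := client.PutObject(ctx, "testbucket", "hello.txt", body, int64(body.Len()),
		minio.PutObjectOptions{ContentType: "text/plain"}); err != nil {
		log.Fatal(err)
	}
}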

1. The minio entry point

minio/main.go

package main // import "github.com/minio/minio"

import (
	"os"

	minio "github.com/minio/minio/cmd"

	// Import gateway
	_ "github.com/minio/minio/cmd/gateway"
)

func main() {
	minio.Main(os.Args)
}

The blank import _ "github.com/minio/minio/cmd/gateway" causes the init functions of all gateways supported by the github.com/minio/minio/cmd/gateway package to run, registering the supported gateways.
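The pattern behind such a blank import is the usual init-time registration. A minimal, self-contained sketch of the idea (identifiers here are illustrative, not MinIO's actual ones) is:

package registry

// Gateway is a stand-in for the interface a backend must implement.
type Gateway interface {
	Name() string
}

var gateways = map[string]func() Gateway{}

// Register is called from each backend package's init() function, so a blank
// import of that package (_ "example.com/gateway/s3") is enough to make the
// backend selectable by name at startup.
func Register(name string, factory func() Gateway) {
	gateways[name] = factory
}

// Lookup returns the factory registered under name, if any.
func Lookup(name string) (func() Gateway, bool) {
	f, ok := gateways[name]
	return f, ok
}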

minio/cmd/main.go

func newApp(name string) *cli.App {
	// Collection of minio commands currently supported are.
	commands := []cli.Command{}

	// registerCommand registers a cli command.
	registerCommand := func(command cli.Command) {
		commands = append(commands, command)
	}

	// Register all commands.
	registerCommand(serverCmd)
	registerCommand(gatewayCmd)

	app := cli.NewApp()
	app.Name = name
	app.Commands = commands

	return app
}

// Main main for minio server.
func Main(args []string) {
	// Set the minio app name.
	appName := filepath.Base(args[0])

	// Run the app - exit on error.
	if err := newApp(appName).Run(args); err != nil {
		os.Exit(1)
	}
}

Two commands are registered here: the global variables serverCmd and gatewayCmd.

2. Registering the S3 gateway

gatewayCmd

minio/cmd/gateway-main.go

var (
	gatewayCmd = cli.Command{
		Name:            "gateway",
		Usage:           "start object storage gateway",
		Flags:           append(ServerFlags, GlobalFlags...),
		HideHelpCommand: true,
	}
)

// RegisterGatewayCommand registers a new command for gateway.
func RegisterGatewayCommand(cmd cli.Command) error {
	cmd.Flags = append(append(cmd.Flags, ServerFlags...), GlobalFlags...)
	gatewayCmd.Subcommands = append(gatewayCmd.Subcommands, cmd)
	return nil
}

Registering the S3 gateway:

minio/cmd/gateway/s3/gateway-s3.go

const (
	s3Backend = "s3"
)

func init() {
	minio.RegisterGatewayCommand(cli.Command{
		Name:            s3Backend,
		Usage:           "Amazon Simple Storage Service (S3)",
		Action:          s3GatewayMain,
		HideHelpCommand: true,
	})
}

// Handler for 'minio gateway s3' command line.
func s3GatewayMain(ctx *cli.Context) {
	args := ctx.Args()
	if !ctx.Args().Present() {
		args = cli.Args{"https://s3.amazonaws.com"}
	}

	serverAddr := ctx.GlobalString("address")
	if serverAddr == "" || serverAddr == ":"+minio.GlobalMinioDefaultPort {
		serverAddr = ctx.String("address")
	}
	// Validate gateway arguments.
	logger.FatalIf(minio.ValidateGatewayArguments(serverAddr, args.First()), "Invalid argument")

	// Start the gateway..
	minio.StartGateway(ctx, &S3{args.First()})
}

The key call is minio.StartGateway(ctx, &S3{args.First()}), which starts the corresponding gateway based on the arguments.

3. S3 implements the Gateway interface

minio/cmd/gateway-main.go

// StartGateway - handler for 'minio gateway <name>'.
func StartGateway(ctx *cli.Context, gw Gateway) {
	// gateway startup ......
}

As we can see, s3.S3 implements the cmd.Gateway interface:

// Gateway represents a gateway backend.
type Gateway interface {
	// Name returns the unique name of the gateway.
	Name() string

	// NewGatewayLayer returns a new  ObjectLayer.
	NewGatewayLayer(creds auth.Credentials) (ObjectLayer, error)

	// Returns true if gateway is ready for production.
	Production() bool
}

The S3 gateway's implementation of the Gateway interface:

minio/cmd/gateway/s3/gateway-s3.go

// S3 implements Gateway.
type S3 struct {
	host string
}

// Name implements Gateway interface.
func (g *S3) Name() string {
	return s3Backend
}

// NewGatewayLayer returns s3 ObjectLayer.
func (g *S3) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
	// creds are ignored here, since S3 gateway implements chaining
	// all credentials.
	clnt, err := newS3(g.host)
	if err != nil {
		return nil, err
	}

	metrics := minio.NewMetrics()

	t := &minio.MetricsTransport{
		Transport: minio.NewGatewayHTTPTransport(),
		Metrics:   metrics,
	}

	// Set custom transport
	clnt.SetCustomTransport(t)

	probeBucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "probe-bucket-sign-")

	// Check if the provided keys are valid.
	if _, err = clnt.BucketExists(probeBucketName); err != nil {
		if miniogo.ToErrorResponse(err).Code != "AccessDenied" {
			return nil, err
		}
	}

	s := s3Objects{
		Client:  clnt,
		Metrics: metrics,
		HTTPClient: &http.Client{
			Transport: t,
		},
	}

	// Enables single encryption if KMS is configured.
	if minio.GlobalKMS != nil {
		encS := s3EncObjects{s}

		// Start stale enc multipart uploads cleanup routine.
		go encS.cleanupStaleEncMultipartUploads(minio.GlobalContext,
			minio.GlobalMultipartCleanupInterval, minio.GlobalMultipartExpiry)

		return &encS, nil
	}
	return &s, nil
}

// Production - s3 gateway is production ready.
func (g *S3) Production() bool {
	return true
}

4. Gateway startup

minio/cmd/gateway-main.go

// StartGateway - handler for 'minio gateway <name>'.
func StartGateway(ctx *cli.Context, gw Gateway) {
	if gw == nil {
		logger.FatalIf(errUnexpected, "Gateway implementation not initialized")
	}

	// Handle common command args.
	handleCommonCmdArgs(ctx)

	// Handle gateway specific env
	gatewayHandleEnvVars()

	// Set when gateway is enabled
	globalIsGateway = true

	router := mux.NewRouter().SkipClean(true).UseEncodedPath()

	// Enable IAM admin APIs if etcd is enabled, if not just enable basic
	// operations such as profiling, server info etc.
	registerAdminRouter(router, enableConfigOps, enableIAMOps)

	// Add healthcheck router
	registerHealthCheckRouter(router)

	// Add server metrics router
	registerMetricsRouter(router)

	// Register web router when its enabled.
	if globalBrowserEnabled {
		logger.FatalIf(registerWebRouter(router), "Unable to configure web browser")
	}

	// Currently only NAS and S3 gateway support encryption headers.
	encryptionEnabled := gatewayName == "s3" || gatewayName == "nas"
	allowSSEKMS := gatewayName == "s3" // Only S3 can support SSE-KMS (as pass-through)

	// Add API router.
	registerAPIRouter(router, encryptionEnabled, allowSSEKMS)

	httpServer := xhttp.NewServer([]string{globalCLIContext.Addr},
		criticalErrorHandler{registerHandlers(router, globalHandlers...)}, getCert)
	httpServer.BaseContext = func(listener net.Listener) context.Context {
		return GlobalContext
	}
	go func() {
		globalHTTPServerErrorCh <- httpServer.Start()
	}()

	globalObjLayerMutex.Lock()
	globalHTTPServer = httpServer
	globalObjLayerMutex.Unlock()

	newObject, err := gw.NewGatewayLayer(globalActiveCred)
	newObject = NewGatewayLayerWithLocker(newObject)

	// Once endpoints are finalized, initialize the new object api in safe mode.
	globalObjLayerMutex.Lock()
	globalObjectAPI = newObject
	globalObjLayerMutex.Unlock()

	// Calls all New() for all sub-systems.
	newAllSubsystems()

	if enableIAMOps {
		// Initialize IAM sys.
		logger.FatalIf(globalIAMSys.Init(GlobalContext, newObject), "Unable to initialize IAM system")
	}

	if globalCacheConfig.Enabled {
		// initialize the new disk cache objects.
		var cacheAPI CacheObjectLayer
		cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
		logger.FatalIf(err, "Unable to initialize disk caching")

		globalObjLayerMutex.Lock()
		globalCacheObjectAPI = cacheAPI
		globalObjLayerMutex.Unlock()
	}
	// ......
}

3. MinIO Gateway Disk Cache Source Code Analysis

1. Using the disk cache

#!/bin/bash

export MINIO_ACCESS_KEY=ak_123456
export MINIO_SECRET_KEY=sk_123456
#export MINIO_PROMETHEUS_AUTH_TYPE="public"
#export MINIO_REGION_NAME="wuhan"

#./minio server --address :9009 /BigData/minio &

export MINIO_CACHE="on"
export MINIO_CACHE_DRIVES="/mnt/drive1,/mnt/drive2"
export MINIO_CACHE_EXCLUDE="*.pdf,mybucket/*"
export MINIO_CACHE_QUOTA=80
export MINIO_CACHE_AFTER=3
export MINIO_CACHE_WATERMARK_LOW=70
export MINIO_CACHE_WATERMARK_HIGH=90

./minio gateway s3 http://127.0.0.1:9009 &

2. Disk cache implementation

func StartGateway(ctx *cli.Context, gw Gateway) {
	// ......
	if globalCacheConfig.Enabled {
		// initialize the new disk cache objects.
		var cacheAPI CacheObjectLayer
		cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
		logger.FatalIf(err, "Unable to initialize disk caching")

		globalObjLayerMutex.Lock()
		globalCacheObjectAPI = cacheAPI
		globalObjLayerMutex.Unlock()
	}
	// ......
}

minio/cmd/disk-cache.go

// CacheObjectLayer implements primitives for cache object API layer.
type CacheObjectLayer interface {
	// Object operations.
	GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
	GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
	DeleteObject(ctx context.Context, bucket, object string) error
	DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
	PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
	CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
	// Storage operations.
	StorageInfo(ctx context.Context) CacheStorageInfo
	CacheStats() *CacheStats
}

The cacheObjects struct implements the CacheObjectLayer interface:

// Returns cacheObjects for use by Server.
func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjectLayer, error) {
	// list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var.
	cache, migrateSw, err := newCache(config)
	if err != nil {
		return nil, err
	}
	c := &cacheObjects{
		cache:      cache,
		exclude:    config.Exclude,
		after:      config.After,
		migrating:  migrateSw,
		migMutex:   sync.Mutex{},
		cacheStats: newCacheStats(),
		GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
			return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
		},
		GetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
			return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
		},
		DeleteObjectFn: func(ctx context.Context, bucket, object string) error {
			return newObjectLayerFn().DeleteObject(ctx, bucket, object)
		},
		DeleteObjectsFn: func(ctx context.Context, bucket string, objects []string) ([]error, error) {
			errs := make([]error, len(objects))
			for idx, object := range objects {
				errs[idx] = newObjectLayerFn().DeleteObject(ctx, bucket, object)
			}
			return errs, nil
		},
		PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
			return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
		},
		CopyObjectFn: func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
			return newObjectLayerFn().CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
		},
	}

	if migrateSw {
		go c.migrateCacheFromV1toV2(ctx)
	}
	go c.gc(ctx)
	return c, nil
}

// Abstracts disk caching - used by the S3 layer
type cacheObjects struct {
	// slice of cache drives
	cache []*diskCache
	// file path patterns to exclude from cache
	exclude []string
	// number of accesses after which to cache an object
	after int
	// if true migration is in progress from v1 to v2
	migrating bool
	// mutex to protect migration bool
	migMutex sync.Mutex

	// Cache stats
	cacheStats *CacheStats

	GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
	GetObjectInfoFn  func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
	DeleteObjectFn   func(ctx context.Context, bucket, object string) error
	DeleteObjectsFn  func(ctx context.Context, bucket string, objects []string) ([]error, error)
	PutObjectFn      func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
	CopyObjectFn     func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
}

The actual cache layer is implemented by []*diskCache:

// newCache initializes the cacheFSObjects for the "drives" specified in config.json
// or the global env overrides.
func newCache(config cache.Config) ([]*diskCache, bool, error) {
	var caches []*diskCache
	ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{})
	formats, migrating, err := loadAndValidateCacheFormat(ctx, config.Drives)
	if err != nil {
		return nil, false, err
	}
	for i, dir := range config.Drives {
		// skip diskCache creation for cache drives missing a format.json
		if formats[i] == nil {
			caches = append(caches, nil)
			continue
		}
		if err := checkAtimeSupport(dir); err != nil {
			return nil, false, errors.New("Atime support required for disk caching")
		}

		quota := config.MaxUse
		if quota == 0 {
			quota = config.Quota
		}
		cache, err := newDiskCache(dir, quota, config.After, config.WatermarkLow, config.WatermarkHigh)
		if err != nil {
			return nil, false, err
		}
		caches = append(caches, cache)
	}
	return caches, migrating, nil
}
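The read path of cacheObjects (not excerpted here) follows the usual read-through pattern: serve from a diskCache when the object is cached, otherwise fetch from the backend via the GetObjectNInfoFn fallback and populate the cache. A generic, self-contained sketch of that pattern (illustrative only, not MinIO's GetObjectNInfo implementation) looks like this:

package cache

import (
	"context"
	"sync"
)

// fetchFn loads an object from the backend (a stand-in for the gateway's S3 call).
type fetchFn func(ctx context.Context, bucket, object string) ([]byte, error)

type readThroughCache struct {
	mu    sync.RWMutex
	data  map[string][]byte
	fetch fetchFn
}

func newReadThroughCache(fetch fetchFn) *readThroughCache {
	return &readThroughCache{data: make(map[string][]byte), fetch: fetch}
}

// Get serves a cache hit directly and fills the cache on a miss.
func (c *readThroughCache) Get(ctx context.Context, bucket, object string) ([]byte, error) {
	key := bucket + "/" + object

	c.mu.RLock()
	b, ok := c.data[key]
	c.mu.RUnlock()
	if ok {
		return b, nil // cache hit
	}

	b, err := c.fetch(ctx, bucket, object) // cache miss: go to the backend
	if err != nil {
		return nil, err
	}

	c.mu.Lock()
	c.data[key] = b
	c.mu.Unlock()
	return b, nil
}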

4. MinIO IAM Module

1. User authorization

Every S3 API operation goes through IAMSys for a permission check; the key code is:

	if globalIAMSys.IsAllowed(iampolicy.Args{
		AccountName:     cred.AccessKey,
		Action:          iampolicy.Action(action),
		BucketName:      bucketName,
		ConditionValues: getConditionValues(r, "", cred.AccessKey, claims),
		ObjectName:      objectName,
		IsOwner:         owner,
		Claims:          claims,
	}) {
		// Request is allowed return the appropriate access key.
		return cred.AccessKey, owner, ErrNone
	}
	return cred.AccessKey, owner, ErrAccessDenied
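In practice, the users and policy mappings that IsAllowed evaluates are created through the admin API. Below is a hedged sketch of adding a user and attaching a canned policy with the pkg/madmin client of roughly the same era; the exact method names and signatures may differ between MinIO versions.

package main

import (
	"context"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	ctx := context.Background()

	adm, err := madmin.New("127.0.0.1:9000", "ak_123456", "sk_123456", false)
	if err != nil {
		log.Fatal(err)
	}

	// Create a new long-term user.
	if err := adm.AddUser(ctx, "newuser", "newuser-secret"); err != nil {
		log.Fatal(err)
	}

	// Map the built-in "readwrite" canned policy to that user.
	if err := adm.SetPolicy(ctx, "readwrite", "newuser", false); err != nil {
		log.Fatal(err)
	}
}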

2. globalIAMSys

minio/cmd/globals.go

var (
	globalIAMSys            *IAMSys
)

cmd/server-main.go

func newAllSubsystems() {
	// Create new notification system and initialize notification targets
	globalNotificationSys = NewNotificationSys(globalEndpoints)

	// Create new bucket metadata system.
	globalBucketMetadataSys = NewBucketMetadataSys()

	// Create a new config system.
	globalConfigSys = NewConfigSys()

	// Create new IAM system.
	globalIAMSys = NewIAMSys()

	// Create new policy system.
	globalPolicySys = NewPolicySys()

	// Create new lifecycle system.
	globalLifecycleSys = NewLifecycleSys()

	// Create new bucket object lock subsystem
	globalBucketObjectLockSys = NewBucketObjectLockSys()

	// Create new bucket quota subsystem
	globalBucketQuotaSys = NewBucketQuotaSys()
}

minio/cmd/iam.go

package cmd


// IAMSys - config system.
type IAMSys struct {
	usersSysType UsersSysType

	// map of policy names to policy definitions
	iamPolicyDocsMap map[string]iampolicy.Policy
	// map of usernames to credentials
	iamUsersMap map[string]auth.Credentials
	// map of group names to group info
	iamGroupsMap map[string]GroupInfo
	// map of user names to groups they are a member of
	iamUserGroupMemberships map[string]set.StringSet
	// map of usernames/temporary access keys to policy names
	iamUserPolicyMap map[string]MappedPolicy
	// map of group names to policy names
	iamGroupPolicyMap map[string]MappedPolicy

	// Persistence layer for IAM subsystem
	store IAMStorageAPI
}

// NewIAMSys - creates new config system object.
func NewIAMSys() *IAMSys {
	return &IAMSys{
		usersSysType:            MinIOUsersSysType,
		iamUsersMap:             make(map[string]auth.Credentials),
		iamPolicyDocsMap:        make(map[string]iampolicy.Policy),
		iamUserPolicyMap:        make(map[string]MappedPolicy),
		iamGroupsMap:            make(map[string]GroupInfo),
		iamUserGroupMemberships: make(map[string]set.StringSet),
	}
}
	

  • iamUsersMap map[string]auth.Credentials holds the user credentials;

  • store IAMStorageAPI is the persistence layer of the IAM subsystem, which can be backed by etcd or by MinIO's own backend storage.

3. Initializing globalIAMSys

minio/cmd/iam.go

// Init - initializes config system from iam.json
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) error {
	if globalEtcdClient == nil {
		sys.store = newIAMObjectStore(ctx, objAPI)
	} else {
		sys.store = newIAMEtcdStore(ctx)
	}

	// Migrate IAM configuration
	if err := sys.doIAMConfigMigration(ctx); err != nil {
		return err
	}

	err := sys.store.loadAll(ctx, sys)

	go sys.store.watch(ctx, sys)
	return err
}

If globalEtcdClient is not nil, etcd is preferred as the storage layer.

sys.store.loadAll(ctx, sys) loads all IAM data;

go sys.store.watch(ctx, sys) watches for updates in the background.

4. IAMEtcdStore implements IAMStorageAPI

  1. On initialization, load all data from etcd;
  2. Watch in the background and reload promptly when data changes.

minio/cmd/iam-etcd-store.go

// IAMEtcdStore implements IAMStorageAPI
type IAMEtcdStore struct {
	sync.RWMutex

	ctx context.Context

	client *etcd.Client
}

func newIAMEtcdStore(ctx context.Context) *IAMEtcdStore {
	return &IAMEtcdStore{client: globalEtcdClient, ctx: ctx}
}

func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error loads all IAM data into memory:

func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error {
	iamUsersMap := make(map[string]auth.Credentials)
	iamGroupsMap := make(map[string]GroupInfo)
	iamPolicyDocsMap := make(map[string]iampolicy.Policy)
	iamUserPolicyMap := make(map[string]MappedPolicy)
	iamGroupPolicyMap := make(map[string]MappedPolicy)

	isMinIOUsersSys := false
	ies.rlock()
	if sys.usersSysType == MinIOUsersSysType {
		isMinIOUsersSys = true
	}
	ies.runlock()

	if err := ies.loadPolicyDocs(ctx, iamPolicyDocsMap); err != nil {
		return err
	}

	// load STS temp users
	if err := ies.loadUsers(ctx, stsUser, iamUsersMap); err != nil {
		return err
	}

	if isMinIOUsersSys {
		// load long term users
		if err := ies.loadUsers(ctx, regularUser, iamUsersMap); err != nil {
			return err
		}
		if err := ies.loadUsers(ctx, srvAccUser, iamUsersMap); err != nil {
			return err
		}
		if err := ies.loadGroups(ctx, iamGroupsMap); err != nil {
			return err
		}
		if err := ies.loadMappedPolicies(ctx, regularUser, false, iamUserPolicyMap); err != nil {
			return err
		}
	}

	// load STS policy mappings into the same map
	if err := ies.loadMappedPolicies(ctx, stsUser, false, iamUserPolicyMap); err != nil {
		return err
	}
	// load policies mapped to groups
	if err := ies.loadMappedPolicies(ctx, regularUser, true, iamGroupPolicyMap); err != nil {
		return err
	}

	ies.lock()
	defer ies.Unlock()

	// Merge the new reloaded entries into global map.
	// See issue https://github.com/minio/minio/issues/9651
	// where the present list of entries on disk are not yet
	// latest, there is a small window where this can make
	// valid users invalid.
	for k, v := range iamUsersMap {
		sys.iamUsersMap[k] = v
	}

	for k, v := range iamPolicyDocsMap {
		sys.iamPolicyDocsMap[k] = v
	}

	// Sets default canned policies, if none are set.
	setDefaultCannedPolicies(sys.iamPolicyDocsMap)

	for k, v := range iamUserPolicyMap {
		sys.iamUserPolicyMap[k] = v
	}

	// purge any expired entries which became expired now.
	for k, v := range sys.iamUsersMap {
		if v.IsExpired() {
			delete(sys.iamUsersMap, k)
			delete(sys.iamUserPolicyMap, k)
			// Deleting on the etcd is taken care of in the next cycle
		}
	}

	for k, v := range iamGroupPolicyMap {
		sys.iamGroupPolicyMap[k] = v
	}

	for k, v := range iamGroupsMap {
		sys.iamGroupsMap[k] = v
	}

	sys.buildUserGroupMemberships()

	return nil
}

func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) monitors data updates in the background:

func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) {
	for {
	outerLoop:
		// Refresh IAMSys with etcd watch.
		watchCh := ies.client.Watch(ctx,
			iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())

		for {
			select {
			case <-ctx.Done():
				return
			case watchResp, ok := <-watchCh:
				if !ok {
					time.Sleep(1 * time.Second)
					// Upon an error on watch channel
					// re-init the watch channel.
					goto outerLoop
				}
				if err := watchResp.Err(); err != nil {
					logger.LogIf(ctx, err)
					// log and retry.
					time.Sleep(1 * time.Second)
					// Upon an error on watch channel
					// re-init the watch channel.
					goto outerLoop
				}
				for _, event := range watchResp.Events {
					ies.lock()
					ies.reloadFromEvent(sys, event)
					ies.unlock()
				}
			}
		}
	}
}

// sys.RLock is held by caller.
func (ies *IAMEtcdStore) reloadFromEvent(sys *IAMSys, event *etcd.Event) {
	eventCreate := event.IsModify() || event.IsCreate()
	eventDelete := event.Type == etcd.EventTypeDelete
	usersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigUsersPrefix)
	groupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigGroupsPrefix)
	stsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigSTSPrefix)
	policyPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix)
	policyDBUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBUsersPrefix)
	policyDBSTSUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBSTSUsersPrefix)
	policyDBGroupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBGroupsPrefix)

	switch {
	case eventCreate:
		switch {
		case usersPrefix:
			accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
				iamConfigUsersPrefix))
			ies.loadUser(accessKey, regularUser, sys.iamUsersMap)
		case stsPrefix:
			accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
				iamConfigSTSPrefix))
			ies.loadUser(accessKey, stsUser, sys.iamUsersMap)
		case groupsPrefix:
			group := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
				iamConfigGroupsPrefix))
			ies.loadGroup(group, sys.iamGroupsMap)
			gi := sys.iamGroupsMap[group]
			sys.removeGroupFromMembershipsMap(group)
			sys.updateGroupMembershipsMap(group, &gi)
		case policyPrefix:
			policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
				iamConfigPoliciesPrefix))
			ies.loadPolicyDoc(policyName, sys.iamPolicyDocsMap)
		case policyDBUsersPrefix:
			policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
				iamConfigPolicyDBUsersPrefix)
			user := strings.TrimSuffix(policyMapFile, ".json")
			ies.loadMappedPolicy(user, regularUser, false, sys.iamUserPolicyMap)
		case policyDBSTSUsersPrefix:
			policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
				iamConfigPolicyDBSTSUsersPrefix)
			user := strings.TrimSuffix(policyMapFile, ".json")
			ies.loadMappedPolicy(user, stsUser, false, sys.iamUserPolicyMap)
		case policyDBGroupsPrefix:
			policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
				iamConfigPolicyDBGroupsPrefix)
			user := strings.TrimSuffix(policyMapFile, ".json")
			ies.loadMappedPolicy(user, regularUser, true, sys.iamGroupPolicyMap)
		}
	case eventDelete:
		switch {
		case usersPrefix:
			accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
				iamConfigUsersPrefix))
			delete(sys.iamUsersMap, accessKey)
		case stsPrefix:
			accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
				iamConfigSTSPrefix))
			delete(sys.iamUsersMap, accessKey)
		case groupsPrefix:
			group := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
				iamConfigGroupsPrefix))
			sys.removeGroupFromMembershipsMap(group)
			delete(sys.iamGroupsMap, group)
			delete(sys.iamGroupPolicyMap, group)
		case policyPrefix:
			policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
				iamConfigPoliciesPrefix))
			delete(sys.iamPolicyDocsMap, policyName)
		case policyDBUsersPrefix:
			policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
				iamConfigPolicyDBUsersPrefix)
			user := strings.TrimSuffix(policyMapFile, ".json")
			delete(sys.iamUserPolicyMap, user)
		case policyDBSTSUsersPrefix:
			policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
				iamConfigPolicyDBSTSUsersPrefix)
			user := strings.TrimSuffix(policyMapFile, ".json")
			delete(sys.iamUserPolicyMap, user)
		case policyDBGroupsPrefix:
			policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
				iamConfigPolicyDBGroupsPrefix)
			user := strings.TrimSuffix(policyMapFile, ".json")
			delete(sys.iamGroupPolicyMap, user)
		}
	}
}

CRUD operations on user information:

func (ies *IAMEtcdStore) saveUserIdentity(name string, userType IAMUserType, u UserIdentity) error {
	return ies.saveIAMConfig(u, getUserIdentityPath(name, userType))
}

func (ies *IAMEtcdStore) saveIAMConfig(item interface{}, path string) error {
	data, err := json.Marshal(item)
	if err != nil {
		return err
	}
	if globalConfigEncrypted {
		data, err = madmin.EncryptData(globalActiveCred.String(), data)
		if err != nil {
			return err
		}
	}
	return saveKeyEtcd(ies.ctx, ies.client, path, data)
}

func (ies *IAMEtcdStore) deleteUserIdentity(name string, userType IAMUserType) error {
	err := ies.deleteIAMConfig(getUserIdentityPath(name, userType))
	if err == errConfigNotFound {
		err = errNoSuchUser
	}
	return err
}

func (ies *IAMEtcdStore) deleteIAMConfig(path string) error {
	return deleteKeyEtcd(ies.ctx, ies.client, path)
}

5. MinIO Configuration Management Source Code Analysis

The configuration file is $HOME/.minio/config.json; since release RELEASE.2018-08-18T03-49-57Z, the configuration is stored in the backend storage instead.

1. The global variable globalConfigSys

minio/cmd/server-main.go

func serverMain(ctx *cli.Context) {
	// ...

	// Initialize all subsystems.
	newAllSubsystems()

	// ...
}

func newAllSubsystems() {
	// Create new notification system and initialize notification targets
	globalNotificationSys = NewNotificationSys(globalEndpoints)

	// Create new bucket metadata system.
	globalBucketMetadataSys = NewBucketMetadataSys()

	// Create a new config system.
	globalConfigSys = NewConfigSys()

	// Create new IAM system.
	globalIAMSys = NewIAMSys()

	// Create new policy system.
	globalPolicySys = NewPolicySys()

	// Create new lifecycle system.
	globalLifecycleSys = NewLifecycleSys()

	// Create new bucket encryption subsystem
	globalBucketSSEConfigSys = NewBucketSSEConfigSys()

	// Create new bucket object lock subsystem
	globalBucketObjectLockSys = NewBucketObjectLockSys()

	// Create new bucket quota subsystem
	globalBucketQuotaSys = NewBucketQuotaSys()
}

func initAllSubsystems(newObject ObjectLayer) (err error) {
	// ...

	// Initialize config system.
	if err = globalConfigSys.Init(newObject); err != nil {
		return fmt.Errorf("Unable to initialize config system: %w", err)
	}

	// ...
}

2. ConfigSys

minio/cmd/config.go

package cmd

// ConfigSys - config system.
type ConfigSys struct{}

// NewConfigSys - creates new config system object.
func NewConfigSys() *ConfigSys {
	return &ConfigSys{}
}

// Init - initializes config system from config.json.
func (sys *ConfigSys) Init(objAPI ObjectLayer) error {
	if objAPI == nil {
		return errInvalidArgument
	}

	return initConfig(objAPI)
}

// Initialize and load config from remote etcd or local config directory
func initConfig(objAPI ObjectLayer) error {
	if objAPI == nil {
		return errServerNotInitialized
	}

	if isFile(getConfigFile()) {
		if err := migrateConfig(); err != nil {
			return err
		}
	}

	// Migrates $HOME/.minio/config.json or config.json.deprecated
	// to '<export_path>/.minio.sys/config/config.json'
	// ignore if the file doesn't exist.
	// If etcd is set then migrates /config/config.json
	// to '<export_path>/.minio.sys/config/config.json'
	if err := migrateConfigToMinioSys(objAPI); err != nil {
		return err
	}

	// Migrates backend '<export_path>/.minio.sys/config/config.json' to latest version.
	if err := migrateMinioSysConfig(objAPI); err != nil {
		return err
	}

	// Migrates backend '<export_path>/.minio.sys/config/config.json' to
	// latest config format.
	if err := migrateMinioSysConfigToKV(objAPI); err != nil {
		return err
	}

	return loadConfig(objAPI)
}

3. globalServerConfig

minio/cmd/config-current.go

package cmd

var (
	// globalServerConfig server config.
	globalServerConfig   config.Config
	globalServerConfigMu sync.RWMutex
)

// loadConfig - loads a new config from disk, overrides params
// from env if found and valid
func loadConfig(objAPI ObjectLayer) error {
	srvCfg, err := getValidConfig(objAPI)
	if err != nil {
		return err
	}

	// Override any values from ENVs.
	lookupConfigs(srvCfg)

	// hold the mutex lock before a new config is assigned.
	globalServerConfigMu.Lock()
	globalServerConfig = srvCfg
	globalServerConfigMu.Unlock()

	return nil
}

4. CRUD operations on configuration

minio/cmd/admin-router.go

package cmd

// adminAPIHandlers provides HTTP handlers for MinIO admin API.
type adminAPIHandlers struct{}

// registerAdminRouter - Add handler functions for each service REST API routes.
func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) {
	adminAPI := adminAPIHandlers{}
	// Admin router
	adminRouter := router.PathPrefix(adminPathPrefix).Subrouter()

	/// Service operations

	adminVersions := []string{
		adminAPIVersionPrefix,
		adminAPIVersionV2Prefix,
	}

	for _, adminVersion := range adminVersions {
		// Config KV operations.
		if enableConfigOps {
			adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.GetConfigKVHandler)).Queries("key", "key:.*")
			adminRouter.Methods(http.MethodPut).Path(adminVersion + "/set-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.SetConfigKVHandler))
			adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/del-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.DelConfigKVHandler))
		}
	}

	// If none of the routes match add default error handler routes
	adminRouter.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
	adminRouter.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
}
		

minio/cmd/admin-handlers-config-kv.go

// SetConfigKVHandler - PUT /minio/admin/v3/set-config-kv
func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "SetConfigKV")

	defer logger.AuditLog(w, r, "SetConfigKV", mustGetClaimsFromToken(r))

	cred, objectAPI := validateAdminReqConfigKV(ctx, w, r)
	if objectAPI == nil {
		return
	}

	if r.ContentLength > maxEConfigJSONSize || r.ContentLength == -1 {
		// More than maxConfigSize bytes were available
		writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigTooLarge), r.URL)
		return
	}

	password := cred.SecretKey
	kvBytes, err := madmin.DecryptData(password, io.LimitReader(r.Body, r.ContentLength))
	if err != nil {
		logger.LogIf(ctx, err, logger.Application)
		writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), r.URL)
		return
	}

	cfg, err := readServerConfig(ctx, objectAPI)
	if err != nil {
		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
		return
	}

	if _, err = cfg.ReadFrom(bytes.NewReader(kvBytes)); err != nil {
		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
		return
	}

	if err = validateConfig(cfg); err != nil {
		writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL)
		return
	}

	// Update the actual server config on disk.
	if err = saveServerConfig(ctx, objectAPI, cfg); err != nil {
		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
		return
	}

	// Write the config input KV to history.
	if err = saveServerConfigHistory(ctx, objectAPI, kvBytes); err != nil {
		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
		return
	}

	// Make sure to write backend is encrypted
	if globalConfigEncrypted {
		saveConfig(GlobalContext, objectAPI, backendEncryptedFile, backendEncryptedMigrationComplete)
	}

	writeSuccessResponseHeadersOnly(w)
}

6. MinIO Bucket Quota

There are two types of bucket quota: Hard and FIFO (a hedged configuration example follows the list below).

  • Hard quota disallows writes to the bucket after configured quota limit is reached.
  • FIFO quota automatically deletes oldest content until bucket usage falls within configured limit while permitting writes.
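A quota is set per bucket through the admin API. Below is a hedged example using the pkg/madmin client of roughly the same era that configures a 1 GiB hard quota; the client constructor and method signatures may vary between MinIO versions.

package main

import (
	"context"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	ctx := context.Background()

	adm, err := madmin.New("127.0.0.1:9000", "ak_123456", "sk_123456", false)
	if err != nil {
		log.Fatal(err)
	}

	// Reject writes once the bucket grows beyond 1 GiB.
	quota := &madmin.BucketQuota{
		Quota: 1 << 30,
		Type:  madmin.HardQuota,
	}
	if err := adm.SetBucketQuota(ctx, "mybucket", quota); err != nil {
		log.Fatal(err)
	}
}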

1. Bucket quota check during PutObject

func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
	// ...

	if err := enforceBucketQuota(ctx, bucket, size); err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
		return
	}

	// ...
}

2. The BucketQuotaSys subsystem

minio/cmd/bucket-quota.go

func enforceBucketQuota(ctx context.Context, bucket string, size int64) error {
	if size < 0 {
		return nil
	}

	return globalBucketQuotaSys.check(ctx, bucket, size)
}

minio/cmd/bucket-quota.go

// BucketQuotaSys - map of bucket and quota configuration.
type BucketQuotaSys struct {
	bucketStorageCache timedValue
}

// Get - Get quota configuration.
func (sys *BucketQuotaSys) Get(bucketName string) (*madmin.BucketQuota, error) {
	if globalIsGateway {
		objAPI := newObjectLayerFn()
		if objAPI == nil {
			return nil, errServerNotInitialized
		}
		return &madmin.BucketQuota{}, nil
	}

	return globalBucketMetadataSys.GetQuotaConfig(bucketName)
}

// NewBucketQuotaSys returns initialized BucketQuotaSys
func NewBucketQuotaSys() *BucketQuotaSys {
	return &BucketQuotaSys{}
}

func (sys *BucketQuotaSys) check(ctx context.Context, bucket string, size int64) error {
	objAPI := newObjectLayerWithoutSafeModeFn()
	if objAPI == nil {
		return errServerNotInitialized
	}

	q, err := sys.Get(bucket)
	if err != nil {
		return nil
	}

	if q.Type == madmin.FIFOQuota {
		return nil
	}

	if q.Quota == 0 {
		// No quota set return quickly.
		return nil
	}

	sys.bucketStorageCache.Once.Do(func() {
		sys.bucketStorageCache.TTL = 10 * time.Second
		sys.bucketStorageCache.Update = func() (interface{}, error) {
			return loadDataUsageFromBackend(ctx, objAPI)
		}
	})

	v, err := sys.bucketStorageCache.Get()
	if err != nil {
		return err
	}

	dui := v.(DataUsageInfo)

	bui, ok := dui.BucketsUsage[bucket]
	if !ok {
		// bucket not found, cannot enforce quota
		// call will fail anyways later.
		return nil
	}

	if (bui.Size + uint64(size)) > q.Quota {
		return BucketQuotaExceeded{Bucket: bucket}
	}

	return nil
}

3. Background task: periodically deleting objects to free space

const (
	bgQuotaInterval = 1 * time.Hour
)

// initQuotaEnforcement starts the routine that deletes objects in bucket
// that exceeds the FIFO quota
func initQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) {
	if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
		go startBucketQuotaEnforcement(ctx, objAPI)
	}
}

func startBucketQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.NewTimer(bgQuotaInterval).C:
			logger.LogIf(ctx, enforceFIFOQuota(ctx, objAPI))
		}
	}
}

// enforceFIFOQuota deletes objects in FIFO order until sufficient objects
// have been deleted so as to bring bucket usage within quota
func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) error {
	// Turn off quota enforcement if data usage info is unavailable.
	if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
		return nil
	}

	buckets, err := objectAPI.ListBuckets(ctx)
	if err != nil {
		return err
	}

	dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
	if err != nil {
		return err
	}

	for _, binfo := range buckets {
		bucket := binfo.Name

		bui, ok := dataUsageInfo.BucketsUsage[bucket]
		if !ok {
			// bucket doesn't exist anymore, or we
			// do not have any information to proceed.
			continue
		}

		// Check if the current bucket has quota restrictions, if not skip it
		cfg, err := globalBucketQuotaSys.Get(bucket)
		if err != nil {
			continue
		}

		if cfg.Type != madmin.FIFOQuota {
			continue
		}

		var toFree uint64
		if bui.Size > cfg.Quota && cfg.Quota > 0 {
			toFree = bui.Size - cfg.Quota
		}

		if toFree == 0 {
			continue
		}

		// Allocate new results channel to receive ObjectInfo.
		objInfoCh := make(chan ObjectInfo)

		// Walk through all objects
		if err := objectAPI.Walk(ctx, bucket, "", objInfoCh); err != nil {
			return err
		}

		// reuse the fileScorer used by disk cache to score entries by
		// ModTime to find the oldest objects in bucket to delete. In
		// the context of bucket quota enforcement - number of hits are
		// irrelevant.
		scorer, err := newFileScorer(toFree, time.Now().Unix(), 1)
		if err != nil {
			return err
		}

		rcfg, _ := globalBucketObjectLockSys.Get(bucket)

		for obj := range objInfoCh {
			// skip objects currently under retention
			if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
				continue
			}
			scorer.addFile(obj.Name, obj.ModTime, obj.Size, 1)
		}
		var objects []string
		numKeys := len(scorer.fileNames())
		for i, key := range scorer.fileNames() {
			objects = append(objects, key)
			if len(objects) < maxDeleteList && (i < numKeys-1) {
				// skip deletion until maxObjectList or end of slice
				continue
			}

			if len(objects) == 0 {
				break
			}
			// Deletes a list of objects.
			deleteErrs, err := objectAPI.DeleteObjects(ctx, bucket, objects)
			if err != nil {
				logger.LogIf(ctx, err)
			} else {
				for i := range deleteErrs {
					if deleteErrs[i] != nil {
						logger.LogIf(ctx, deleteErrs[i])
						continue
					}
					// Notify object deleted event.
					sendEvent(eventArgs{
						EventName:  event.ObjectRemovedDelete,
						BucketName: bucket,
						Object: ObjectInfo{
							Name: objects[i],
						},
						Host: "Internal: [FIFO-QUOTA-EXPIRY]",
					})
				}
				objects = nil
			}
		}
	}
	return nil
}
