MinIO 源码分析
Posted yvhqbat
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MinIO 源码分析相关的知识,希望对你有一定的参考价值。
文章目录
- Bucket日志审计概要设计说明书
Bucket日志审计概要设计说明书
参考
1. MinIO 事件通知模块的源码剖析
1. 事件通知模块 NotificationSys
// NotificationSys - notification system.
type NotificationSys struct
sync.RWMutex
targetList *event.TargetList
targetResCh chan event.TargetIDResult
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
peerClients []*peerRESTClient
// Send - sends event data to all matching targets.
func (sys *NotificationSys) Send(args eventArgs)
sys.RLock()
targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
sys.RUnlock()
if len(targetIDSet) == 0
return
sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)
2. target.Target
存储桶事件可以发布到的通知目标有:Redis、MySQL、PostgreSQL、Kafka、Elasticsearch 等。
对于多个target,使用 sync.WaitGroup
进行并发发送。
// Target - event target interface
type Target interface
ID() TargetID
IsActive() (bool, error)
Save(Event) error
Send(string) error
Close() error
HasQueueStore() bool
// TargetList - holds list of targets indexed by target ID.
type TargetList struct
sync.RWMutex
targets map[TargetID]Target
// Send - sends events to targets identified by target IDs.
func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult)
go func()
var wg sync.WaitGroup
for id := range targetIDset
list.RLock()
target, ok := list.targets[id]
list.RUnlock()
if ok
wg.Add(1)
go func(id TargetID, target Target)
defer wg.Done()
tgtRes := TargetIDResultID: id
if err := target.Save(event); err != nil
tgtRes.Err = err
resCh <- tgtRes
(id, target)
else
resCh <- TargetIDResultID: id
wg.Wait()
()
3. target.RedisTarget
通知目标支持两种格式: namespace 和 access。
如果用的是 namespacee 格式,MinIO将存储桶里的对象同步成Redis hash中的条目。对于每一个条目,对应一个存储桶里的对象,其key都被设为"存储桶名称/对象名称",value都是一个有关这个MinIO对象的JSON格式的事件数据。如果对象更新或者删除,hash中对象的条目也会相应的更新或者删除。
如果使用的是 access ,MinIO使用RPUSH将事件添加到list中。这个list中每一个元素都是一个JSON格式的list,这个list中又有两个元素,第一个元素是时间戳的字符串,第二个元素是一个含有在这个存储桶上进行操作的事件数据的JSON对象。在这种格式下,list中的元素不会更新或者删除。
// RedisTarget - Redis target.
type RedisTarget struct
id event.TargetID
args RedisArgs
pool *redis.Pool
store Store
firstPing bool
loggerOnce func(ctx context.Context, err error, id interface, errKind ...interface)
// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active.
func (target *RedisTarget) Save(eventData event.Event) error
if target.store != nil
return target.store.Put(eventData)
_, err := target.IsActive()
if err != nil
return err
return target.send(eventData)
// send - sends an event to the redis.
func (target *RedisTarget) send(eventData event.Event) error
conn := target.pool.Get()
defer func()
cErr := conn.Close()
target.loggerOnce(context.Background(), cErr, target.ID())
()
if target.args.Format == event.NamespaceFormat
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil
return err
key := eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete
_, err = conn.Do("HDEL", target.args.Key, key)
else
var data []byte
if data, err = json.Marshal(struct Records []event.Event []event.EventeventData); err != nil
return err
_, err = conn.Do("HSET", target.args.Key, key, data)
if err != nil
return err
if target.args.Format == event.AccessFormat
data, err := json.Marshal([]RedisAccessEventEvent: []event.EventeventData, EventTime: eventData.EventTime)
if err != nil
return err
if _, err := conn.Do("RPUSH", target.args.Key, data); err != nil
return err
return nil
4. target.mysqlTarget
// MySQLTarget - MySQL target.
type MySQLTarget struct
id event.TargetID
args MySQLArgs
updateStmt *sql.Stmt
deleteStmt *sql.Stmt
insertStmt *sql.Stmt
db *sql.DB
store Store
firstPing bool
loggerOnce func(ctx context.Context, err error, id interface, errKind ...interface)
// Save - saves the events to the store which will be replayed when the SQL connection is active.
func (target *MySQLTarget) Save(eventData event.Event) error
if target.store != nil
return target.store.Put(eventData)
_, err := target.IsActive()
if err != nil
return err
return target.send(eventData)
// send - sends an event to the mysql.
func (target *MySQLTarget) send(eventData event.Event) error
if target.args.Format == event.NamespaceFormat
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil
return err
key := eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete
_, err = target.deleteStmt.Exec(key)
else
var data []byte
if data, err = json.Marshal(struct Records []event.Event []event.EventeventData); err != nil
return err
_, err = target.updateStmt.Exec(key, data)
return err
if target.args.Format == event.AccessFormat
eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime)
if err != nil
return err
data, err := json.Marshal(struct Records []event.Event []event.EventeventData)
if err != nil
return err
_, err = target.insertStmt.Exec(eventTime, data)
return err
return nil
5. target.KafkaTarget
// KafkaTarget - Kafka target.
type KafkaTarget struct
id event.TargetID
args KafkaArgs
producer sarama.SyncProducer
config *sarama.Config
store Store
loggerOnce func(ctx context.Context, err error, id interface, errKind ...interface)
// Save - saves the events to the store which will be replayed when the Kafka connection is active.
func (target *KafkaTarget) Save(eventData event.Event) error
if target.store != nil
return target.store.Put(eventData)
_, err := target.IsActive()
if err != nil
return err
return target.send(eventData)
// send - sends an event to the kafka.
func (target *KafkaTarget) send(eventData event.Event) error
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil
return err
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.LogEventName: eventData.EventName, Key: key, Records: []event.EventeventData)
if err != nil
return err
msg := sarama.ProducerMessage
Topic: target.args.Topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(data),
_, _, err = target.producer.SendMessage(&msg)
return err
6. target.Store
// Store - To persist the events.
type Store interface
Put(event event.Event) error
Get(key string) (event.Event, error)
List() ([]string, error)
Del(key string) error
Open() error
// QueueStore - Filestore for persisting events.
type QueueStore struct
sync.RWMutex
currentEntries uint64
entryLimit uint64
directory string
// NewQueueStore - Creates an instance for QueueStore.
func NewQueueStore(directory string, limit uint64) Store
if limit == 0
limit = defaultLimit
_, maxRLimit, err := sys.GetMaxOpenFileLimit()
if err == nil
// Limit the maximum number of entries
// to maximum open file limit
if maxRLimit < limit
limit = maxRLimit
return &QueueStore
directory: directory,
entryLimit: limit,
// write - writes event to the directory.
func (store *QueueStore) write(key string, e event.Event) error
// Marshalls the event.
eventData, err := json.Marshal(e)
if err != nil
return err
path := filepath.Join(store.directory, key+eventExt)
if err := ioutil.WriteFile(path, eventData, os.FileMode(0770)); err != nil
return err
// Increment the event count.
store.currentEntries++
return nil
// Put - puts a event to the store.
func (store *QueueStore) Put(e event.Event) error
store.Lock()
defer store.Unlock()
if store.currentEntries >= store.entryLimit
return errLimitExceeded
key, err := getNewUUID()
if err != nil
return err
return store.write(key, e)
7. 事件类型 event.Name
MinIO 支持的事件类型:
// Name - event type enum.
// Refer http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#notification-how-to-event-types-and-destinations
type Name int
// Values of Name
const (
ObjectAccessedAll Name = 1 + iota
ObjectAccessedGet
ObjectAccessedGetRetention
ObjectAccessedGetLegalHold
ObjectAccessedHead
ObjectCreatedAll
ObjectCreatedCompleteMultipartUpload
ObjectCreatedCopy
ObjectCreatedPost
ObjectCreatedPut
ObjectCreatedPutRetention
ObjectCreatedPutLegalHold
ObjectRemovedAll
ObjectRemovedDelete
)
8. 事件 event.Event
MinIO 发送的用于发布事件的通知消息是JSON格式的
// Identity represents access key who caused the event.
type Identity struct
PrincipalID string `json:"principalId"`
// Bucket represents bucket metadata of the event.
type Bucket struct
Name string `json:"name"`
OwnerIdentity Identity `json:"ownerIdentity"`
ARN string `json:"arn"`
// Object represents object metadata of the event.
type Object struct
Key string `json:"key"`
Size int64 `json:"size,omitempty"`
ETag string `json:"eTag,omitempty"`
ContentType string `json:"contentType,omitempty"`
UserMetadata map[string]string `json:"userMetadata,omitempty"`
VersionID string `json:"versionId,omitempty"`
Sequencer string `json:"sequencer"`
// Metadata represents event metadata.
type Metadata struct
SchemaVersion string `json:"s3SchemaVersion"`
ConfigurationID string `json:"configurationId"`
Bucket Bucket `json:"bucket"`
Object Object `json:"object"`
// Source represents client information who triggered the event.
type Source struct
Host string `json:"host"`
Port string `json:"port"`
UserAgent string `json:"userAgent"`
// Event represents event notification information defined in
// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html.
type Event struct
EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"`
AwsRegion string `json:"awsRegion"`
EventTime string `json:"eventTime"`
EventName Name `json:"eventName"`
UserIdentity Identity `json:"userIdentity"`
RequestParameters map[string]string `json:"requestParameters"`
ResponseElements map[string]string `json:"responseElements"`
S3 Metadata `json:"s3"`
Source Source `json:"source"`
9. 事件日志 event.Log
// Log represents event information for some event targets.
type Log struct
EventName Name
Key string
Records []Event
2. MinIO 网关源码剖析
启动 minio gateway:
[root@node8217 install]# cat start_minio.sh
#!/bin/bash
export MINIO_ACCESS_KEY=ak_123456
export MINIO_SECRET_KEY=sk_123456
#export MINIO_PROMETHEUS_AUTH_TYPE="public"
#export MINIO_REGION_NAME="wuhan"
#./minio server --address :9009 /BigData/minio &
./minio gateway s3 http://127.0.0.1:9009 &
启动后,可以向使用 minio server 一样使用 minio gateway。
1. minio 启动入口
mino/main.go
package main // import "github.com/minio/minio"
import (
"os"
minio "github.com/minio/minio/cmd"
// Import gateway
_ "github.com/minio/minio/cmd/gateway"
)
func main()
minio.Main(os.Args)
这里引入包 _ "github.com/minio/minio/cmd/gateway"
,会调用github.com/minio/minio/cmd/gateway
包的所支持网关的init
函数,初始化支持的网关。
minio/cmd/main.go
func newApp(name string) *cli.App
// Collection of minio commands currently supported are.
commands := []cli.Command
// registerCommand registers a cli command.
registerCommand := func(command cli.Command)
commands = append(commands, command)
// Register all commands.
registerCommand(serverCmd)
registerCommand(gatewayCmd)
app := cli.NewApp()
app.Name = name
app.Commands = commands
return app
// Main main for minio server.
func Main(args []string)
// Set the minio app name.
appName := filepath.Base(args[0])
// Run the app - exit on error.
if err := newApp(appName).Run(args); err != nil
os.Exit(1)
注册了两个命令,全局变量 serverCmd 和 gatewayCmd
2. 注册S3网关
gatewayCmd
minio/cmd/gateway-main.go
var (
gatewayCmd = cli.Command
Name: "gateway",
Usage: "start object storage gateway",
Flags: append(ServerFlags, GlobalFlags...),
HideHelpCommand: true,
)
// RegisterGatewayCommand registers a new command for gateway.
func RegisterGatewayCommand(cmd cli.Command) error
cmd.Flags = append(append(cmd.Flags, ServerFlags...), GlobalFlags...)
gatewayCmd.Subcommands = append(gatewayCmd.Subcommands, cmd)
return nil
注册 S3 gateway
minio/cmd/gateway/s3/gateway-s3.go
const (
s3Backend = "s3"
)
func init()
minio.RegisterGatewayCommand(cli.Command
Name: s3Backend,
Usage: "Amazon Simple Storage Service (S3)",
Action: s3GatewayMain,
HideHelpCommand: true,
)
// Handler for 'minio gateway s3' command line.
func s3GatewayMain(ctx *cli.Context)
args := ctx.Args()
if !ctx.Args().Present()
args = cli.Args"https://s3.amazonaws.com"
serverAddr := ctx.GlobalString("address")
if serverAddr == "" || serverAddr == ":"+minio.GlobalMinioDefaultPort
serverAddr = ctx.String("address")
// Validate gateway arguments.
logger.FatalIf(minio.ValidateGatewayArguments(serverAddr, args.First()), "Invalid argument")
// Start the gateway..
minio.StartGateway(ctx, &S3args.First())
重点在于 minio.StartGateway(ctx, &S3args.First())
,根据参数启动相应的网关。
3. S3实现Gateway接口
minio/cmd/gateway-main.go
// StartGateway - handler for 'minio gateway <name>'.
func StartGateway(ctx *cli.Context, gw Gateway)
// 网关启动 ......
可见,s3.S3 实现了 cmd.Gateway 接口,
// Gateway represents a gateway backend.
type Gateway interface
// Name returns the unique name of the gateway.
Name() string
// NewGatewayLayer returns a new ObjectLayer.
NewGatewayLayer(creds auth.Credentials) (ObjectLayer, error)
// Returns true if gateway is ready for production.
Production() bool
S3网关实现Gateway接口
minio/cmd/gateway/s3/gateway-s3.go
// S3 implements Gateway.
type S3 struct
host string
// Name implements Gateway interface.
func (g *S3) Name() string
return s3Backend
// NewGatewayLayer returns s3 ObjectLayer.
func (g *S3) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error)
// creds are ignored here, since S3 gateway implements chaining
// all credentials.
clnt, err := newS3(g.host)
if err != nil
return nil, err
metrics := minio.NewMetrics()
t := &minio.MetricsTransport
Transport: minio.NewGatewayHTTPTransport(),
Metrics: metrics,
// Set custom transport
clnt.SetCustomTransport(t)
probeBucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "probe-bucket-sign-")
// Check if the provided keys are valid.
if _, err = clnt.BucketExists(probeBucketName); err != nil
if miniogo.ToErrorResponse(err).Code != "AccessDenied"
return nil, err
s := s3Objects
Client: clnt,
Metrics: metrics,
HTTPClient: &http.Client
Transport: t,
,
// Enables single encryption of KMS is configured.
if minio.GlobalKMS != nil
encS := s3EncObjectss
// Start stale enc multipart uploads cleanup routine.
go encS.cleanupStaleEncMultipartUploads(minio.GlobalContext,
minio.GlobalMultipartCleanupInterval, minio.GlobalMultipartExpiry)
return &encS, nil
return &s, nil
// Production - s3 gateway is production ready.
func (g *S3) Production() bool
return true
4. 网关启动
minio/cmd/gateway-main.go
// StartGateway - handler for 'minio gateway <name>'.
func StartGateway(ctx *cli.Context, gw Gateway)
if gw == nil
logger.FatalIf(errUnexpected, "Gateway implementation not initialized")
// Handle common command args.
handleCommonCmdArgs(ctx)
// Handle gateway specific env
gatewayHandleEnvVars()
// Set when gateway is enabled
globalIsGateway = true
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
// Enable IAM admin APIs if etcd is enabled, if not just enable basic
// operations such as profiling, server info etc.
registerAdminRouter(router, enableConfigOps, enableIAMOps)
// Add healthcheck router
registerHealthCheckRouter(router)
// Add server metrics router
registerMetricsRouter(router)
// Register web router when its enabled.
if globalBrowserEnabled
logger.FatalIf(registerWebRouter(router), "Unable to configure web browser")
// Currently only NAS and S3 gateway support encryption headers.
encryptionEnabled := gatewayName == "s3" || gatewayName == "nas"
allowSSEKMS := gatewayName == "s3" // Only S3 can support SSE-KMS (as pass-through)
// Add API router.
registerAPIRouter(router, encryptionEnabled, allowSSEKMS)
httpServer := xhttp.NewServer([]stringglobalCLIContext.Addr,
criticalErrorHandlerregisterHandlers(router, globalHandlers...), getCert)
httpServer.BaseContext = func(listener net.Listener) context.Context
return GlobalContext
go func()
globalHTTPServerErrorCh <- httpServer.Start()
()
globalObjLayerMutex.Lock()
globalHTTPServer = httpServer
globalObjLayerMutex.Unlock()
newObject, err := gw.NewGatewayLayer(globalActiveCred)
newObject = NewGatewayLayerWithLocker(newObject)
// Once endpoints are finalized, initialize the new object api in safe mode.
globalObjLayerMutex.Lock()
globalObjectAPI = newObject
globalObjLayerMutex.Unlock()
// Calls all New() for all sub-systems.
newAllSubsystems()
if enableIAMOps
// Initialize IAM sys.
logger.FatalIf(globalIAMSys.Init(GlobalContext, newObject), "Unable to initialize IAM system")
if globalCacheConfig.Enabled
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
globalCacheObjectAPI = cacheAPI
globalObjLayerMutex.Unlock()
3. MinIO 网关磁盘缓存源码剖析
1. disk cache 使用
#!/bin/bash
export MINIO_ACCESS_KEY=ak_123456
export MINIO_SECRET_KEY=sk_123456
#export MINIO_PROMETHEUS_AUTH_TYPE="public"
#export MINIO_REGION_NAME="wuhan"
#./minio server --address :9009 /BigData/minio &
export MINIO_CACHE="on"
export MINIO_CACHE_DRIVES="/mnt/drive1,/mnt/drive2"
export MINIO_CACHE_EXCLUDE="*.pdf,mybucket/*"
export MINIO_CACHE_QUOTA=80
export MINIO_CACHE_AFTER=3
export MINIO_CACHE_WATERMARK_LOW=70
export MINIO_CACHE_WATERMARK_HIGH=90
./minio gateway s3 http://127.0.0.1:9009 &
2. disk cache 实现
func StartGateway(ctx *cli.Context, gw Gateway)
// ......
if globalCacheConfig.Enabled
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
globalCacheObjectAPI = cacheAPI
globalObjLayerMutex.Unlock()
// ......
minio/cmd/disk-cache.go
// CacheObjectLayer implements primitives for cache object API layer.
type CacheObjectLayer interface
// Object operations.
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObject(ctx context.Context, bucket, object string) error
DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
// Storage operations.
StorageInfo(ctx context.Context) CacheStorageInfo
CacheStats() *CacheStats
cacheObjects
结构体实现了 CacheObjectLayer
接口:
// Returns cacheObjects for use by Server.
func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjectLayer, error)
// list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var.
cache, migrateSw, err := newCache(config)
if err != nil
return nil, err
c := &cacheObjects
cache: cache,
exclude: config.Exclude,
after: config.After,
migrating: migrateSw,
migMutex: sync.Mutex,
cacheStats: newCacheStats(),
GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error)
return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
,
GetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
,
DeleteObjectFn: func(ctx context.Context, bucket, object string) error
return newObjectLayerFn().DeleteObject(ctx, bucket, object)
,
DeleteObjectsFn: func(ctx context.Context, bucket string, objects []string) ([]error, error)
errs := make([]error, len(objects))
for idx, object := range objects
errs[idx] = newObjectLayerFn().DeleteObject(ctx, bucket, object)
return errs, nil
,
PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
,
CopyObjectFn: func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
return newObjectLayerFn().CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
,
if migrateSw
go c.migrateCacheFromV1toV2(ctx)
go c.gc(ctx)
return c, nil
// Abstracts disk caching - used by the S3 layer
type cacheObjects struct
// slice of cache drives
cache []*diskCache
// file path patterns to exclude from cache
exclude []string
// number of accesses after which to cache an object
after int
// if true migration is in progress from v1 to v2
migrating bool
// mutex to protect migration bool
migMutex sync.Mutex
// Cache stats
cacheStats *CacheStats
GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObjectFn func(ctx context.Context, bucket, object string) error
DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error)
PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
CopyObjectFn func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
真正缓存层是 []*diskCache
实现:
// newCache initializes the cacheFSObjects for the "drives" specified in config.json
// or the global env overrides.
func newCache(config cache.Config) ([]*diskCache, bool, error)
var caches []*diskCache
ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo)
formats, migrating, err := loadAndValidateCacheFormat(ctx, config.Drives)
if err != nil
return nil, false, err
for i, dir := range config.Drives
// skip diskCache creation for cache drives missing a format.json
if formats[i] == nil
caches = append(caches, nil)
continue
if err := checkAtimeSupport(dir); err != nil
return nil, false, errors.New("Atime support required for disk caching")
quota := config.MaxUse
if quota == 0
quota = config.Quota
cache, err := newDiskCache(dir, quota, config.After, config.WatermarkLow, config.WatermarkHigh)
if err != nil
return nil, false, err
caches = append(caches, cache)
return caches, migrating, nil
4. MinIO IAM 模块
1. 用户鉴权
S3协议的每个操作,都需要IAMSys进行权限校验,主要代码如下:
if globalIAMSys.IsAllowed(iampolicy.Args
AccountName: cred.AccessKey,
Action: iampolicy.Action(action),
BucketName: bucketName,
ConditionValues: getConditionValues(r, "", cred.AccessKey, claims),
ObjectName: objectName,
IsOwner: owner,
Claims: claims,
)
// Request is allowed return the appropriate access key.
return cred.AccessKey, owner, ErrNone
return cred.AccessKey, owner, ErrAccessDenied
2. globalIAMSys
minio/cmd/globals.go
var (
globalIAMSys *IAMSys
)
cmd/server-main.go
func newAllSubsystems()
// Create new notification system and initialize notification targets
globalNotificationSys = NewNotificationSys(globalEndpoints)
// Create new bucket metadata system.
globalBucketMetadataSys = NewBucketMetadataSys()
// Create a new config system.
globalConfigSys = NewConfigSys()
// Create new IAM system.
globalIAMSys = NewIAMSys()
// Create new policy system.
globalPolicySys = NewPolicySys()
// Create new lifecycle system.
globalLifecycleSys = NewLifecycleSys()
// Create new bucket object lock subsystem
globalBucketObjectLockSys = NewBucketObjectLockSys()
// Create new bucket quota subsystem
globalBucketQuotaSys = NewBucketQuotaSys()
minio/cmd/iam.go
package cmd
// IAMSys - config system.
type IAMSys struct
usersSysType UsersSysType
// map of policy names to policy definitions
iamPolicyDocsMap map[string]iampolicy.Policy
// map of usernames to credentials
iamUsersMap map[string]auth.Credentials
// map of group names to group info
iamGroupsMap map[string]GroupInfo
// map of user names to groups they are a member of
iamUserGroupMemberships map[string]set.StringSet
// map of usernames/temporary access keys to policy names
iamUserPolicyMap map[string]MappedPolicy
// map of group names to policy names
iamGroupPolicyMap map[string]MappedPolicy
// Persistence layer for IAM subsystem
store IAMStorageAPI
// NewIAMSys - creates new config system object.
func NewIAMSys() *IAMSys
return &IAMSys
usersSysType: MinIOUsersSysType,
iamUsersMap: make(map[string]auth.Credentials),
iamPolicyDocsMap: make(map[string]iampolicy.Policy),
iamUserPolicyMap: make(map[string]MappedPolicy),
iamGroupsMap: make(map[string]GroupInfo),
iamUserGroupMemberships: make(map[string]set.StringSet),
-
iamUsersMap map[string]auth.Credentials
保存了用户信息; -
store IAMStorageAPI
是IAM系统的持久化存储层,可以是:etcd/minio;
3. globalIAMSys
初始化
minio/cmd/iam.go
// Init - initializes config system from iam.json
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) error
if globalEtcdClient == nil
sys.store = newIAMObjectStore(ctx, objAPI)
else
sys.store = newIAMEtcdStore(ctx)
// Migrate IAM configuration
if err := sys.doIAMConfigMigration(ctx); err != nil
return err
err := sys.store.loadAll(ctx, sys)
go sys.store.watch(ctx, sys)
return err
如果 globalEtcdClient
不为空,优先以 etcd
作为存储层。
sys.store.loadAll(ctx, sys)
加载所有 IAM 数据;
go sys.store.watch(ctx, sys)
后台监控所有更新;
4. IAMEtcdStore
实现 IAMStorageAPI
- 初始化时,加载etcd中的所有数据;
- 后台watch,及时reload;
minio/cmd/iam-etcd-store.go
// IAMEtcdStore implements IAMStorageAPI
type IAMEtcdStore struct
sync.RWMutex
ctx context.Context
client *etcd.Client
func newIAMEtcdStore(ctx context.Context) *IAMEtcdStore
return &IAMEtcdStoreclient: globalEtcdClient, ctx: ctx
func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error
加载所有IAM数据到内存:
func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error
iamUsersMap := make(map[string]auth.Credentials)
iamGroupsMap := make(map[string]GroupInfo)
iamPolicyDocsMap := make(map[string]iampolicy.Policy)
iamUserPolicyMap := make(map[string]MappedPolicy)
iamGroupPolicyMap := make(map[string]MappedPolicy)
isMinIOUsersSys := false
ies.rlock()
if sys.usersSysType == MinIOUsersSysType
isMinIOUsersSys = true
ies.runlock()
if err := ies.loadPolicyDocs(ctx, iamPolicyDocsMap); err != nil
return err
// load STS temp users
if err := ies.loadUsers(ctx, stsUser, iamUsersMap); err != nil
return err
if isMinIOUsersSys
// load long term users
if err := ies.loadUsers(ctx, regularUser, iamUsersMap); err != nil
return err
if err := ies.loadUsers(ctx, srvAccUser, iamUsersMap); err != nil
return err
if err := ies.loadGroups(ctx, iamGroupsMap); err != nil
return err
if err := ies.loadMappedPolicies(ctx, regularUser, false, iamUserPolicyMap); err != nil
return err
// load STS policy mappings into the same map
if err := ies.loadMappedPolicies(ctx, stsUser, false, iamUserPolicyMap); err != nil
return err
// load policies mapped to groups
if err := ies.loadMappedPolicies(ctx, regularUser, true, iamGroupPolicyMap); err != nil
return err
ies.lock()
defer ies.Unlock()
// Merge the new reloaded entries into global map.
// See issue https://github.com/minio/minio/issues/9651
// where the present list of entries on disk are not yet
// latest, there is a small window where this can make
// valid users invalid.
for k, v := range iamUsersMap
sys.iamUsersMap[k] = v
for k, v := range iamPolicyDocsMap
sys.iamPolicyDocsMap[k] = v
// Sets default canned policies, if none are set.
setDefaultCannedPolicies(sys.iamPolicyDocsMap)
for k, v := range iamUserPolicyMap
sys.iamUserPolicyMap[k] = v
// purge any expired entries which became expired now.
for k, v := range sys.iamUsersMap
if v.IsExpired()
delete(sys.iamUsersMap, k)
delete(sys.iamUserPolicyMap, k)
// Deleting on the etcd is taken care of in the next cycle
for k, v := range iamGroupPolicyMap
sys.iamGroupPolicyMap[k] = v
for k, v := range iamGroupsMap
sys.iamGroupsMap[k] = v
sys.buildUserGroupMemberships()
return nil
func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys)
后台监控数据更新:
func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys)
for
outerLoop:
// Refresh IAMSys with etcd watch.
watchCh := ies.client.Watch(ctx,
iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
for
select
case <-ctx.Done():
return
case watchResp, ok := <-watchCh:
if !ok
time.Sleep(1 * time.Second)
// Upon an error on watch channel
// re-init the watch channel.
goto outerLoop
if err := watchResp.Err(); err != nil
logger.LogIf(ctx, err)
// log and retry.
time.Sleep(1 * time.Second)
// Upon an error on watch channel
// re-init the watch channel.
goto outerLoop
for _, event := range watchResp.Events
ies.lock()
ies.reloadFromEvent(sys, event)
ies.unlock()
// sys.RLock is held by caller.
func (ies *IAMEtcdStore) reloadFromEvent(sys *IAMSys, event *etcd.Event)
eventCreate := event.IsModify() || event.IsCreate()
eventDelete := event.Type == etcd.EventTypeDelete
usersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigUsersPrefix)
groupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigGroupsPrefix)
stsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigSTSPrefix)
policyPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix)
policyDBUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBUsersPrefix)
policyDBSTSUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBSTSUsersPrefix)
policyDBGroupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBGroupsPrefix)
switch
case eventCreate:
switch
case usersPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigUsersPrefix))
ies.loadUser(accessKey, regularUser, sys.iamUsersMap)
case stsPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigSTSPrefix))
ies.loadUser(accessKey, stsUser, sys.iamUsersMap)
case groupsPrefix:
group := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigGroupsPrefix))
ies.loadGroup(group, sys.iamGroupsMap)
gi := sys.iamGroupsMap[group]
sys.removeGroupFromMembershipsMap(group)
sys.updateGroupMembershipsMap(group, &gi)
case policyPrefix:
policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigPoliciesPrefix))
ies.loadPolicyDoc(policyName, sys.iamPolicyDocsMap)
case policyDBUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
ies.loadMappedPolicy(user, regularUser, false, sys.iamUserPolicyMap)
case policyDBSTSUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBSTSUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
ies.loadMappedPolicy(user, stsUser, false, sys.iamUserPolicyMap)
case policyDBGroupsPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBGroupsPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
ies.loadMappedPolicy(user, regularUser, true, sys.iamGroupPolicyMap)
case eventDelete:
switch
case usersPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigUsersPrefix))
delete(sys.iamUsersMap, accessKey)
case stsPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigSTSPrefix))
delete(sys.iamUsersMap, accessKey)
case groupsPrefix:
group := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigGroupsPrefix))
sys.removeGroupFromMembershipsMap(group)
delete(sys.iamGroupsMap, group)
delete(sys.iamGroupPolicyMap, group)
case policyPrefix:
policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigPoliciesPrefix))
delete(sys.iamPolicyDocsMap, policyName)
case policyDBUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamUserPolicyMap, user)
case policyDBSTSUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBSTSUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamUserPolicyMap, user)
case policyDBGroupsPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBGroupsPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamGroupPolicyMap, user)
用户信息的增删改查
func (ies *IAMEtcdStore) saveUserIdentity(name string, userType IAMUserType, u UserIdentity) error
return ies.saveIAMConfig(u, getUserIdentityPath(name, userType))
func (ies *IAMEtcdStore) saveIAMConfig(item interface, path string) error
data, err := json.Marshal(item)
if err != nil
return err
if globalConfigEncrypted
data, err = madmin.EncryptData(globalActiveCred.String(), data)
if err != nil
return err
return saveKeyEtcd(ies.ctx, ies.client, path, data)
func (ies *IAMEtcdStore) deleteUserIdentity(name string, userType IAMUserType) error
err := ies.deleteIAMConfig(getUserIdentityPath(name, userType))
if err == errConfigNotFound
err = errNoSuchUser
return err
func (ies *IAMEtcdStore) deleteIAMConfig(path string) error
return deleteKeyEtcd(ies.ctx, ies.client, path)
5. MinIO配置管理源码剖析
参考:
配置文件:$HOME/.minio/config.json
,在 RELEASE.2018-08-18T03-49-57Z
版本之后,配置文件保存在后端存储中。
1. 全局变量 globalConfigSys
minio/cmd/server-main.go
func serverMain(ctx *cli.Context)
// ...
// 初始化所有子系统
newAllSubsystems()
// ...
func newAllSubsystems()
// Create new notification system and initialize notification targets
globalNotificationSys = NewNotificationSys(globalEndpoints)
// Create new bucket metadata system.
globalBucketMetadataSys = NewBucketMetadataSys()
// Create a new config system.
globalConfigSys = NewConfigSys()
// Create new IAM system.
globalIAMSys = NewIAMSys()
// Create new policy system.
globalPolicySys = NewPolicySys()
// Create new lifecycle system.
globalLifecycleSys = NewLifecycleSys()
// Create new bucket encryption subsystem
globalBucketSSEConfigSys = NewBucketSSEConfigSys()
// Create new bucket object lock subsystem
globalBucketObjectLockSys = NewBucketObjectLockSys()
// Create new bucket quota subsystem
globalBucketQuotaSys = NewBucketQuotaSys()
func initAllSubsystems(newObject ObjectLayer) (err error)
// ...
// Initialize config system.
if err = globalConfigSys.Init(newObject); err != nil
return fmt.Errorf("Unable to initialize config system: %w", err)
// ...
2. ConfigSys
minio/cmd/config.go
package cmd
// ConfigSys - config system.
type ConfigSys struct
// NewConfigSys - creates new config system object.
func NewConfigSys() *ConfigSys
return &ConfigSys
// Init - initializes config system from config.json.
func (sys *ConfigSys) Init(objAPI ObjectLayer) error
if objAPI == nil
return errInvalidArgument
return initConfig(objAPI)
// Initialize and load config from remote etcd or local config directory
func initConfig(objAPI ObjectLayer) error
if objAPI == nil
return errServerNotInitialized
if isFile(getConfigFile())
if err := migrateConfig(); err != nil
return err
// Migrates $HOME/.minio/config.json or config.json.deprecated
// to '<export_path>/.minio.sys/config/config.json'
// ignore if the file doesn't exist.
// If etcd is set then migrates /config/config.json
// to '<export_path>/.minio.sys/config/config.json'
if err := migrateConfigToMiniosys(objAPI); err != nil
return err
// Migrates backend '<export_path>/.minio.sys/config/config.json' to latest version.
if err := migrateMinioSysConfig(objAPI); err != nil
return err
// Migrates backend '<export_path>/.minio.sys/config/config.json' to
// latest config format.
if err := migrateMinioSysConfigToKV(objAPI); err != nil
return err
return loadConfig(objAPI)
3. globalServerConfig
minio/cmd/config-current.go
package cmd
var (
// globalServerConfig server config.
globalServerConfig config.Config
globalServerConfigMu sync.RWMutex
)
// loadConfig - loads a new config from disk, overrides params
// from env if found and valid
func loadConfig(objAPI ObjectLayer) error
srvCfg, err := getValidConfig(objAPI)
if err != nil
return err
// Override any values from ENVs.
lookupConfigs(srvCfg)
// hold the mutex lock before a new config is assigned.
globalServerConfigMu.Lock()
globalServerConfig = srvCfg
globalServerConfigMu.Unlock()
return nil
4. 配置的增删改查
minio/cmd/admin-router.go
package cmd
// adminAPIHandlers provides HTTP handlers for MinIO admin API.
type adminAPIHandlers struct
// registerAdminRouter - Add handler functions for each service REST API routes.
func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
adminAPI := adminAPIHandlers
// Admin router
adminRouter := router.PathPrefix(adminPathPrefix).Subrouter()
/// Service operations
adminVersions := []string
adminAPIVersionPrefix,
adminAPIVersionV2Prefix,
for _, adminVersion := range adminVersions
// Config KV operations.
if enableConfigOps
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.GetConfigKVHandler)).Queries("key", "key:.*")
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/set-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.SetConfigKVHandler))
adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/del-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.DelConfigKVHandler))
// If none of the routes match add default error handler routes
adminRouter.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
adminRouter.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
minio/cmd/admin-handlers-config-kv.go
// SetConfigKVHandler - PUT /minio/admin/v3/set-config-kv
func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Request)
ctx := newContext(r, w, "SetConfigKV")
defer logger.AuditLog(w, r, "SetConfigKV", mustGetClaimsFromToken(r))
cred, objectAPI := validateAdminReqConfigKV(ctx, w, r)
if objectAPI == nil
return
if r.ContentLength > maxEConfigJSONSize || r.ContentLength == -1
// More than maxConfigSize bytes were available
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigTooLarge), r.URL)
return
password := cred.SecretKey
kvBytes, err := madmin.DecryptData(password, io.LimitReader(r.Body, r.ContentLength))
if err != nil
logger.LogIf(ctx, err, logger.Application)
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), r.URL)
return
cfg, err := readServerConfig(ctx, objectAPI)
if err != nil
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
if _, err = cfg.ReadFrom(bytes.NewReader(kvBytes)); err != nil
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
if err = validateConfig(cfg); err != nil
writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL)
return
// Update the actual server config on disk.
if err = saveServerConfig(ctx, objectAPI, cfg); err != nil
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
// Write to the config input KV to history.
if err = saveServerConfigHistory(ctx, objectAPI, kvBytes); err != nil
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
// Make sure to write backend is encrypted
if globalConfigEncrypted
saveConfig(GlobalContext, objectAPI, backendEncryptedFile, backendEncryptedMigrationComplete)
writeSuccessResponseHeadersOnly(w)
6. MinIO Bucket 配额
Bucket 配额有两种类型:Hard + FIFO
Hard
quota disallows writes to the bucket after configured quota limit is reached.FIFO
quota automatically deletes oldest content until bucket usage falls within configured limit while permitting writes.
1. PutObject 时进行 bucket 容量校验
func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// ...
if err := enforceBucketQuota(ctx, bucket, size); err != nil
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
// ...
2. BucketQuotaSys
子模块
minio/cmd/bucket-quota.go
func enforceBucketQuota(ctx context.Context, bucket string, size int64) error
if size < 0
return nil
return globalBucketQuotaSys.check(ctx, bucket, size)
minio/cmd/bucket-quota.go
// BucketQuotaSys - map of bucket and quota configuration.
type BucketQuotaSys struct
bucketStorageCache timedValue
// Get - Get quota configuration.
func (sys *BucketQuotaSys) Get(bucketName string) (*madmin.BucketQuota, error)
if globalIsGateway
objAPI := newObjectLayerFn()
if objAPI == nil
return nil, errServerNotInitialized
return &madmin.BucketQuota, nil
return globalBucketMetadataSys.GetQuotaConfig(bucketName)
// NewBucketQuotaSys returns initialized BucketQuotaSys
func NewBucketQuotaSys() *BucketQuotaSys
return &BucketQuotaSys
func (sys *BucketQuotaSys) check(ctx context.Context, bucket string, size int64) error
objAPI := newObjectLayerWithoutSafeModeFn()
if objAPI == nil
return errServerNotInitialized
q, err := sys.Get(bucket)
if err != nil
return nil
if q.Type == madmin.FIFOQuota
return nil
if q.Quota == 0
// No quota set return quickly.
return nil
sys.bucketStorageCache.Once.Do(func()
sys.bucketStorageCache.TTL = 10 * time.Second
sys.bucketStorageCache.Update = func() (interface, error)
return loadDataUsageFromBackend(ctx, objAPI)
)
v, err := sys.bucketStorageCache.Get()
if err != nil
return err
dui := v.(DataUsageInfo)
bui, ok := dui.BucketsUsage[bucket]
if !ok
// bucket not found, cannot enforce quota
// call will fail anyways later.
return nil
if (bui.Size + uint64(size)) > q.Quota
return BucketQuotaExceededBucket: bucket
return nil
3. 后台任务:周期删除object释放空间
const (
bgQuotaInterval = 1 * time.Hour
)
// initQuotaEnforcement starts the routine that deletes objects in bucket
// that exceeds the FIFO quota
func initQuotaEnforcement(ctx context.Context, objAPI ObjectLayer)
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn
go startBucketQuotaEnforcement(ctx, objAPI)
func startBucketQuotaEnforcement(ctx context.Context, objAPI ObjectLayer)
for
select
case <-ctx.Done():
return
case <-time.NewTimer(bgQuotaInterval).C:
logger.LogIf(ctx, enforceFIFOQuota(ctx, objAPI))
// enforceFIFOQuota deletes objects in FIFO order until sufficient objects
// have been deleted so as to bring bucket usage within quota
func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) error
// Turn off quota enforcement if data usage info is unavailable.
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff
return nil
buckets, err := objectAPI.ListBuckets(ctx)
if err != nil
return err
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
if err != nil
return err
for _, binfo := range buckets
bucket := binfo.Name
bui, ok := dataUsageInfo.BucketsUsage[bucket]
if !ok
// bucket doesn't exist anymore, or we
// do not have any information to proceed.
continue
// Check if the current bucket has quota restrictions, if not skip it
cfg, err := globalBucketQuotaSys.Get(bucket)
if err != nil
continue
if cfg.Type != madmin.FIFOQuota
continue
var toFree uint64
if bui.Size > cfg.Quota && cfg.Quota > 0
toFree = bui.Size - cfg.Quota
if toFree == 0
continue
// Allocate new results channel to receive ObjectInfo.
objInfoCh := make(chan ObjectInfo)
// Walk through all objects
if err := objectAPI.Walk(ctx, bucket, "", objInfoCh); err != nil
return err
// reuse the fileScorer used by disk cache to score entries by
// ModTime to find the oldest objects in bucket to delete. In
// the context of bucket quota enforcement - number of hits are
// irrelevant.
scorer, err := newFileScorer(toFree, time.Now().Unix(), 1)
if err != nil
return err
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
for obj := range objInfoCh
// skip objects currently under retention
if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj)
continue
scorer.addFile(obj.Name, obj.ModTime, obj.Size, 1)
var objects []string
numKeys := len(scorer.fileNames())
for i, key := range scorer.fileNames()
objects = append(objects, key)
if len(objects) < maxDeleteList && (i < numKeys-1)
// skip deletion until maxObjectList or end of slice
continue
if len(objects) == 0
break
// Deletes a list of objects.
deleteErrs, err := objectAPI.DeleteObjects(ctx, bucket, objects)
if err != nil
logger.LogIf(ctx, err)
else
for i := range deleteErrs
if deleteErrs[i] != nil
logger.LogIf(ctx, deleteErrs[i])
continue
// Notify object deleted event.
sendEvent(eventArgs
EventName: event.ObjectRemovedDelete,
BucketName: bucket,
Object: ObjectInfo
Name: objects[i],
,
Host: "Internal: [FIFO-QUOTA-EXPIRY]",
)
objects = nil
return nil
以上是关于MinIO 源码分析的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot 搭建基于 minio 的高性能存储服务