Golang Kubenetes容器多集群平台开发实践
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang Kubenetes容器多集群平台开发实践相关的知识,希望对你有一定的参考价值。
前言
Go语言在基础服务开发领域优势
Go语言在高并发、通信交互复杂、重业务逻辑的分布式系统中非常适用,具有开发体验好、一定量级下服务稳定、性能满足需要等优势。随着kubernetes在企业中快速扩展,席卷云计算浪潮,而这些云原生技术都是用Go编写,Go语法简洁具有非常简单的并发编程支持。Go语言迅速成为云原生时代的编程语言,更成为kubernetes二次开发首选语言。
今天给大家分享一个Go语言开发运维平台的案例
LuBan运维平台是一个基于Go语言+Vue开发的Kubernetes多集群管理平台,可以兼容不同云厂商Kubernetes集群,同时集成CMDB资产管理、支持容器分批发布、发布暂停、JAVA应用诊断等功能。
项目地址:https://github.com/dnsjia/luban
技术框架
后端框架:Golang 1.16 + Gin前端框架:Vue3 + AntDesign 2.x数据库:mysql 5.7、Redis 5.0
资产管理
支持同步阿里云ecs到平台,代码实现思路比较简单,通过云平台API接口发起http请求调用查询资产数据
代码示例:
type CmdbInterface interface
GetHostMenu(pid, echo int) ([]*host.TreeList, error)
UpdateHostGroup(tgr *host.TreeGroupRequest) error
DeleteHostGroup(groupId int) (err error)
SShNodeTree(pid int, search string) ([]*host.SShNodeTreeList, error)
ListVirtualMachine(treeId string, page *models.PageResult, keyword string) (result models.PageResult, err error)
GetServerAssetsUsers(instanceId string, username interface) (data *[]host.SSHLoginUser, err error)
GetGlobalServerSSHUsers(page *models.PageResult) (result models.PageResult, err error)
DisableServerSSHUsers(sshUser *host.SystemSSHUser) (err error)
UpdateServerSSHUsers(sshData *host.SSHGlobalReq, hostUserId string) (err error)
CreateServerSSHUser(sshData *host.SSHGlobalReq) (err error)
GetServerAssets(page *models.PageResult) (result models.PageResult, err error)
DeleteServerSSHUsers(uid int) (err error)
DeleteHost(groupId int) (err error)
CollectionDeleteServer(s *host.ServerReq) (err error)
ConnectionSSH(instanceId string, credentialId string) (sshClient *ssh.Client, err error)
DeleteAssets(id uint) (err error)
GetVirtualByInstanceId(instanceId string) (h *host.VirtualMachine, err error)
GetByCredentialId(credentialId string) (h host.SSHGlobalConfig, err error)
GetVirtualByUUID(uuid string) (h *host.VirtualMachine, err error)
ListPlatform(page *models.PageResult) (result models.PageResult, err error)
CreateCloudAccount(platform *host.CloudPlatform) (err error)
GetCloudAccount(cloudType string) (platform *host.CloudPlatform, err error)
InsertSSHRecord(sshRecord *host.SSHRecord) error
...
Create([]*host.VirtualMachine) error // cmdb资产同步方法
Updates(uuid string, value map[string]interface) error
NewAsynqScheduler() (scheduler *asynq.Scheduler, crontab string)
NewAsynqServer() *asynq.Server
// 程序启动后通过调用tasks.TaskBeat启动定时任务,将通过配置文件crontab参数
// 定期同步资产到运维平台
// tasks.TaskBeta() 定时任务
// 配置文件如下# 每隔两小时同步一次
crontab:
aliyun: "* */2 * * *"
const (
SyncAliYunCloud = "cmdb:aliyun"
SyncTencentCloud = "cmdb:tencent"
TypeDeployDeployment = "deploy:deployment"
CheckDeployDeploymentStatus = "deploy:checkDeployment"
)
// NewAliCloudTask 同步阿里云资产同步任务
func NewAliCloudTask(conf *cmdb.CloudPlatform) *asynq.Task
payload, err := json.Marshal(conf)
if err != nil
log.Logger.Errorf("创建阿里云资产同步任务失败,err=%v", err)
return asynq.NewTask(SyncAliYunCloud, payload)
func HandleAliCloudTask(ctx context.Context, t *asynq.Task) error
var a cmdb.CloudPlatform
if err := json.Unmarshal(t.Payload(), &a); err != nil
return err
_, err := cloudvendor.GetVendorClient(&a)
if err != nil
log.Logger.Warnf("AccountVerify GetVendorClient failed,%v", err)
return err
cloudsync.SyncAliYunHost(&a)
log.Logger.Info("Aliyun Cloud assets are successfully synchronized...")
return nil
通过调用cloudsync.SyncAliYunHost(&account)方法传入阿里云Ak,创建aliClient同步资产
// SyncAliYunHost 同步阿里云主机
func SyncAliYunHost(task *cmdb.CloudPlatform)
defer func()
if err := recover(); err != nil
log.Logger.Error(fmt.Sprintf("sync panic err: %v", err))
()
// 获取cloud账户
conf := cmdb.CloudPlatform
Type: cmdb.AliYun,
AccessKey: task.AccessKey,
SecretKey: task.SecretKey,
aliClient, _ = cloudvendor.GetVendorClient(&conf)
// 获取所有可用区
regionSet, _ := aliClient.GetRegions()
for _, region := range regionSet
// 获取所有区域下的ecs主机
instancesInfo, err := aliClient.GetInstances(region.RegionId)
if err != nil
log.Logger.Error(fmt.Sprintf("同步资产发生错误, err: %v", err))
// 判断区域下是否有ecs
if len(instancesInfo) != 0
for _, i := range instancesInfo
// 根据主机实例id获取db中的主机信息,并获取有差异的主机
diffHosts, _ := getDiffHosts(&i)
if len(diffHosts) != 0
// 有差异的更新任务状态为同步中
//updateTaskState(diffHosts)
// 同步有差异的主机数据
syncDiffHosts(diffHosts)
return
同步逻辑
/Users/micheng/code/luban/pkg/cloud/cloudsync/hostsyncor.go
// addHost 添加云主机
func addHost(hostResource []*cmdb.VirtualMachine) error
if err := luban.CoreV1.Cmdb().Create(hostResource); err != nil
return err
return nil
// syncDiffHosts 更新变化的主机
func syncDiffHosts(diff map[string][]*cmdb.VirtualMachine)
for k, v := range diff
switch k
case "update":
for _, host := range v
// HostName、PublicAddr、PrivateAddr、VmExpiredTime、Status、Mem、CPU、BandWidth
if err := luban.CoreV1.Cmdb().Updates(host.UUID, map[string]interface
"hostname": &host.HostName,
"public_addr": &host.PublicAddr,
"private_addr": &host.PrivateAddr,
"vm_expired_time": &host.VmExpiredTime,
"status": &host.Status,
"mem": &host.Mem,
"cpu": &host.CPU,
"bandwidth": &host.BandWidth,
); err != nil
log.Logger.Error("更新主机资源失败", zap.Any("err", err.Error()))
case "add":
if err := addHost(v); err != nil
log.Logger.Error("同步新增主机失败", zap.Any("err", err.Error()))
资产管理支持为用户授权
WebSSH支持屏幕录像、文件管理
鲁班运维平台Web终端集成了文件管理, 使用户更加方便上传下载文件。
接入集群
获取集群凭证, 可在k8s控制节点执行 cat $HOME/.kube/config。将集群文件内容粘贴到集群管理内。
工作负载
计算Pod内多个容器代码逻辑
// GetPodListFromChannels returns a list of all Pods in the cluster
// reading required resource list once from the channels.
func GetPodListFromChannels(channels *k8scommon.ResourceChannels, dsQuery *dataselect.DataSelectQuery) (*PodList, error)
pods := <-channels.PodList.List
err := <-channels.PodList.Error
if err != nil
return nil, err
eventList := <-channels.EventList.List
err = <-channels.EventList.Error
if err != nil
return nil, err
podList := ToPodList(pods.Items, eventList.Items, dsQuery)
podList.Status = getStatus(pods, eventList.Items)
return &podList, nil
func ToPod(pod *coreV1.Pod, warnings []k8scommon.Event) *Pod
if pod == nil
return nil
var restarts = int32(0)
var containers []*ExtContainer
for _, container := range pod.Spec.Containers
bc := ExContainerStatus(pod.Status.ContainerStatuses, &container)
containers = append(containers, bc)
if bc.Restarts > restarts
restarts = bc.Restarts
var intContainers []*ExtContainer
cn := len(containers)
for _, container := range pod.Spec.InitContainers
bc := ExContainerStatus(pod.Status.InitContainerStatuses, &container)
intContainers = append(intContainers, bc)
if bc.Restarts > restarts
restarts = bc.Restarts
cn += len(intContainers)
podDetail := Pod
ObjectMeta: k8s.NewObjectMeta(pod.ObjectMeta),
TypeMeta: k8s.NewTypeMeta(k8s.ResourceKindPod),
Warnings: warnings,
Status: getPodStatus(*pod),
InitContainers: intContainers,
RestartCount: getRestartCount(*pod),
NodeName: pod.Spec.NodeName,
ContainerImages: k8scommon.GetContainerImages(&pod.Spec),
PodIP: pod.Status.PodIP,
ContainerNum: cn,
Containers: containers,
return &podDetail
容器组
支持动态打印、日志下载
- 容器终端连接支持屏幕录像、操作审计。
- 容器终端连接文件管理支持上传、下载
容器监控
容器监控依赖于Prometheus,需要在导入集群时填写prometheus地址即可完成监控展示
应用发布
应用详情支持展示应用CPU、内存使用率。同时支持配置弹性伸缩策略
HPA弹性伸缩
为应用配置弹性伸缩(HPA),同时支持两种指标cpu、memory
应用发布支持分批发布,基于Openkruise实现
在发布的过程中支持随时暂停发布, 这里使用的是原生操作。控制器还是会做 replicas
数量管理
发布暂停代码逻辑如下
// 根据环境id, 获取kubeConfig
cs, err := luban.CoreV1.App().GetClusterConfig(context.TODO(), pause.EnvId, pause.Namespace)
if err != nil
response.FailWithMessage(response.AppEnvIdNotFound, response.AppEnvIdNotFoundMsg, c)
return
...
// 调用kubernetes api
client, err := Init.GetK8sClient(cs.KubeConfig)
if err != nil
response.FailWithMessage(http.StatusInternalServerError, response.NotOkMsg, c)
return
dClient := client.AppsV1().Deployments(pause.Namespace)
// RetryOnConflict 对资源进行更新 防止其他代码同时对资源进行不相关更新引起的冲突
// RetryOnConflict 将等待一段时间,如退避,然后重试。出现非“冲突”错误,或者重试次数过多并放弃,RetryOnConflict 将向调用者返回错误。
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error
result, getErr := dClient.Get(context.TODO(), pause.AppName, metav1.GetOptions)
if getErr != nil
log.Logger.Error(fmt.Sprintf("获取deployment 发生错误, err=%s", getErr))
return getErr
if pause.IsPause
result.Spec.Paused = true
msg = "发布已暂停"
else
result.Spec.Paused = false
msg = "已恢复发布"
_, updateErr := dClient.Update(context.TODO(), result, metav1.UpdateOptions)
return updateErr
)
....
应用诊断
基于阿里arthas对JAVA应用进行性能诊断,快速定位Java线上问题、可以在不重启JVM进程的情况下,查看程序的运行情况。线程耗时分析、方法执行分析和性能分析。
对需要诊断的应用Pod Annotation增加diagnosis: true
字段
面对线上服务器cpu使用率一直处于100% ,cpu使用率居高不下,某些线程一直占用着cpu资源,那又如何查看占用cpu较高的线程?按照传统方法我们需通过top命令定位到cpu占用率较高的线程之后,使用jstack pid命令查看当前java进程的堆栈状态。隔段时间再执行一次stack命令获取thread dump,通过thread dump分析线程状态
在dump中,线程一般存在如下几种状态:
1、RUNNABLE,线程处于执行中
2、BLOCKED,线程被阻塞
3、WAITING,线程正在等待
针对以上问题,可以使用应用诊断快速定位线程是否存在死锁,线程CPU使用率过高问题。很大程度上节约了线上问题排查故障时间
源码查看
通过jad反编译class文件
线程堆栈
JVM信息
项目地址:https://github.com/dnsjia/luban
文档地址:https://docs.dnsjia.com/application/diagnosis/
公众号: 自记小屋
另开设了容器开发实战培训班,想学习提升技能的同学欢迎入群交流学习。
以上是关于Golang Kubenetes容器多集群平台开发实践的主要内容,如果未能解决你的问题,请参考以下文章
360开源Wayne:企业级可视化多集群Kubernetes一站式管理平台