一个Pod调度失败后重新触发调度的所有情况分析(调度器可以在大作业运行时让小作业获得计算资源)燃爆了

随心笔谈12个月前发布 admin
88 0



目录正文flushUnschedulablePodsLeftoverflushBackoffQCompleted新加入节点节点更新已经存在的 Pod 发生变化集群内有Pod删除

在 k8s 中一个Pod由于某些原因调度失败后,会被放入调度失败队列,这个队列里面的Pod后面都怎么样了呢?

他们怎么样才能重新获取到”重新做人的机会“呢?这篇文章,我们从源码的角度来看看来龙去脉

在 k8s 中会起两个协程,定期把 backoffQ 和 unscheduledQ 里面的 Pod拿到activeQ里面去

func (p *PriorityQueue) Run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}
func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
p.lock.Lock()
defer p.lock.Unlock()
var podsToMove []*framework.QueuedPodInfo
currentTime :=p.clock.Now()
for _, pInfo :=range p.unschedulablePods.podInfoMap {
lastScheduleTime :=pInfo.Timestamp
if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
podsToMove=append(podsToMove, pInfo)
}
}
if len(podsToMove) > 0 {
p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
}
}
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
activated :=false
for _, pInfo :=range podInfoList {
// If the event doesn’t help making the Pod schedulable, continue.
// Note: we don’t run the check if pInfo.UnschedulablePlugins is nil, which denotes
// either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit.
// In that case, it’s desired to move it anyways.
if len(pInfo.UnschedulablePlugins) !=0 && !p.podMatchesEvent(pInfo, event) {
continue
}
pod :=pInfo.Pod
if p.isPodBackingoff(pInfo) {
if err :=p.podBackoffQ.Add(pInfo); err !=nil {
klog.ErrorS(err, “Error adding pod to the backoff queue”, “pod”, klog.KObj(pod))
} else {
metrics.SchedulerQueueIncomingPods.WithLabelValues(“backoff”, event.Label).Inc()
p.unschedulablePods.delete(pod)
}
} else {
if err :=p.activeQ.Add(pInfo); err !=nil {
klog.ErrorS(err, “Error adding pod to the scheduling queue”, “pod”, klog.KObj(pod))
} else {
metrics.SchedulerQueueIncomingPods.WithLabelValues(“active”, event.Label).Inc()
p.unschedulablePods.delete(pod)
}
}
}
p.moveRequestCycle=p.schedulingCycle
if activated {
p.cond.Broadcast()
}
}

将在unscheduledQ里面停留时长超过podMaxInUnschedulablePodsDuration(默认是5min)的pod放入到 ActiveQ 或 BackoffQueue,具体是放到哪个队列里面,根据下面规则判断:

根据这个Pod尝试被调度的次数,计算这个Pod应该等待下一次调度的时间,计算规则为指数级增长,即按照1s,2s,4s,8s这样的时间进行等待,但是这个等待时间也不会无限增加,会受到 podMaxBackoffDuration(默认10s) 的限制,这个参数的意思是一个 Pod处于Backoff的最大时间,如果等待的时间如果超过了 podMaxBackoffDuration,那么就只等待 podMaxBackoffDuration 就会再次被调度;当前时间 – 上次调度的时间 > 根据1获取到的应该等待的时间,那么就把Pod放到activeQ里面,将会被调度,否则Pod被放入 backoff 队列里继续等待,如果是在backoff 队列等待的话,后面就会被flushBackoffQCompleted取出

所以这里 Pod 如果满足条件的话 就一定会从unscheduleQ里面移到 backooff里面或者activeQ里面

去取 backoff 队列(优先队列)里面取等待时间结束的 Pod,放入 activeQ

func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()
activated :=false
for {
rawPodInfo :=p.podBackoffQ.Peek()
if rawPodInfo==nil {
break
}
pod :=rawPodInfo.(*framework.QueuedPodInfo).Pod
boTime :=p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
if boTime.After(p.clock.Now()) {
break
}
_, err :=p.podBackoffQ.Pop()
if err !=nil {
klog.ErrorS(err, “Unable to pop pod from backoff queue despite backoff completion”, “pod”, klog.KObj(pod))
break
}
p.activeQ.Add(rawPodInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues(“active”, BackoffComplete).Inc()
activated=true
}
if activated {
p.cond.Broadcast()
}
}

那么除了上述定期主动去判断一个 UnscheduledQ 或 backoffQ 里面的Pod是不是可以再次被调度,那么还有没有其他情况呢?

答案是有的。

还有四种情况会重新判断这两个队列里的 Pod 是不是要重新调度

有新节点加入集群节点配置或状态发生变化已经存在的 Pod 发生变化集群内有Pod被删除

informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.addNodeToCache,
UpdateFunc: sched.updateNodeInCache,
DeleteFunc: sched.deleteNodeFromCache,
},
)
func (sched *Scheduler) addNodeToCache(obj interface{}) {
node, ok :=obj.(*v1.Node)
if !ok {
klog.ErrorS(nil, “Cannot convert to *v1.Node”, “obj”, obj)
return
}
nodeInfo :=sched.Cache.AddNode(node)
klog.V(3).InfoS(“Add event for node”, “node”, klog.KObj(node))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))
}
func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck {
// Note: the following checks doesn’t take preemption into considerations, in very rare
// cases (e.g., node resizing), “pod” may still fail a check but preemption helps. We deliberately
// chose to ignore those cases as unschedulable pods will be re-queued eventually.
return func(pod *v1.Pod) bool {
admissionResults :=AdmissionCheck(pod, nodeInfo, false)
if len(admissionResults) !=0 {
return false
}
_, isUntolerated :=corev1helpers.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
return t.Effect==v1.TaintEffectNoSchedule
})
return !isUntolerated
}
}

可以看到,当有节点加入集群的时候,会把unscheduledQ 里面的Pod 依次拿出来做下面的判断:

Pod 对 节点的亲和性Pod 中 Nodename不为空 那么判断新加入节点的Name判断pod Nodename是否相等判断 Pod 中容器对端口的要求是否和新加入节点已经被使用的端口冲突Pod 是否容忍了Node的Pod

只有上述4个条件都满足,那么新加入节点这个事件才会触发这个未被调度的Pod加入到 backoffQ 或者 activeQ,至于是加入哪个queue,上面已经分析过了

func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
oldNode, ok :=oldObj.(*v1.Node)
if !ok {
klog.ErrorS(nil, “Cannot convert oldObj to *v1.Node”, “oldObj”, oldObj)
return
}
newNode, ok :=newObj.(*v1.Node)
if !ok {
klog.ErrorS(nil, “Cannot convert newObj to *v1.Node”, “newObj”, newObj)
return
}
nodeInfo :=sched.Cache.UpdateNode(oldNode, newNode)
// Only requeue unschedulable pods if the node became more schedulable.
if event :=nodeSchedulingPropertiesChange(newNode, oldNode); event !=nil {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo))
}
}
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
if nodeSpecUnschedulableChanged(newNode, oldNode) {
return &queue.NodeSpecUnschedulableChange
}
if nodeAllocatableChanged(newNode, oldNode) {
return &queue.NodeAllocatableChange
}
if nodeLabelsChanged(newNode, oldNode) {
return &queue.NodeLabelChange
}
if nodeTaintsChanged(newNode, oldNode) {
return &queue.NodeTaintChange
}
if nodeConditionsChanged(newNode, oldNode) {
return &queue.NodeConditionChange
}
return nil
}

首先是判断节点是何种配置发生了变化,有如下情况

节点可调度情况发生变化节点可分配资源发生变化节点标签发生变化节点污点发生变化节点状态发生变化

如果某个 Pod 调度失败的原因可以匹配到上面其中一个原因,那么节点更新这个事件才会触发这个未被调度的Pod加入到 backoffQ 或者 activeQ

informerFactory.Core().V1().Pods().Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t :=obj.(type) {
case *v1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if _, ok :=t.Obj.(*v1.Pod); ok {
// The carried object may be stale, so we don’t use it to check if
// it’s assigned or not. Attempting to cleanup anyways.
return true
}
utilruntime.HandleError(fmt.Errorf(“unable to convert object %T to *v1.Pod in %T”, obj, sched))
return false
default:
utilruntime.HandleError(fmt.Errorf(“unable to handle object in %T: %T”, sched, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToCache,
UpdateFunc: sched.updatePodInCache,
DeleteFunc: sched.deletePodFromCache,
},
},
)
func (sched *Scheduler) addPodToCache(obj interface{}) {
pod, ok :=obj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, “Cannot convert to *v1.Pod”, “obj”, obj)
return
}
klog.V(3).InfoS(“Add event for scheduled pod”, “pod”, klog.KObj(pod))
if err :=sched.Cache.AddPod(pod); err !=nil {
klog.ErrorS(err, “Scheduler cache AddPod failed”, “pod”, klog.KObj(pod))
}
sched.SchedulingQueue.AssignedPodAdded(pod)
}
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
p.lock.Lock()
p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd)
p.lock.Unlock()
}
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
var nsLabels labels.Set
nsLabels=interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
var podsToMove []*framework.QueuedPodInfo
for _, pInfo :=range p.unschedulablePods.podInfoMap {
for _, term :=range pInfo.RequiredAffinityTerms {
if term.Matches(pod, nsLabels) {
podsToMove=append(podsToMove, pInfo)
break
}
}
}
return podsToMove
}

可以看到,已经存在的Pod发生变化后,会把这个Pod亲和性配置依次和unscheduledQ里面的Pod匹配,如果能够匹配上,那么节点更新这个事件才会触发这个未被调度的Pod加入到 backoffQ 或者 activeQ。

func (sched *Scheduler) deletePodFromCache(obj interface{}) {
var pod *v1.Pod
switch t :=obj.(type) {
case *v1.Pod:
pod=t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok=t.Obj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, “Cannot convert to *v1.Pod”, “obj”, t.Obj)
return
}
default:
klog.ErrorS(nil, “Cannot convert to *v1.Pod”, “obj”, t)
return
}
klog.V(3).InfoS(“Delete event for scheduled pod”, “pod”, klog.KObj(pod))
if err :=sched.Cache.RemovePod(pod); err !=nil {
klog.ErrorS(err, “Scheduler cache RemovePod failed”, “pod”, klog.KObj(pod))
}
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
}

可以看到,Pod删除时间不像其他时间需要做额外的判断,这个preCheck函数是空的,所以所有 unscheduledQ 里面的Pod都会被放到 activeQ或者backoffQ里面。

从上面的情况,我们可以看到,集群内有事件发生变化,是可以加速调度失败的Pod被重新调度的进程的。常规的是,调度失败的 Pod 需要等5min 然后才会被重新加入 backoff 或 activeQ。backoffQ里面的Pod也需要等一段时间才会重新调度。这也就是为什么,当你修改节点配置的时候,能看到Pod马上重新被调度的原因

上面就是一个Pod调度失败后,重新触发调度的所有情况了。

更多关于Pod调度失败重新触发的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:Go语言kube-scheduler深度剖析与开发之pod调度pod调度将 Pod 指派给节点云原生技术kubernetes调度单位pod的使用详解Kubernetes Informer数据存储Index与Pod分配流程解析Kubernetes Visitor设计模式及发送pod创建请求解析Kubernetes kubectl中Pod创建流程源码解析静态pod 创建使用示例详解使用k8tz解决pod内的时区问题(坑的解决)

© 版权声明

相关文章