Kubernetes18--kube-scheduler source code 3--the scheduler execution flow
The Scheduler entry function
func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}
	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
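Run hands scheduleOne to wait.Until with a period of 0, so scheduling cycles run back to back until StopEverything is closed. The sketch below is a simplified, hypothetical stand-in for that loop (`until` and the counting scheduleOne are illustrative names; the real wait.Until does more than this):

```go
package main

import (
	"fmt"
	"time"
)

// until is a simplified, hypothetical stand-in for wait.Until: it calls f
// repeatedly, waiting `period` between calls, until stopCh is closed.
// With period 0 (as in Run), f runs back to back.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stop := make(chan struct{})
	n := 0
	// Stand-in for scheduleOne: closes stop after the third "scheduling cycle".
	scheduleOne := func() {
		n++
		if n == 3 {
			close(stop)
		}
	}
	until(scheduleOne, 0, stop)
	fmt.Println("cycles run:", n) // cycles run: 3
}
```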
Run first waits for the caches to sync. WaitForCacheSync waits for the scheduler cache to populate; it is a function field of Config, and CreateFromKeys supplies the default implementation:
WaitForCacheSync: func() bool {
	return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
func WaitForCacheSync(stopCh
scheduleOne: the core of the scheduling process
Step 1 fetches the Pod to schedule. If no Pod is returned (nil) or the Pod is already being deleted, scheduleOne returns immediately.
pod := sched.config.NextPod()
// pod could be nil when schedulerQueue is closed
if pod == nil {
	return
}
if pod.DeletionTimestamp != nil {
	sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
	klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
	return
}
The NextPod implementation:
NextPod: func() *v1.Pod {
	return c.getNextPod()
},
func (c *configFactory) getNextPod() *v1.Pod {
	pod, err := c.podQueue.Pop()
	if err == nil {
		klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
		return pod
	}
	klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
	return nil
}
The podQueue in configFactory:
// queue for pods that need scheduling
podQueue internalqueue.SchedulingQueue
Analysis of SchedulingQueue
SchedulingQueue stores the Pods waiting to be scheduled, in the style of a FIFO or a Heap. From the source comment: SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. The interface follows a pattern similar to cache.FIFO and cache.Heap and makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
Add(pod *v1.Pod) error
AddIfNotPresent(pod *v1.Pod) error
AddUnschedulableIfNotPresent(pod *v1.Pod) error
// Pop removes the head of the queue and returns it. It blocks if the
// queue is empty and waits until a new item is added to the queue.
Pop() (*v1.Pod, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
WaitingPodsForNode(nodeName string) []*v1.Pod
WaitingPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
DeleteNominatedPodIfExists(pod *v1.Pod)
}
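Pop's contract (block while the queue is empty, let waiting goroutines exit once Close is called) is the usual condition-variable pattern. A minimal FIFO sketch with hypothetical names (fifoQueue, errQueueClosed) and a trimmed-down local Pod type; it is not client-go's cache.FIFO, only an illustration of the same blocking behavior:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Pod is a hypothetical, trimmed-down stand-in for v1.Pod.
type Pod struct {
	Namespace, Name string
}

// errQueueClosed plays the role of the scheduler's queueClosed error.
var errQueueClosed = errors.New("scheduling queue is closed")

// fifoQueue is a minimal FIFO whose Pop blocks while the queue is empty
// and returns errQueueClosed once Close was called and nothing is left.
type fifoQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []*Pod
	closed bool
}

func newFIFOQueue() *fifoQueue {
	q := &fifoQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *fifoQueue) Add(p *Pod) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, p)
	q.cond.Signal() // wake one goroutine blocked in Pop
}

func (q *fifoQueue) Pop() (*Pod, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.closed {
			return nil, errQueueClosed
		}
		q.cond.Wait() // releases mu while waiting
	}
	p := q.items[0]
	q.items = q.items[1:]
	return p, nil
}

func (q *fifoQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast() // release every goroutine blocked in Pop
}

func main() {
	q := newFIFOQueue()
	go func() {
		q.Add(&Pod{Namespace: "default", Name: "a"})
		q.Add(&Pod{Namespace: "default", Name: "b"})
		q.Close()
	}()
	for {
		p, err := q.Pop()
		if err != nil {
			fmt.Println("stop:", err)
			return
		}
		fmt.Printf("pop %s/%s\n", p.Namespace, p.Name)
	}
}
```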
How the SchedulingQueue is constructed: if Pod priority is enabled, a priority queue is used; otherwise a FIFO is used.
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
// enabled a priority queue is returned. If it is disabled, a FIFO is returned.
func NewSchedulingQueue(stop
Creating the priority queue:
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(stop
Creating the FIFO queue:
// NewFIFO creates a FIFO object.
func NewFIFO() *FIFO {
	return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
}
The Pop implementation (FIFO):
func (f *FIFO) Pop() (*v1.Pod, error) {
	result, err := f.FIFO.Pop(func(obj interface{}) error { return nil })
	if err == cache.FIFOClosedError {
		return nil, fmt.Errorf(queueClosed)
	}
	return result.(*v1.Pod), err
}
Selecting a Node (host) for the Pod
// Synchronously attempt to find a fit for the pod.
start := time.Now()
suggestedHost, err := sched.schedule(pod)
if err != nil {
	// schedule() may have failed because the pod would not fit on any host, so we try to
	// preempt, with the expectation that the next time the pod is tried for scheduling it
	// will fit due to the preemption. It is also possible that a different pod will schedule
	// into the resources that were preempted, but this is harmless.
	if fitError, ok := err.(*core.FitError); ok {
		preemptionStartTime := time.Now()
		sched.preempt(pod, fitError)
		// (metrics calls that use start and preemptionStartTime are omitted in this excerpt)
		// Pod did not fit anywhere, so it is counted as a failure. If preemption
		// succeeds, the pod should get counted as a success the next time we try to
		// schedule it. (hopefully)
	} else {
		klog.Errorf("error selecting node for pod: %v", err)
	}
	return
}
The schedule method
// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
	host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
	if err != nil {
		pod = pod.DeepCopy()
		sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
		return "", err
	}
	return host, err
}
ScheduleAlgorithm defines how Pods are scheduled onto nodes; its Schedule method selects a Node for a Pod.
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate
// Prioritizers returns a slice of priority config. This is exposed for
// testing.
Prioritizers() []PriorityConfig
}
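To illustrate the predicate/priority split behind the Predicates() and Prioritizers() accessors, here is a toy Schedule: filter nodes with predicates, score the survivors with weighted priority functions, return the best. The Pod/Node types, the CPU-only predicate and the "most free CPU" priority are assumptions for illustration; the real generic scheduler does considerably more (for example, it evaluates nodes in parallel), and this sketch simply keeps the first node on ties.

```go
package main

import "fmt"

// Hypothetical, simplified types: a pod's CPU request and a node's free CPU.
type Pod struct {
	Name     string
	MilliCPU int64
}

type Node struct {
	Name         string
	FreeMilliCPU int64
}

// FitPredicate filters nodes; PriorityFunc scores nodes that passed.
type FitPredicate func(pod Pod, node Node) bool
type PriorityFunc func(pod Pod, node Node) int64

type PriorityConfig struct {
	Func   PriorityFunc
	Weight int64
}

// schedule keeps nodes that pass every predicate, scores them with the
// weighted sum of all priorities, and returns the highest-scoring node.
func schedule(pod Pod, nodes []Node, preds []FitPredicate, prios []PriorityConfig) (string, error) {
	best, bestScore, found := "", int64(0), false
	for _, n := range nodes {
		fits := true
		for _, p := range preds {
			if !p(pod, n) {
				fits = false
				break
			}
		}
		if !fits {
			continue
		}
		var score int64
		for _, pc := range prios {
			score += pc.Weight * pc.Func(pod, n)
		}
		if !found || score > bestScore {
			best, bestScore, found = n.Name, score, true
		}
	}
	if !found {
		return "", fmt.Errorf("pod %s fits on no node", pod.Name)
	}
	return best, nil
}

func main() {
	enoughCPU := func(p Pod, n Node) bool { return n.FreeMilliCPU >= p.MilliCPU }
	// Prefer the node with the most CPU left after placement.
	leastRequested := func(p Pod, n Node) int64 { return n.FreeMilliCPU - p.MilliCPU }
	nodes := []Node{{"node-a", 500}, {"node-b", 4000}, {"node-c", 2000}}
	host, err := schedule(Pod{"web", 1000}, nodes,
		[]FitPredicate{enoughCPU}, []PriorityConfig{{leastRequested, 1}})
	fmt.Println(host, err) // node-b <nil>
}
```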
If node selection fails, the Pod's scheduling failure is recorded:
// recordFailedSchedulingEvent records an event for the pod that indicates the
// pod has failed to schedule.
// NOTE: This function modifies "pod". "pod" should be copied before being passed.
func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
	sched.config.Error(pod, err)
	sched.config.Recorder.Event(pod, v1.EventTypeWarning, "FailedScheduling", message)
	sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
		Type:    v1.PodScheduled,
		Status:  v1.ConditionFalse,
		Reason:  reason,
		Message: err.Error(),
	})
}
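If suggestedHost, err := sched.schedule(pod) fails with a *core.FitError, scheduleOne tries to preempt other Pods on behalf of the current Pod (the error branch shown earlier).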
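The preempt method tries to make room for a Pod that failed to schedule by preempting lower-priority Pods. If it succeeds, it records the chosen node as the preemptor's nominated node and deletes the victim Pods.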
// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
// If it succeeds, it adds the name of the node where preemption has happened to the pod annotations.
// It returns the node name and an error if any.
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
	if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
		klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
			" No preemption is performed.")
		return "", nil
	}
	preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
	if err != nil {
		klog.Errorf("Error getting the updated preemptor pod object: %v", err)
		return "", err
	}
	node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
	if err != nil {
		klog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
		return "", err
	}
	var nodeName = ""
	if node != nil {
		nodeName = node.Name
		err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
		if err != nil {
			klog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
			return "", err
		}
		for _, victim := range victims {
			if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
				klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
				return "", err
			}
			sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
		}
	}
	// Clearing nominated pods should happen outside of "if node != nil". Node could
	// be nil when a pod with nominated node name is eligible to preempt again,
	// but preemption logic does not find any node for it. In that case Preempt()
	// function of generic_scheduler.go returns the pod itself for removal of the annotation.
	for _, p := range nominatedPodsToClear {
		rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
		if rErr != nil {
			klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
			// We do not return as this error is not critical.
		}
	}
	return nodeName, err
}
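Algorithm.Preempt has to decide which Pods on a candidate node to evict. The sketch below shows one simple strategy for a single node, assuming CPU is the only resource: remove every Pod with lower priority than the preemptor, then add back ("reprieve") as many of them as still fit, highest priority first. The types and the selectVictims helper are hypothetical, and the real algorithm also weighs things this ignores, such as PodDisruptionBudgets.

```go
package main

import (
	"fmt"
	"sort"
)

// Pod is a hypothetical, trimmed-down pod: priority and CPU request only.
type Pod struct {
	Name     string
	Priority int32
	MilliCPU int64
}

// selectVictims returns the pods on one node that must be evicted so that
// preemptor fits into capacity. ok is false if even removing every
// lower-priority pod is not enough.
func selectVictims(preemptor Pod, running []Pod, capacity int64) (victims []Pod, ok bool) {
	var used int64
	var lower []Pod
	for _, p := range running {
		if p.Priority < preemptor.Priority {
			lower = append(lower, p) // candidate victim
		} else {
			used += p.MilliCPU // cannot be preempted
		}
	}
	if used+preemptor.MilliCPU > capacity {
		return nil, false
	}
	used += preemptor.MilliCPU
	// Try to keep the most important lower-priority pods.
	sort.SliceStable(lower, func(i, j int) bool { return lower[i].Priority > lower[j].Priority })
	for _, p := range lower {
		if used+p.MilliCPU <= capacity {
			used += p.MilliCPU // reprieved: the pod can stay
		} else {
			victims = append(victims, p)
		}
	}
	return victims, true
}

func main() {
	running := []Pod{{"batch-1", 0, 1500}, {"batch-2", 0, 1000}, {"api", 500, 1000}, {"system", 2000, 500}}
	victims, ok := selectVictims(Pod{"web", 1000, 2000}, running, 4000)
	fmt.Println(ok, victims) // true [{batch-1 0 1500} {batch-2 0 1000}]
}
```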
The PodPreemptor interface defines the methods needed to fetch and delete Pods and to set or clear the preemptor Pod's nominated node name.
// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}
The concrete implementation:
type podPreemptor struct {
	Client clientset.Interface
}
func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
	return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
}
func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
	return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
	podCopy := pod.DeepCopy()
	podCopy.Status.NominatedNodeName = nominatedNodeName
	_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
	return err
}
func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
	if len(pod.Status.NominatedNodeName) == 0 {
		return nil
	}
	return p.SetNominatedNodeName(pod, "")
}
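Because PodPreemptor is an interface, preemption logic can be exercised without an API server. A hypothetical in-memory fake with the same method set, written over a trimmed-down local Pod type (not v1.Pod):

```go
package main

import "fmt"

// Pod is a hypothetical, trimmed-down stand-in for v1.Pod.
type Pod struct {
	Namespace, Name   string
	NominatedNodeName string
}

// PodPreemptor has the same method set as the interface above,
// but over the local Pod type.
type PodPreemptor interface {
	GetUpdatedPod(pod *Pod) (*Pod, error)
	DeletePod(pod *Pod) error
	SetNominatedNodeName(pod *Pod, nominatedNode string) error
	RemoveNominatedNodeName(pod *Pod) error
}

// fakePreemptor keeps pods in a map instead of calling the API server.
type fakePreemptor struct {
	pods map[string]*Pod // key: namespace/name
}

func key(p *Pod) string { return p.Namespace + "/" + p.Name }

func (f *fakePreemptor) GetUpdatedPod(pod *Pod) (*Pod, error) {
	stored, ok := f.pods[key(pod)]
	if !ok {
		return nil, fmt.Errorf("pod %s not found", key(pod))
	}
	cp := *stored // return a copy, like a fresh GET would
	return &cp, nil
}

func (f *fakePreemptor) DeletePod(pod *Pod) error {
	delete(f.pods, key(pod))
	return nil
}

func (f *fakePreemptor) SetNominatedNodeName(pod *Pod, node string) error {
	stored, ok := f.pods[key(pod)]
	if !ok {
		return fmt.Errorf("pod %s not found", key(pod))
	}
	stored.NominatedNodeName = node
	return nil
}

// RemoveNominatedNodeName is a no-op when nothing is nominated, as in podPreemptor.
func (f *fakePreemptor) RemoveNominatedNodeName(pod *Pod) error {
	if len(pod.NominatedNodeName) == 0 {
		return nil
	}
	return f.SetNominatedNodeName(pod, "")
}

var _ PodPreemptor = (*fakePreemptor)(nil) // compile-time interface check

func main() {
	f := &fakePreemptor{pods: map[string]*Pod{
		"default/web":   {Namespace: "default", Name: "web"},
		"default/batch": {Namespace: "default", Name: "batch"},
	}}
	web := &Pod{Namespace: "default", Name: "web"}
	_ = f.SetNominatedNodeName(web, "node-1")
	_ = f.DeletePod(&Pod{Namespace: "default", Name: "batch"}) // evict the victim
	updated, _ := f.GetUpdatedPod(web)
	fmt.Println(updated.NominatedNodeName, len(f.pods)) // node-1 1
}
```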
When schedule succeeds, it returns a Node suitable for the current Pod. The next step calls assumeVolumes, which assumes the Pod's volumes on that Node before the Pod itself is assumed:
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPod := pod.DeepCopy()
// Assume volumes first before assuming the pod.
// If all volumes are completely bound, then allBound is true and binding will be skipped.
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
// This function modifies 'assumedPod' if volume binding is required.
allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
if err != nil {
	klog.Errorf("error assuming volumes: %v", err)
	return
}
The assumeVolumes method:
// assumeVolumes will update the volume cache with the chosen bindings
// This function modifies assumed if volume binding is required.
func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) {
	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
		if err != nil {
			sched.recordSchedulingFailure(assumed, err, SchedulerError,
				fmt.Sprintf("AssumePodVolumes failed: %v", err))
		}
		// Invalidate ecache because assumed volumes could have affected the cached
		// pvs for other pods
		if sched.config.Ecache != nil {
			invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
			// (the call that invalidates invalidPredicates in Ecache is omitted in this excerpt)
		}
	}
	return
}
Back in scheduleOne, the "reserve" plugins run next, and then the Pod itself is assumed on suggestedHost:
// Run "reserve" plugins.
for _, pl := range plugins.ReservePlugins() {
	if err := pl.Reserve(plugins, assumedPod, suggestedHost); err != nil {
		klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
		sched.recordSchedulingFailure(assumedPod, err, SchedulerError,
			fmt.Sprintf("reserve plugin %v failed", pl.Name()))
		return
	}
}
// assume modifies `assumedPod` by setting NodeName=suggestedHost
err = sched.assume(assumedPod, suggestedHost)
if err != nil {
	klog.Errorf("error assuming pod: %v", err)
	return
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
	// Optimistically assume that the binding will succeed and send it to apiserver
	// in the background.
	// If the binding fails, scheduler will release resources allocated to assumed pod
	// immediately.
	assumed.Spec.NodeName = host
	// NOTE: Updates must be written to scheduler cache before invalidating
	// equivalence cache, because we could snapshot equivalence cache after the
	// invalidation and then snapshot the cache itself. If the cache is
	// snapshotted before updates are written, we would update equivalence
	// cache with stale information which is based on snapshot of old cache.
	if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
		klog.Errorf("scheduler cache AssumePod failed: %v", err)
		// This is most probably result of a BUG in retrying logic.
		// We report an error here so that pod scheduling can be retried.
		// This relies on the fact that Error will check if the pod has been bound
		// to a node and if so will not add it back to the unscheduled pods queue
		// (otherwise this would cause an infinite loop).
		sched.recordSchedulingFailure(assumed, err, SchedulerError,
			fmt.Sprintf("AssumePod failed: %v", err))
		return err
	}
	// if "assumed" is a nominated pod, we should remove it from internal cache
	if sched.config.SchedulingQueue != nil {
		sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
	}
	// Optimistically assume that the binding will succeed, so we need to invalidate affected
	// predicates in equivalence cache.
	// If the binding fails, these invalidated item will not break anything.
	if sched.config.Ecache != nil {
		sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
	}
	return nil
}
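The point of assume is that the scheduler cache counts the Pod on its node immediately, so the next scheduling cycle already sees the reserved resources even though binding has not finished, and ForgetPod rolls that back if binding fails. A hypothetical minimal model of that contract: it tracks only Pod counts per node and omits other bookkeeping the real cache performs (for example, around FinishBinding).

```go
package main

import (
	"fmt"
	"sync"
)

// assumeCache is a hypothetical, minimal model of the scheduler cache's
// optimistic "assume" step.
type assumeCache struct {
	mu         sync.Mutex
	assumed    map[string]string // pod key -> node name
	podsOnNode map[string]int
}

func newAssumeCache() *assumeCache {
	return &assumeCache{assumed: map[string]string{}, podsOnNode: map[string]int{}}
}

// AssumePod records the pod on node; assuming the same pod twice is an error.
func (c *assumeCache) AssumePod(podKey, node string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.assumed[podKey]; ok {
		return fmt.Errorf("pod %s is already assumed", podKey)
	}
	c.assumed[podKey] = node
	c.podsOnNode[node]++
	return nil
}

// ForgetPod undoes AssumePod, e.g. after a failed binding.
func (c *assumeCache) ForgetPod(podKey string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	node, ok := c.assumed[podKey]
	if !ok {
		return fmt.Errorf("pod %s is not assumed", podKey)
	}
	delete(c.assumed, podKey)
	c.podsOnNode[node]--
	return nil
}

func (c *assumeCache) PodsOnNode(node string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.podsOnNode[node]
}

func main() {
	c := newAssumeCache()
	_ = c.AssumePod("default/web", "node-1")
	fmt.Println(c.PodsOnNode("node-1")) // 1: the next cycle already sees the pod
	_ = c.ForgetPod("default/web")      // binding failed: roll back
	fmt.Println(c.PodsOnNode("node-1")) // 0
}
```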
The binding process runs asynchronously:
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
	// Bind volumes first before Pod
	if !allBound {
		err := sched.bindVolumes(assumedPod)
		if err != nil {
			klog.Errorf("error binding volumes: %v", err)
			return
		}
	}
	// Run "prebind" plugins.
	for _, pl := range plugins.PrebindPlugins() {
		approved, err := pl.Prebind(plugins, assumedPod, suggestedHost)
		if err != nil {
			approved = false
			klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
		}
		if !approved {
			var reason string
			if err == nil {
				msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name)
				err = errors.New(msg)
				reason = v1.PodReasonUnschedulable
			} else {
				reason = SchedulerError
			}
			sched.recordSchedulingFailure(assumedPod, err, reason, err.Error())
			return
		}
	}
	err := sched.bind(assumedPod, &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
		Target: v1.ObjectReference{
			Kind: "Node",
			Name: suggestedHost,
		},
	})
	if err != nil {
		klog.Errorf("error binding pod: %v", err)
	} else {
		// (success metrics are omitted in this excerpt)
	}
}()
The bindVolumes method binds the Pod's volumes:
// bindVolumes will make the API update with the assumed bindings and wait until
// the PV controller has completely finished the binding operation.
// If binding errors, times out or gets undone, then an error will be returned to
// retry scheduling.
func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
	klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
	err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
	if err != nil {
		klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
		// Unassume the Pod and retry scheduling
		if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
		}
		// Volumes may be bound by PV controller asynchronously, we must clear
		// stale pod binding cache.
		// (the call that clears the stale binding cache is omitted in this excerpt)
		sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error())
		return err
	}
	klog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
	return nil
}
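Inside the binding goroutine, the "prebind" plugins run once the volumes are bound. A plugin that returns an error or rejects the Pod aborts the binding: a rejection is recorded with reason v1.PodReasonUnschedulable, an error with SchedulerError.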
Finally, bind is called with a v1.Binding whose Target is the Node named suggestedHost.
The bind method:
// bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we
// handle binding metrics internally.
func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
	bindingStart := time.Now()
	// If binding succeeded then PodScheduled condition will be updated in apiserver so that
	// it's atomic with setting host.
	err := sched.config.GetBinder(assumed).Bind(b)
	if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil {
		klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
	}
	if err != nil {
		klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
		if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
			klog.Errorf("scheduler cache ForgetPod failed: %v", err)
		}
		sched.recordSchedulingFailure(assumed, err, SchedulerError,
			fmt.Sprintf("Binding rejected: %v", err))
		return err
	}
	// (binding-latency metrics based on bindingStart are omitted in this excerpt)
	sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name)
	return nil
}
The binder's Bind method:
// Bind just does a POST binding RPC.
func (b *binder) Bind(binding *v1.Binding) error {
	klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
	return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
}
Code location of the scheduler's execution flow: \kubernetes\pkg\scheduler\scheduler.go
Pods waiting to be scheduled are kept in a queue, SchedulingQueue, implemented as either a FIFO or a PriorityQueue.
The list of candidate Node objects:
NodeLister algorithm.NodeLister
nodes, err := nodeLister.List()
The concrete scheduling flow is as follows:
1. Entry function: func (sched *Scheduler) Run()
2. Schedule one Pod: func (sched *Scheduler) scheduleOne()
3. Fetch the Pod to schedule: pod := sched.config.NextPod()
4. Pick a suitable Node for the Pod: suggestedHost, err := sched.schedule(pod)
5. Copy the current Pod: assumedPod := pod.DeepCopy()
6. Assume the volumes: allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
7. Assume the Pod on the host: err = sched.assume(assumedPod, suggestedHost)
8. Bind the volumes: err := sched.bindVolumes(assumedPod)
9. Bind the Pod to the host: func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error
The core is step 4:
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error)
host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
Here, the Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error) method of ScheduleAlgorithm is supplied by a pluggable algorithm.
Next, we will analyze how kube-scheduler injects the pluggable ScheduleAlgorithm, and how the concrete Schedule method executes.
