// 在创建scheduler的函数 runcommand()
func Setup() {
// 创建scheduler,包括多个选项
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles…),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders…),
)
return &cc, sched, nil
}
// 我们再看一下New这个函数
func New() (*Scheduler, error) {
// 先注册了所有的算法,保存到一个 map[string]PluginFactory 中
registry := frameworkplugins.NewInTreeRegistry()
//NewInTreeRegistry里面的一些调度插件
/* return runtime.Registry{
selectorspread.Name: selectorspread.New,
imagelocality.Name: imagelocality.New,
tainttoleration.Name: tainttoleration.New,
nodename.Name: nodename.New,
nodeports.Name: nodeports.New,
nodeaffinity.Name: nodeaffinity.New,
podtopologyspread.Name: runtime.FactoryAdapter(fts, podtopologyspread.New),
…
*/
// 重点看一下Scheduler的创建过程
var sched *Scheduler
source := options.schedulerAlgorithmSource
switch {
// 根据Provider创建,重点看这里
case source.Provider != nil:
sc, err := configurator.createFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf(“couldnt create scheduler using provider %q: %v”, *source.Provider, err)
}
sched = sc
// 根据用户设置创建,来自文件或者ConfigMap
case source.Policy != nil:
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
return nil, err
}
case source.Policy.ConfigMap != nil:
if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
return nil, err
}
}
configurator.extenders = policy.Extenders
sc, err := configurator.createFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf(“couldnt create scheduler from policy: %v”, err)
}
sched = sc
default:
return nil, fmt.Errorf(“unsupported algorithm source: %v”, source)
}
}
// 创建
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).Infof(“Creating scheduler from algorithm provider %v”, providerName)
// 实例化算法的Registry
r := algorithmprovider.NewRegistry()
defaultPlugins, exist := r[providerName]
if !exist {
return nil, fmt.Errorf(“algorithm provider %q is not registered”, providerName)
}
// 将各种算法作为plugin进行设置
for i := range c.profiles {
prof := &c.profiles[i]
plugins := &schedulerapi.Plugins{}
plugins.Append(defaultPlugins)
plugins.Apply(prof.Plugins)
prof.Plugins = plugins
}
return c.create()
}
// NewRegistry returns a Registry with two entries: the configuration for the
// default provider and the configuration for the ClusterAutoscaler provider.
// Each configuration is passed through applyFeatureGates before it is stored.
//
// As the original note puts it, this initialization shows the two main kinds
// of algorithm sets: the default one and the ClusterAutoscaler one.
func NewRegistry() Registry {
	// Per the original note, the default algorithms cover filtering, scoring,
	// binding and so on; read them one by one in the source if interested.
	defaults := getDefaultConfig()
	applyFeatureGates(defaults)

	// ClusterAutoscaler is the cluster auto-scaling algorithm set, kept as its
	// own separate entry.
	autoscaler := getClusterAutoscalerConfig()
	applyFeatureGates(autoscaler)

	providers := Registry{
		schedulerapi.SchedulerDefaultProviderName: defaults,
		ClusterAutoscalerProvider:                 autoscaler,
	}
	return providers
}
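/*
At this point, readers familiar with k8s may have a question: Kubernetes scheduling was said to have two kinds of algorithms, Predicate and Priority, so why is there no such classification here?
We will analyze this question later, when we look at concrete scenarios.
In newer versions, this part of the code logic is built around the extenders (buildExtenders), the node list (nodelist) and podQueue, which maintains a scheduling queue; everything else differs little from what is shown above.
*/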