funccreateCloudProvider(cloudProvider string, externalCloudVolumePlugin string, cloudConfigFile string, allowUntaggedCloud bool, sharedInformers informers.SharedInformerFactory) (cloudprovider.Interface, ControllerLoopMode, error) { var cloud cloudprovider.Interface var loopMode ControllerLoopMode var err error if utilfeature.DefaultFeatureGate.Enabled(features.DisableCloudProviders) && cloudprovider.IsDeprecatedInternal(cloudProvider) { cloudprovider.DisableWarningForProvider(cloudProvider) returnnil, ExternalLoops, fmt.Errorf( "cloud provider %q was specified, but built-in cloud providers are disabled. Please set --cloud-provider=external and migrate to an external cloud provider", cloudProvider) } // 判断是否是external参数 if cloudprovider.IsExternal(cloudProvider) { loopMode = ExternalLoops if externalCloudVolumePlugin == "" { // externalCloudVolumePlugin is temporary until we split all cloud providers out. // So we just tell the caller that we need to run ExternalLoops without any cloud provider. returnnil, loopMode, nil } cloud, err = cloudprovider.InitCloudProvider(externalCloudVolumePlugin, cloudConfigFile) } else { // 输出弃用信息 cloudprovider.DeprecationWarningForProvider(cloudProvider) loopMode = IncludeCloudLoops // 初始化相应的云厂商provider cloud, err = cloudprovider.InitCloudProvider(cloudProvider, cloudConfigFile) } if err != nil { returnnil, loopMode, fmt.Errorf("cloud provider could not be initialized: %v", err) } if cloud != nil && !cloud.HasClusterID() { if allowUntaggedCloud { klog.Warning("detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues") } else { returnnil, loopMode, fmt.Errorf("no ClusterID Found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option") } } // 设置Informoer if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok { informerUserCloud.SetInformers(sharedInformers) } return cloud, loopMode, err }
registerMetrics() s := &Controller{ cloud: cloud, knownHosts: []*v1.Node{}, kubeClient: kubeClient, clusterName: clusterName, cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, eventBroadcaster: broadcaster, eventRecorder: recorder, nodeLister: nodeInformer.Lister(), nodeListerSynced: nodeInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), // nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached. nodeSyncCh: make(chaninterface{}, 1), }
serviceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ // 添加事件回调函数 AddFunc: func(cur interface{}) { svc, ok := cur.(*v1.Service) // Check cleanup here can provide a remedy when controller failed to handle // changes before it exiting (e.g. crashing, restart, etc.). if ok && (wantsLoadBalancer(svc) || needsCleanup(svc)) { s.enqueueService(cur) } }, // 更新事件回调函数 UpdateFunc: func(old, cur interface{}) { oldSvc, ok1 := old.(*v1.Service) curSvc, ok2 := cur.(*v1.Service) if ok1 && ok2 && (s.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc)) { s.enqueueService(cur) } }, // No need to handle deletion event because the deletion would be handled by // the update path when the deletion timestamp is added. }, serviceSyncPeriod, ) s.serviceLister = serviceInformer.Lister() s.serviceListerSynced = serviceInformer.Informer().HasSynced
...
if err := s.init(); err != nil { returnnil, err }
return s, nil }
// LoadBalance类型 funcwantsLoadBalancer(service *v1.Service)bool { // if LoadBalancerClass is set, the user does not want the default cloud-provider Load Balancer return service.Spec.Type == v1.ServiceTypeLoadBalancer && service.Spec.LoadBalancerClass == nil }