ResourceBinding (RB) Component Explained
Overview
ResourceBinding (RB) is the core object in Karmada that describes how a resource is distributed and scheduled across multiple member clusters. This document walks through the full lifecycle and working principles of the RB, with reference to the source code.
0. Core Concepts in Detail
Before diving into how the RB works, we need to understand a few core concepts in Karmada.
0.1 PropagationPolicy
PropagationPolicy is the policy object in Karmada that defines how resources are propagated to member clusters. It is loosely analogous to a ReplicaSet in Kubernetes, except that it controls the distribution of resources across clusters rather than the distribution of Pods across nodes.
Definition
// PropagationPolicy represents the policy that propagates a group of resources to one or more clusters.
type PropagationPolicy struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	// Spec represents the desired behavior of PropagationPolicy.
	// +required
	Spec PropagationSpec `json:"spec"`
}
The PropagationSpec Structure
// PropagationSpec represents the desired behavior of PropagationPolicy.
type PropagationSpec struct {
	// ResourceSelectors used to select resources.
	// Nil or empty selector is not allowed and doesn't mean match all kinds
	// of resources for security concerns that sensitive resources(like Secret)
	// might be accidentally propagated.
	// +required
	// +kubebuilder:validation:MinItems=1
	ResourceSelectors []ResourceSelector `json:"resourceSelectors"`

	// Association tells if relevant resources should be selected automatically.
	// e.g. a ConfigMap referred by a Deployment.
	// default false.
	// Deprecated: in favor of PropagateDeps.
	// +optional
	Association bool `json:"association,omitempty"`

	// PropagateDeps tells if relevant resources should be propagated automatically.
	// Take 'Deployment' which referencing 'ConfigMap' and 'Secret' as an example, when 'propagateDeps' is 'true',
	// the referencing resources could be omitted(for saving config effort) from 'resourceSelectors' as they will be
	// propagated along with the Deployment. In addition to the propagating process, the referencing resources will be
	// migrated along with the Deployment in the fail-over scenario.
	//
	// Defaults to false.
	// +optional
	PropagateDeps bool `json:"propagateDeps,omitempty"`

	// Placement represents the rule for select clusters to propagate resources.
	// +optional
	Placement Placement `json:"placement,omitempty"`

	// ... other fields (e.g. Priority, Failover) omitted for brevity.
}
Key field descriptions:
- ResourceSelectors: resource selectors that decide which resources get propagated
  - Resources can be selected by APIVersion, Kind, Name, Namespace, or LabelSelector
  - At least one selector is required (a safety measure to keep sensitive resources, like Secrets, from being propagated by accident)
- PropagateDeps: whether to automatically propagate dependent resources
  - For example, the ConfigMaps and Secrets referenced by a Deployment
  - When set to true, selectors for these dependencies can be omitted; they are propagated automatically
- Placement: the cluster selection rules (see below)
- Priority: the policy priority (used for policy preemption)
- Failover: the failover behavior (see below)
The ResourceSelector Structure
// ResourceSelector the resources will be selected.
type ResourceSelector struct {
	// APIVersion represents the API version of the target resources.
	// +required
	APIVersion string `json:"apiVersion"`

	// Kind represents the Kind of the target resources.
	// +required
	Kind string `json:"kind"`

	// Namespace of the target resource.
	// Default is empty, which means inherit from the parent object scope.
	// +optional
	Namespace string `json:"namespace,omitempty"`

	// Name of the target resource.
	// Default is empty, which means selecting all resources.
	// +optional
	Name string `json:"name,omitempty"`

	// A label query over a set of resources.
	// If name is not empty, labelSelector will be ignored.
	// +optional
	LabelSelector *metav1.LabelSelector `json:"labelSelector,omitempty"`
}
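To make these fields concrete, here is a minimal sketch (assuming the karmada-io API packages; the policy name and the cluster names member1/member2 are placeholders) that selects a Deployment by name and propagates it, dependencies included, to two clusters:

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

// nginxPolicy selects the Deployment "nginx" in "default" and propagates it,
// together with the resources it references (PropagateDeps), to member1 and member2.
func nginxPolicy() *policyv1alpha1.PropagationPolicy {
	return &policyv1alpha1.PropagationPolicy{
		ObjectMeta: metav1.ObjectMeta{Name: "nginx-propagation", Namespace: "default"},
		Spec: policyv1alpha1.PropagationSpec{
			ResourceSelectors: []policyv1alpha1.ResourceSelector{{
				APIVersion: "apps/v1",
				Kind:       "Deployment",
				Name:       "nginx", // Name is set, so a LabelSelector would be ignored
			}},
			PropagateDeps: true, // also propagate referenced ConfigMaps/Secrets
			Placement: policyv1alpha1.Placement{
				ClusterAffinity: &policyv1alpha1.ClusterAffinity{
					ClusterNames: []string{"member1", "member2"},
				},
			},
		},
	}
}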
ClusterPropagationPolicy
ClusterPropagationPolicy is functionally identical to PropagationPolicy, but it is cluster-scoped (used for cluster-scoped resources such as ClusterRole and ClusterRoleBinding).
0.2 Placement
Placement defines which clusters a resource should be scheduled to, and how it is distributed among those clusters.
Definition
Placement contains the following main fields:
- ClusterAffinity: cluster affinity (which clusters are preferred)
- ClusterTolerations: cluster tolerations (which clusters the resource may be scheduled onto)
- SpreadConstraints: spread constraints (how to spread across clusters)
- ReplicaScheduling: the replica scheduling strategy (how replicas are assigned to clusters)
Examples (the two ReplicaScheduling modes):
- Duplicated: every cluster runs the same number of replicas (e.g. ConfigMap, Secret)
- Divided: replicas are divided among clusters according to the strategy (e.g. Deployment); a Go sketch of this mode follows below
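As a sketch of the Divided mode (the 2:1 weights and cluster names are illustrative; it reuses the policyv1alpha1 import from the earlier sketch):

// dividedPlacement splits replicas between member1 and member2 at a 2:1 ratio.
// Swapping ReplicaSchedulingType to ReplicaSchedulingTypeDuplicated would
// instead run the full replica count in every selected cluster.
func dividedPlacement() policyv1alpha1.Placement {
	return policyv1alpha1.Placement{
		ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
			ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
			ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
			WeightPreference: &policyv1alpha1.ClusterPreferences{
				StaticWeightList: []policyv1alpha1.StaticClusterWeight{
					{TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"member1"}}, Weight: 2},
					{TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"member2"}}, Weight: 1},
				},
			},
		},
	}
}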
0.3 ResourceBinding
A ResourceBinding (RB) is the result of binding a PropagationPolicy to a resource object. It records:
- Resource reference: points to the original resource object (Deployment, StatefulSet, etc.)
- Replica information: the replica count and per-replica resource requirements extracted from the resource
- Scheduling result: the target cluster list filled in by the Scheduler
- Policy information: the policy configuration copied from the PropagationPolicy
Relationship Between ResourceBinding and PropagationPolicy
User creates Deployment + PropagationPolicy
        ↓
ResourceDetector detects the match
        ↓
ResourceBinding created (binds the Deployment to the PropagationPolicy)
        ↓
Scheduler fills in ResourceBinding.Spec.Clusters (the scheduling result)
Important: an RB is the binding of a policy to a resource; a given resource is bound by exactly one policy (chosen by priority).
0.4 Work
A Work is the resource object that is actually delivered to a member cluster. One RB can produce multiple Work objects (one per target cluster).
Definition
// Work defines a list of resources to be deployed on the member cluster.
type Work struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	// Spec represents the desired behavior of Work.
	Spec WorkSpec `json:"spec"`

	// Status represents the status of PropagationStatus.
	// +optional
	Status WorkStatus `json:"status,omitempty"`
}

// WorkSpec defines the desired state of Work.
type WorkSpec struct {
	// Workload represents the manifest workload to be deployed on managed cluster.
	Workload WorkloadTemplate `json:"workload,omitempty"`

	// SuspendDispatching controls whether dispatching should
	// be suspended, nil means not suspend.
	// Note: true means stop propagating to the corresponding member cluster, and
	// does not prevent status collection.
	// +optional
	SuspendDispatching *bool `json:"suspendDispatching,omitempty"`

	// PreserveResourcesOnDeletion controls whether resources should be preserved on the
	// member cluster when the Work object is deleted.
	// If set to true, resources will be preserved on the member cluster.
	// Default is false, which means resources will be deleted along with the Work object.
	// +optional
	PreserveResourcesOnDeletion *bool `json:"preserveResourcesOnDeletion,omitempty"`
}
Relationship Between Work and ResourceBinding
ResourceBinding (control plane)
        ↓ converted into
Work (execution space: karmada-es-{cluster-name})
        ↓ dispatched to
Member cluster
        ↓ executed as
Actual resources (Deployment, Service, etc.)
Execution space: every member cluster has a corresponding namespace on the control plane, named karmada-es-{cluster-name}; all Work objects destined for that cluster live in this namespace.
0.5 ResourceInterpreter
The ResourceInterpreter is the core component in Karmada that interprets resources. It understands resources of different kinds (Deployment, StatefulSet, custom CRDs, etc.) and extracts the key information from them.
Role
The ResourceInterpreter provides the following capabilities:
- GetReplicas: extracts the replica count and resource requirements from a resource (invoked during RB creation; this is where the performance bottleneck discussed later sits)
- ReviseReplica: rewrites a resource's replica count (used to apply the scheduling result to a Work)
- Retain: retains cluster-specific fields (so they are not overwritten)
- AggregateStatus: aggregates status from multiple clusters
- InterpretHealth: determines whether a resource is healthy
- GetDependencies: returns the resource's dependencies
Implementation Layers
// ResourceInterpreter manages both default and customized webhooks to interpret custom resource structure.
type ResourceInterpreter interface {
	// Start initializes the resource interpreter and performs cache synchronization.
	Start(ctx context.Context) (err error)

	// HookEnabled tells if any hook exist for specific resource type and operation.
	HookEnabled(objGVK schema.GroupVersionKind, operationType configv1alpha1.InterpreterOperation) bool

	// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
	GetReplicas(object *unstructured.Unstructured) (replica int32, replicaRequires *workv1alpha2.ReplicaRequirements, err error)

	// ReviseReplica revises the replica of the given object.
	ReviseReplica(object *unstructured.Unstructured, replica int64) (*unstructured.Unstructured, error)

	// GetComponents extracts the resource requirements for multiple components from the given object.
	// This hook is designed for CRDs with multiple components (e.g., FlinkDeployment), but can
	// also be used for single-component resources like Deployment.
	// If implemented, the controller will use this hook to obtain per-component replica and resource
	// requirements, and will not call GetReplicas.
	// If not implemented, the controller will fall back to GetReplicas for backward compatibility.
	// This hook will only be called when the feature gate 'MultiplePodTemplatesScheduling' is enabled.
	GetComponents(object *unstructured.Unstructured) (components []workv1alpha2.Component, err error)

	// Retain returns the objects that based on the "desired" object but with values retained from the "observed" object.
	Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (retained *unstructured.Unstructured, err error)

	// AggregateStatus returns the objects that based on the 'object' but with status aggregated.
	AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error)

	// GetDependencies returns the dependent resources of the given object.
	GetDependencies(object *unstructured.Unstructured) (dependencies []configv1alpha1.DependentObjectReference, err error)

	// ... (remaining methods omitted)
}
Interpreter Types
The ResourceInterpreter has four implementations, consulted in priority order:
- ConfigurableInterpreter (highest priority): declarative configuration via Lua scripts
- CustomizedInterpreter: customization via webhooks
- ThirdPartyInterpreter: built-in interpretation rules for third-party resources
- DefaultInterpreter (lowest priority): Karmada's built-in default interpreter
Call order:
// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
func (i *customResourceInterpreterImpl) GetReplicas(object *unstructured.Unstructured) (replica int32, requires *workv1alpha2.ReplicaRequirements, err error) {
	var hookEnabled bool
	replica, requires, hookEnabled, err = i.configurableInterpreter.GetReplicas(object)
	if err != nil {
		return
	}
	if hookEnabled {
		return
	}
	replica, requires, hookEnabled, err = i.customizedInterpreter.GetReplicas(context.TODO(), &request.Attributes{
		Operation: configv1alpha1.InterpreterOperationInterpretReplica,
		Object:    object,
	})
	if err != nil {
		return
	}
	if hookEnabled {
		return
	}
	replica, requires, hookEnabled, err = i.thirdpartyInterpreter.GetReplicas(object)
	if err != nil {
		return
	}
	if hookEnabled {
		return
	}
	replica, requires, err = i.defaultInterpreter.GetReplicas(object)
	return
}
0.6 ResourceDetector
The ResourceDetector is part of the Karmada controllers and is responsible for:
- Watching resource changes: watching create/update/delete events of Kubernetes resources (Deployment, Service, etc.)
- Matching policies: finding the PropagationPolicy that matches each resource
- Creating RBs: creating or updating ResourceBindings according to the matched policy
- Policy management: handling the PropagationPolicy lifecycle
0.7 Scheduler
The Scheduler is Karmada's scheduling component and is responsible for:
- Watching RBs: watching ResourceBinding creations and updates
- Selecting clusters: choosing suitable member clusters according to the Placement rules
- Assigning replicas: distributing replicas across clusters according to the strategy
- Updating RBs: writing the scheduling result into ResourceBinding.Spec.Clusters
Important: the Scheduler only fills in Spec.Clusters; it does not modify any other field. An illustration of what that field holds follows below.
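For instance, after scheduling a 3-replica Deployment with a Divided strategy, the result written into Spec.Clusters might look like this (hypothetical numbers; the actual split depends on the strategy and cluster capacity):

package example

import workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"

// A possible scheduling result: 2 replicas assigned to member1, 1 to member2.
var scheduledClusters = []workv1alpha2.TargetCluster{
	{Name: "member1", Replicas: 2},
	{Name: "member2", Replicas: 1},
}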
0.8 Concept Relationship Diagram
┌────────────────────────────────────────────────────────────┐
│                  Control plane (Karmada)                   │
│                                                            │
│   ┌──────────────┐          ┌──────────────┐               │
│   │  Deployment  │          │ Propagation  │               │
│   │  (resource)  │          │    Policy    │               │
│   └───────┬──────┘          └───────┬──────┘               │
│           │                         │                      │
│           └────────────┬────────────┘                      │
│                        ▼                                   │
│            ┌───────────────────────┐                       │
│            │   ResourceDetector    │                       │
│            │ (detects and creates) │                       │
│            └───────────┬───────────┘                       │
│                        ▼                                   │
│            ┌───────────────────────┐                       │
│            │    ResourceBinding    │                       │
│            │ (binds policy and     │                       │
│            │  resource)            │                       │
│            └───────────┬───────────┘                       │
│                        ▼                                   │
│            ┌───────────────────────┐                       │
│            │       Scheduler       │                       │
│            │ (selects target       │                       │
│            │  clusters)            │                       │
│            └───────────┬───────────┘                       │
│                        ▼                                   │
│            ┌───────────────────────┐                       │
│            │    ResourceBinding    │                       │
│            │ (Spec.Clusters now    │                       │
│            │  populated)           │                       │
│            └───────────┬───────────┘                       │
│                        ▼                                   │
│            ┌───────────────────────┐                       │
│            │   BindingController   │                       │
│            │  (converts to Work)   │                       │
│            └───────────┬───────────┘                       │
│                        │                                   │
└────────────────────────┼───────────────────────────────────┘
                         │
              ┌──────────┴──────────┐
              ▼                     ▼
     ┌──────────────────┐  ┌──────────────────┐
     │       Work       │  │       Work       │
     │   (karmada-es-   │  │   (karmada-es-   │
     │    cluster-1)    │  │    cluster-2)    │
     └────────┬─────────┘  └────────┬─────────┘
              └──────────┬──────────┘
              ┌──────────┴──────────┐
              ▼                     ▼
     ┌──────────────────┐  ┌──────────────────┐
     │ Member cluster 1 │  │ Member cluster 2 │
     │                  │  │                  │
     │    Deployment    │  │    Deployment    │
     │ (actually runs)  │  │ (actually runs)  │
     └──────────────────┘  └──────────────────┘
1. The RB Lifecycle
1.1 Overall Flow
User creates a resource + PropagationPolicy
        ↓
ResourceDetector detects it and creates/updates the RB
        ↓
Scheduler selects target clusters for the RB
        ↓
ResourceBindingController converts the RB into Work objects
        ↓
Work objects are dispatched to member clusters
1.2 Key Components
- ResourceDetector (pkg/detector/detector.go): detects resources and creates RBs
- Scheduler (pkg/scheduler/): selects target clusters for RBs
- ResourceBindingController (pkg/controllers/binding/): converts RBs into Work objects
- ResourceInterpreter (pkg/resourceinterpreter/): interprets resources, extracting replica counts and resource requirements
2. How an RB Is Created
2.1 Trigger Conditions
When a user creates or updates:
- a resource object (e.g. a Deployment), or
- a PropagationPolicy or ClusterPropagationPolicy,
the ResourceDetector detects the change and triggers creation or update of the RB.
2.2 The Creation Flow in Detail
Step 1: Detect the resource and match a policy
When a resource object changes, the ResourceDetector does the following:
// BuildResourceBinding builds a desired ResourceBinding for object.
func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructured, policySpec *policyv1alpha1.PropagationSpec, policyID string, policyMeta metav1.ObjectMeta, claimFunc func(object metav1.Object, policyId string, objectMeta metav1.ObjectMeta)) (*workv1alpha2.ResourceBinding, error) {
	bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
	propagationBinding := &workv1alpha2.ResourceBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name:      bindingName,
			Namespace: object.GetNamespace(),
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(object, object.GroupVersionKind()),
			},
			Finalizers: []string{util.BindingControllerFinalizer},
		},
		Spec: workv1alpha2.ResourceBindingSpec{
			PropagateDeps:               policySpec.PropagateDeps,
			SchedulerName:               policySpec.SchedulerName,
			Placement:                   &policySpec.Placement,
			Failover:                    policySpec.Failover,
			ConflictResolution:          policySpec.ConflictResolution,
			PreserveResourcesOnDeletion: policySpec.PreserveResourcesOnDeletion,
			Resource: workv1alpha2.ObjectReference{
				APIVersion:      object.GetAPIVersion(),
				Kind:            object.GetKind(),
				Namespace:       object.GetNamespace(),
				Name:            object.GetName(),
				UID:             object.GetUID(),
				ResourceVersion: object.GetResourceVersion(),
			},
		},
	}
	if policySpec.Suspension != nil {
		propagationBinding.Spec.Suspension = &workv1alpha2.Suspension{Suspension: *policySpec.Suspension}
	}
	claimFunc(propagationBinding, policyID, policyMeta)
	if err := d.applyReplicaInterpretation(object, &propagationBinding.Spec); err != nil {
		return nil, err
	}
	if features.FeatureGate.Enabled(features.PriorityBasedScheduling) && policySpec.SchedulePriority != nil {
		// ... handle scheduling priority
	}
	return propagationBinding, nil
}
Key points:
- names.GenerateBindingName() generates the RB name (format: {kind}-{name}, with the kind lowercased; see the sketch after this list)
- An OwnerReference ties the RB to the resource object
- A Finalizer guarantees proper cleanup on deletion
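A simplified sketch of the naming behavior (the real helper lives in pkg/util/names; treat this as an approximation rather than the exact upstream implementation):

import "strings"

// generateBindingName approximates names.GenerateBindingName: lowercase the
// kind and join it with the resource name, so a Deployment named "nginx"
// yields the RB name "deployment-nginx".
func generateBindingName(kind, name string) string {
	return strings.ToLower(kind) + "-" + name
}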
Step 2: Apply replica interpretation
This is a key step in RB creation:
// applyReplicaInterpretation handles the logic for interpreting replicas or components from an object.
func (d *ResourceDetector) applyReplicaInterpretation(object *unstructured.Unstructured, spec *workv1alpha2.ResourceBindingSpec) error {
	gvk := object.GroupVersionKind()
	name := object.GetName()
	// Prioritize component interpretation if the feature and GetComponents are enabled.
	if features.FeatureGate.Enabled(features.MultiplePodTemplatesScheduling) && d.ResourceInterpreter.HookEnabled(gvk, configv1alpha1.InterpreterOperationInterpretComponent) {
		components, err := d.ResourceInterpreter.GetComponents(object)
		if err != nil {
			klog.Errorf("Failed to get components for %s(%s): %v", gvk, name, err)
			return err
		}
		spec.Components = components
		return nil
	}
	// GetReplicas is executed if the MultiplePodTemplatesScheduling feature gate is disabled, or if GetComponents is not implemented.
	if d.ResourceInterpreter.HookEnabled(gvk, configv1alpha1.InterpreterOperationInterpretReplica) {
		replicas, replicaRequirements, err := d.ResourceInterpreter.GetReplicas(object)
		if err != nil {
			klog.Errorf("Failed to customize replicas for %s(%s): %v", gvk, name, err)
			return err
		}
		spec.Replicas = replicas
		spec.ReplicaRequirements = replicaRequirements
	}
	return nil
}
Call chain:
applyReplicaInterpretation
        ↓
ResourceInterpreter.GetReplicas(object)
        ↓
ConfigurableInterpreter.GetReplicas(object)
        ↓
LuaVM.GetReplicas(object, script)
        ↓
LuaVM.RunScript(script, "GetReplicas", 2, object)  ← must acquire an instance from the VM pool
        ↓
VM.Pool.Get()          ← lock contention point
        ↓
Lua.DoString(script)   ← script compilation point
        ↓
Lua.CallByParam(...)   ← executes the GetReplicas function
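The contention point flagged above comes from sharing a bounded pool of Lua VM instances across goroutines. The following is a generic illustration of that pattern (not Karmada's actual pool code), using a buffered channel as the pool:

// vmPool illustrates the bottleneck: every GetReplicas call must first check
// out a VM, so under high RB-creation throughput goroutines queue up here
// before they can even start compiling or running the Lua script.
type luaVM struct{ /* interpreter state */ }

type vmPool struct {
	vms chan *luaVM // bounded pool; Get blocks while all VMs are checked out
}

func newVMPool(size int) *vmPool {
	p := &vmPool{vms: make(chan *luaVM, size)}
	for i := 0; i < size; i++ {
		p.vms <- &luaVM{}
	}
	return p
}

func (p *vmPool) Get() *luaVM  { return <-p.vms } // ← the contention point
func (p *vmPool) Put(v *luaVM) { p.vms <- v }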
Step 3: Create or update the RB
CreateOrUpdate is used to make sure the RB exists:
binding, err := d.BuildResourceBinding(object, &policy.Spec, policyID, policy.ObjectMeta, AddPPClaimMetadata)
if err != nil {
	klog.Errorf("Failed to build resourceBinding for object: %s. error: %v", objectKey, err)
	return err
}
bindingCopy := binding.DeepCopy()
err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
	operationResult, err = controllerutil.CreateOrUpdate(context.TODO(), d.Client, bindingCopy, func() error {
		// If this binding exists and its owner is not the input object, return error and let garbage collector
		// delete this binding and try again later. See https://github.com/karmada-io/karmada/issues/2090.
		if ownerRef := metav1.GetControllerOfNoCopy(bindingCopy); ownerRef != nil && ownerRef.UID != object.GetUID() {
			return fmt.Errorf("failed to update binding due to different owner reference UID, will " +
				"try again later after binding is garbage collected, see https://github.com/karmada-io/karmada/issues/2090")
		}
		// Just update necessary fields, especially avoid modifying Spec.Clusters which is scheduling result, if already exists.
		bindingCopy.Annotations = util.DedupeAndMergeAnnotations(bindingCopy.Annotations, binding.Annotations)
		bindingCopy.Labels = util.DedupeAndMergeLabels(bindingCopy.Labels, binding.Labels)
		bindingCopy.OwnerReferences = binding.OwnerReferences
		bindingCopy.Spec.Placement = binding.Spec.Placement
		bindingCopy.Spec.Resource = binding.Spec.Resource
		bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
		if binding.Spec.Suspension != nil {
			if bindingCopy.Spec.Suspension == nil {
				bindingCopy.Spec.Suspension = &workv1alpha2.Suspension{}
			}
			bindingCopy.Spec.Suspension.Suspension = binding.Spec.Suspension.Suspension
		}
		return nil
	})
	if err != nil {
		return err
	}
	return nil
})
Key points:
- RetryOnConflict handles concurrent update conflicts
- Spec.Clusters is never touched here; it is the scheduler's result
- Only policy-related fields are updated
3. Converting an RB into Work Objects
3.1 Conversion Trigger
Once an RB has been created or updated, the ResourceBindingController reacts to the change:
// syncBinding will sync resourceBinding to Works.
func (c *ResourceBindingController) syncBinding(ctx context.Context, binding *workv1alpha2.ResourceBinding) (controllerruntime.Result, error) {
	if err := c.removeOrphanWorks(ctx, binding); err != nil {
		return controllerruntime.Result{}, err
	}
	needWaitForCleanup, err := c.checkDirectPurgeOrphanWorks(ctx, binding)
	if err != nil {
		return controllerruntime.Result{}, err
	}
	if needWaitForCleanup {
		msg := fmt.Sprintf("There are works in clusters with PurgeMode 'Directly' not deleted for ResourceBinding(%s/%s), skip syncing works",
			binding.Namespace, binding.Name)
		klog.V(4).InfoS(msg, "namespace", binding.GetNamespace(), "binding", binding.GetName())
		return controllerruntime.Result{RequeueAfter: requeueIntervalForDirectlyPurge}, nil
	}
	workload, err := helper.FetchResourceTemplate(ctx, c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// It might happen when the resource template has been removed but the garbage collector hasn't removed
			// the ResourceBinding which dependent on resource template.
			// So, just return without retry(requeue) would save unnecessary loop.
			return controllerruntime.Result{}, nil
		}
		klog.ErrorS(err, "Failed to fetch workload for ResourceBinding", "namespace", binding.GetNamespace(), "binding", binding.GetName())
		return controllerruntime.Result{}, err
	}
	start := time.Now()
	err = ensureWork(ctx, c.Client, c.ResourceInterpreter, workload, c.OverrideManager, binding, apiextensionsv1.NamespaceScoped)
	metrics.ObserveSyncWorkLatency(err, start)
	if err != nil {
		klog.ErrorS(err, "Failed to transform ResourceBinding to works", "namespace", binding.GetNamespace(), "binding", binding.GetName())
		c.EventRecorder.Event(binding, corev1.EventTypeWarning, events.EventReasonSyncWorkFailed, err.Error())
		c.EventRecorder.Event(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkFailed, err.Error())
		return controllerruntime.Result{}, err
	}
	msg := fmt.Sprintf("Sync work of ResourceBinding(%s/%s) successful.",
		binding.Namespace, binding.Name)
	// ... (remainder of the function omitted)
}
3.2 The ensureWork Function in Detail
This is the core function that converts an RB into Work objects:
// ensureWork ensure Work to be created or updated.
func ensureWork(
	ctx context.Context, c client.Client, resourceInterpreter resourceinterpreter.ResourceInterpreter, workload *unstructured.Unstructured,
	overrideManager overridemanager.OverrideManager, binding metav1.Object, scope apiextensionsv1.ResourceScope,
) error {
	bindingSpec := getBindingSpec(binding, scope)
	targetClusters := mergeTargetClusters(bindingSpec.Clusters, bindingSpec.RequiredBy)
	var err error
	var errs []error
	var jobCompletions []workv1alpha2.TargetCluster
	if workload.GetKind() == util.JobKind && needReviseJobCompletions(bindingSpec.Replicas, bindingSpec.Placement) {
		jobCompletions, err = divideReplicasByJobCompletions(workload, targetClusters)
		if err != nil {
			return err
		}
	}
	for i := range targetClusters {
		targetCluster := targetClusters[i]
		clonedWorkload := workload.DeepCopy()
		workNamespace := names.GenerateExecutionSpaceName(targetCluster.Name)
		// When syncing workloads to member clusters, the controller MUST strictly adhere to the scheduling results
		// specified in bindingSpec.Clusters for replica allocation, rather than using the replicas declared in the
		// workload's resource template.
		// This rule applies regardless of whether the workload distribution mode is "Divided" or "Duplicated".
		// Failing to do so could allow workloads to bypass the quota checks performed by the scheduler
		// (especially during scale-up operations) or skip queue validation when scheduling is suspended.
		if bindingSpec.IsWorkload() {
			if resourceInterpreter.HookEnabled(clonedWorkload.GroupVersionKind(), configv1alpha1.InterpreterOperationReviseReplica) {
				clonedWorkload, err = resourceInterpreter.ReviseReplica(clonedWorkload, int64(targetCluster.Replicas))
				if err != nil {
					klog.ErrorS(err, "Failed to revise replica for workload in cluster.", "workloadKind", workload.GetKind(),
						"workloadNamespace", workload.GetNamespace(), "workloadName", workload.GetName(), "cluster", targetCluster.Name)
					errs = append(errs, err)
					continue
				}
			}
		}
		// jobSpec.Completions specifies the desired number of successfully finished pods the job should be run with.
		// When the replica scheduling policy is set to "divided", jobSpec.Completions should also be divided accordingly.
		// The weight assigned to each cluster roughly equals that cluster's jobSpec.Parallelism value. This approach helps
		// balance the execution time of the job across member clusters.
		if len(jobCompletions) > 0 {
			// Set allocated completions for Job only when the '.spec.completions' field not omitted from resource template.
			// For jobs running with a 'work queue' usually leaves '.spec.completions' unset, in that case we skip
			// setting this field as well.
			// Refer to: https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs.
			if err = helper.ApplyReplica(clonedWorkload, int64(jobCompletions[i].Replicas), util.CompletionsField); err != nil {
				klog.ErrorS(err, "Failed to apply Completions for workload in cluster.",
					"workloadKind", clonedWorkload.GetKind(), "workloadNamespace", clonedWorkload.GetNamespace(),
					"workloadName", clonedWorkload.GetName(), "cluster", targetCluster.Name)
				errs = append(errs, err)
				continue
			}
		}
		// We should call ApplyOverridePolicies last, as override rules have the highest priority
		cops, ops, err := overrideManager.ApplyOverridePolicies(clonedWorkload, targetCluster.Name)
		if err != nil {
			klog.ErrorS(err, "Failed to apply overrides for workload in cluster.",
				"workloadKind", clonedWorkload.GetKind(), "workloadNamespace", clonedWorkload.GetNamespace(),
				"workloadName", clonedWorkload.GetName(), "cluster", targetCluster.Name)
			errs = append(errs, err)
			continue
		}
		workLabel := mergeLabel(clonedWorkload, binding, scope)
		annotations := mergeAnnotations(clonedWorkload, binding, scope)
		annotations = mergeConflictResolution(clonedWorkload, bindingSpec.ConflictResolution, annotations)
		annotations, err = RecordAppliedOverrides(cops, ops, annotations)
		if err != nil {
			klog.ErrorS(err, "Failed to record appliedOverrides in cluster.", "cluster", targetCluster.Name)
			errs = append(errs, err)
			continue
		}
		if features.FeatureGate.Enabled(features.StatefulFailoverInjection) {
			// we need to figure out if the targetCluster is in the cluster we are going to migrate application to.
			// If yes, we have to inject the preserved label state to the clonedWorkload.
			clonedWorkload = injectReservedLabelState(bindingSpec, targetCluster, clonedWorkload, len(targetClusters))
		}
		workMeta := metav1.ObjectMeta{
			Name:        names.GenerateWorkName(clonedWorkload.GetKind(), clonedWorkload.GetName(), clonedWorkload.GetNamespace()),
			Namespace:   workNamespace,
			Finalizers:  []string{util.ExecutionControllerFinalizer},
			Labels:      workLabel,
			Annotations: annotations,
		}
		if err = ctrlutil.CreateOrUpdateWork(
			ctx,
			c,
			workMeta,
			clonedWorkload,
			ctrlutil.WithSuspendDispatching(shouldSuspendDispatching(bindingSpec.Suspension, targetCluster)),
			ctrlutil.WithPreserveResourcesOnDeletion(ptr.Deref(bindingSpec.PreserveResourcesOnDeletion, false)),
		); err != nil {
			errs = append(errs, err)
			continue
		}
	}
	return errors.NewAggregate(errs)
}
Key steps:
- Get the target cluster list: read the scheduling result from bindingSpec.Clusters
- For each cluster, create a Work:
  - Clone the workload
  - Revise the replica count according to the scheduling result (ReviseReplica; see the sketch after this list)
  - Apply OverridePolicies
  - Create the Work object
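The replica revision in the loop above ultimately boils down to rewriting spec.replicas on the unstructured object. A minimal sketch of what that step amounts to for a Deployment-like resource (the production path goes through resourceInterpreter.ReviseReplica; this helper name is hypothetical):

import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

// reviseReplica overwrites .spec.replicas on the cloned workload with the
// replica count the Scheduler assigned to this particular cluster.
func reviseReplica(workload *unstructured.Unstructured, replicas int64) error {
	return unstructured.SetNestedField(workload.Object, replicas, "spec", "replicas")
}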
4. RB Data Structures
4.1 ResourceBindingSpec
type ResourceBindingSpec struct {
	// Resource reference
	Resource workv1alpha2.ObjectReference

	// Replica information (obtained via the ResourceInterpreter)
	Replicas            int32
	ReplicaRequirements *ReplicaRequirements
	Components          []Component

	// Scheduling-related
	Placement     *Placement
	Clusters      []TargetCluster // scheduling result, populated by the Scheduler
	SchedulerName string

	// Miscellaneous
	PropagateDeps      bool
	Failover           *FailoverBehavior
	ConflictResolution ConflictResolution
	// ...
}
4.2 Key Fields
- Resource: points at the original resource object (Deployment, StatefulSet, etc.)
- Replicas/ReplicaRequirements: the replica count and resource requirements extracted from the resource
- Clusters: populated by the Scheduler; lists the target clusters and the replicas assigned to each
- Placement: the scheduling policy (where to schedule)
5. Quick Reference for Key Concepts
5.1 Core Resource Objects

| Concept | Definition | Scope | Key fields |
| --- | --- | --- | --- |
| PropagationPolicy | Policy defining how resources propagate | Namespace | ResourceSelectors, Placement, Priority |
| ClusterPropagationPolicy | Cluster-scoped propagation policy | Cluster | Same as PropagationPolicy |
| ResourceBinding | Binding of a resource object to a policy | Namespace | Resource, Replicas, Clusters |
| ClusterResourceBinding | Cluster-scoped resource binding | Cluster | Same as ResourceBinding |
| Work | The workload actually sent to a member cluster | Namespace | Workload.Manifests, Status |

5.2 Core Components

| Component | Responsibility | Key operations |
| --- | --- | --- |
| ResourceDetector | Detects resources and creates RBs | ApplyPolicy, BuildResourceBinding |
| Scheduler | Selects target clusters for RBs | Populates Spec.Clusters |
| ResourceBindingController | Converts RBs into Work objects | syncBinding, ensureWork |
| ResourceInterpreter | Interprets resource structure | GetReplicas, ReviseReplica |
| ExecutionController | Executes Work on member clusters | syncToClusters |

5.3 Key Data Structures
Placement
type Placement struct {
	// ClusterAffinity represents scheduling restrictions to a certain set of clusters.
	// Note:
	//   1. ClusterAffinity can not co-exist with ClusterAffinities.
	//   2. If both ClusterAffinity and ClusterAffinities are not set, any cluster
	//      can be scheduling candidates.
	// +optional
	ClusterAffinity *ClusterAffinity `json:"clusterAffinity,omitempty"`

	// ClusterAffinities represents scheduling restrictions to multiple cluster
	// groups that indicated by ClusterAffinityTerm.
	//
	// The scheduler will evaluate these groups one by one in the order they
	// appear in the spec, the group that does not satisfy scheduling restrictions
	// will be ignored which means all clusters in this group will not be selected
	// unless it also belongs to the next group(a cluster could belong to multiple
	// groups).
	//
	// If none of the groups satisfy the scheduling restrictions, then scheduling
	// fails, which means no cluster will be selected.
	//
	// Note:
	//   1. ClusterAffinities can not co-exist with ClusterAffinity.
	//   2. If both ClusterAffinity and ClusterAffinities are not set, any cluster
	//      can be scheduling candidates.
	//
	// Potential use case 1:
	// The private clusters in the local data center could be the main group, and
	// the managed clusters provided by cluster providers could be the secondary
	// group. So that the Karmada scheduler would prefer to schedule workloads
	// to the main group and the second group will only be considered in case of
	// the main group does not satisfy restrictions(like, lack of resources).
	//
	// Potential use case 2:
	// For the disaster recovery scenario, the clusters could be organized to
	// primary and backup groups, the workloads would be scheduled to primary
	// clusters firstly, and when primary cluster fails(like data center power off),
	// Karmada scheduler could migrate workloads to the backup clusters.
	//
	// +optional
	ClusterAffinities []ClusterAffinityTerm `json:"clusterAffinities,omitempty"`

	// ClusterTolerations represents the tolerations.
	// +optional
	ClusterTolerations []corev1.Toleration `json:"clusterTolerations,omitempty"`

	// SpreadConstraints represents a list of the scheduling constraints.
	// +optional
	SpreadConstraints []SpreadConstraint `json:"spreadConstraints,omitempty"`

	// ReplicaScheduling represents the scheduling policy on dealing with the number of replicas
	// when propagating resources that have replicas in spec (e.g. deployments, statefulsets) to member clusters.
	// +optional
	ReplicaScheduling *ReplicaSchedulingStrategy `json:"replicaScheduling,omitempty"`
}
Field descriptions:
- ClusterAffinity: which clusters are preferred (selected by label, field, or cluster name)
- ClusterTolerations: cluster tolerations (analogous to a Pod's Tolerations)
- SpreadConstraints: spread constraints, e.g. at most 3 clusters, at least 1 replica per cluster (see the sketch below)
- ReplicaScheduling: the replica scheduling strategy (Duplicated or Divided)
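A sketch combining these fields (the label and group bounds are illustrative; it reuses the policyv1alpha1 and metav1 imports from the earlier sketches): restrict candidate clusters by label and require the workload to spread across one to three clusters:

func spreadPlacement() policyv1alpha1.Placement {
	return policyv1alpha1.Placement{
		ClusterAffinity: &policyv1alpha1.ClusterAffinity{
			LabelSelector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"region": "cn-north"},
			},
		},
		SpreadConstraints: []policyv1alpha1.SpreadConstraint{{
			SpreadByField: policyv1alpha1.SpreadByFieldCluster, // spread by cluster
			MaxGroups:     3,                                   // at most 3 clusters
			MinGroups:     1,                                   // at least 1 cluster
		}},
	}
}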
ResourceBindingSpec
The RB's Spec contains:
- Resource: the resource reference (APIVersion, Kind, Name, Namespace, UID)
- Replicas: the replica count (obtained via the ResourceInterpreter)
- ReplicaRequirements: the resource requirements (CPU, memory, etc.)
- Clusters: the target cluster list (populated by the Scheduler)
- Placement: the placement rules (copied from the PropagationPolicy)
- Failover: the failover behavior
Important: Spec.Clusters is the scheduling result; neither the ResourceDetector nor the BindingController modifies it.
5.4 Execution Space
Every member cluster has a corresponding namespace on the control plane:
- Naming rule: karmada-es-{cluster-name} (see the sketch below)
- Purpose: holds all Work objects destined for that cluster
- Examples:
  - Cluster member1 → namespace karmada-es-member1
  - Cluster member2 → namespace karmada-es-member2
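A sketch of the naming rule (the real helper is names.GenerateExecutionSpaceName in pkg/util/names; the karmada-es- prefix matches current Karmada releases):

// executionSpaceName mirrors the rule above: each member cluster gets a
// dedicated control-plane namespace that holds its Work objects.
func executionSpaceName(cluster string) string {
	return "karmada-es-" + cluster // e.g. member1 → karmada-es-member1
}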
5.5 Failover
FailoverBehavior defines what happens when an application or a cluster fails:
// FailoverBehavior indicates failover behaviors in case of an application or
// cluster failure.
type FailoverBehavior struct {
	// Application indicates failover behaviors in case of application failure.
	// If this value is nil, failover is disabled.
	// If set, the PropagateDeps should be true so that the dependencies could
	// be migrated along with the application.
	// +optional
	Application *ApplicationFailoverBehavior `json:"application,omitempty"`

	// Cluster indicates failover behaviors in case of cluster failure.
	// If this value is nil, the failover behavior in case of cluster failure
	// will be controlled by the controller's no-execute-taint-eviction-purge-mode
	// parameter.
	// If set, the failover behavior in case of cluster failure will be defined
	// by this value.
	// +optional
	Cluster *ClusterFailoverBehavior `json:"cluster,omitempty"`
}

// ApplicationFailoverBehavior indicates application failover behaviors.
type ApplicationFailoverBehavior struct {
	// DecisionConditions indicates the decision conditions of performing the failover process.
	// Only when all conditions are met can the failover process be performed.
	// Currently, DecisionConditions includes several conditions:
	// - TolerationSeconds (optional)
	// +required
	DecisionConditions DecisionConditions `json:"decisionConditions"`

	// PurgeMode represents how to deal with the legacy applications on the
	// cluster from which the application is migrated.
	// Valid options are "Directly", "Gracefully", "Never", "Immediately"(deprecated),
	// and "Graciously"(deprecated).
	// Defaults to "Gracefully".
	// +kubebuilder:validation:Enum=Directly;Gracefully;Never;Immediately;Graciously
	// +kubebuilder:default=Gracefully
	// +optional
	PurgeMode PurgeMode `json:"purgeMode,omitempty"`
	// ... (remaining fields omitted)
}
PurgeMode values (a configuration sketch follows the list):
- Directly: delete the application on the old cluster immediately (for applications that cannot tolerate two live instances at once, e.g. Flink)
- Gracefully: wait until the application is healthy on the new cluster before deleting the old one (the default)
- Never: never delete automatically; clean up manually
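As a sketch (the values are illustrative; the PurgeMode is written as a raw string conversion rather than guessing the exact Go constant name), an application-failover setting that tolerates 300 seconds of unhealth before migrating and then purges the old copy gracefully:

import "k8s.io/utils/ptr"

func exampleFailover() *policyv1alpha1.FailoverBehavior {
	return &policyv1alpha1.FailoverBehavior{
		Application: &policyv1alpha1.ApplicationFailoverBehavior{
			DecisionConditions: policyv1alpha1.DecisionConditions{
				// Wait 300s after the application turns unhealthy before failing over.
				TolerationSeconds: ptr.To[int32](300),
			},
			PurgeMode: policyv1alpha1.PurgeMode("Gracefully"),
		},
	}
}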
6. Summary
6.1 Key Flow Recap
- Resource creation → detected by the ResourceDetector
- Policy matching → the matching PropagationPolicy is found
- RB creation → BuildResourceBinding (performance bottleneck: Lua script execution)
- Scheduling → the Scheduler populates Spec.Clusters
- Work conversion → the ResourceBindingController turns the RB into Work objects
- Dispatch and execution → Work objects are delivered to member clusters and executed