
karmada-ResourceBinding (RB): How It Works in Detail

啸妹回 2026-1-14 04:10:01
The ResourceBinding (RB) Component in Detail

Overview

ResourceBinding (RB) is the core object in Karmada that describes how a resource is distributed and scheduled across multiple member clusters. This document walks through the complete lifecycle and working principles of the RB, with reference to the source code.
0. Core Concepts

Before diving into how the RB works, we need to understand a few core Karmada concepts.
0.1 PropagationPolicy

PropagationPolicy is the policy object in Karmada that defines how resources are propagated to member clusters. It is loosely analogous to a ReplicaSet in Kubernetes, except that it governs the distribution of resources across clusters rather than the distribution of Pods across nodes.
Definition
// PropagationPolicy represents the policy that propagates a group of resources to one or more clusters.
type PropagationPolicy struct {
        metav1.TypeMeta   `json:",inline"`
        metav1.ObjectMeta `json:"metadata,omitempty"`
        // Spec represents the desired behavior of PropagationPolicy.
        // +required
        Spec PropagationSpec `json:"spec"`
}
The PropagationSpec struct
// PropagationSpec represents the desired behavior of PropagationPolicy.
type PropagationSpec struct {
        // ResourceSelectors used to select resources.
        // Nil or empty selector is not allowed and doesn't mean match all kinds
        // of resources for security concerns that sensitive resources(like Secret)
        // might be accidentally propagated.
        // +required
        // +kubebuilder:validation:MinItems=1
        ResourceSelectors []ResourceSelector `json:"resourceSelectors"`
        // Association tells if relevant resources should be selected automatically.
        // e.g. a ConfigMap referred by a Deployment.
        // default false.
        // Deprecated: in favor of PropagateDeps.
        // +optional
        Association bool `json:"association,omitempty"`
        // PropagateDeps tells if relevant resources should be propagated automatically.
        // Take 'Deployment' which referencing 'ConfigMap' and 'Secret' as an example, when 'propagateDeps' is 'true',
        // the referencing resources could be omitted(for saving config effort) from 'resourceSelectors' as they will be
        // propagated along with the Deployment. In addition to the propagating process, the referencing resources will be
        // migrated along with the Deployment in the fail-over scenario.
        //
        // Defaults to false.
        // +optional
        PropagateDeps bool `json:"propagateDeps,omitempty"`
        // Placement represents the rule for select clusters to propagate resources.
        // +optional
        Placement Placement `json:"placement,omitempty"`
        // ... (remaining fields omitted)
}
Key fields

  • ResourceSelectors: selects which resources should be propagated

    • Resources can be selected by APIVersion, Kind, Name, Namespace, or LabelSelector
    • At least one selector is required (a safety measure to prevent sensitive resources from being propagated accidentally)

  • PropagateDeps: whether to automatically propagate dependent resources

    • For example, the ConfigMap and Secret referenced by a Deployment
    • When set to true, selectors for these dependencies can be omitted; they are propagated automatically

  • Placement: the cluster selection rules (see below)
  • Priority: the policy priority (used for policy preemption)
  • Failover: the failover behavior (see below)
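Putting the fields above together, a minimal PropagationPolicy might look like the following sketch (the policy name, Deployment name `nginx`, and cluster names `member1`/`member2` are illustrative):

```yaml
apiVersion: policy.karmada.io/v1alpha1
kind: PropagationPolicy
metadata:
  name: nginx-propagation
spec:
  resourceSelectors:            # at least one selector is required
    - apiVersion: apps/v1
      kind: Deployment
      name: nginx
  propagateDeps: true           # also propagate referenced ConfigMaps/Secrets
  placement:
    clusterAffinity:
      clusterNames:
        - member1
        - member2
```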
The ResourceSelector struct
// ResourceSelector the resources will be selected.
type ResourceSelector struct {
        // APIVersion represents the API version of the target resources.
        // +required
        APIVersion string `json:"apiVersion"`
        // Kind represents the Kind of the target resources.
        // +required
        Kind string `json:"kind"`
        // Namespace of the target resource.
        // Default is empty, which means inherit from the parent object scope.
        // +optional
        Namespace string `json:"namespace,omitempty"`
        // Name of the target resource.
        // Default is empty, which means selecting all resources.
        // +optional
        Name string `json:"name,omitempty"`
        // A label query over a set of resources.
        // If name is not empty, labelSelector will be ignored.
        // +optional
        LabelSelector *metav1.LabelSelector `json:"labelSelector,omitempty"`
}
ClusterPropagationPolicy

ClusterPropagationPolicy is functionally identical to PropagationPolicy, but is cluster-scoped (used for cluster-scoped resources such as ClusterRole and ClusterRoleBinding).
0.2 Placement

Placement defines which clusters a resource should be scheduled to, and how it is distributed across those clusters.
Definition

Placement contains the following main fields:

  • ClusterAffinity: cluster affinity (which clusters are preferred)
  • ClusterTolerations: cluster tolerations (which clusters may be scheduled to)
  • SpreadConstraints: spread constraints (how to spread workloads across clusters)
  • ReplicaScheduling: the replica scheduling strategy (how replicas are assigned to clusters)
Examples

  • Duplicated: every cluster runs the same number of replicas (e.g., ConfigMap, Secret)
  • Divided: replicas are divided proportionally among clusters (e.g., Deployment)
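The two modes above map onto the replicaScheduling field of Placement. A sketch of a weighted Divided placement (cluster names `member1`/`member2` and the 2:1 weights are illustrative):

```yaml
placement:
  replicaScheduling:
    replicaSchedulingType: Divided          # or Duplicated
    replicaDivisionPreference: Weighted
    weightPreference:
      staticWeightList:
        - targetCluster:
            clusterNames:
              - member1
          weight: 2                          # member1 gets 2/3 of the replicas
        - targetCluster:
            clusterNames:
              - member2
          weight: 1
```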
0.3 ResourceBinding

ResourceBinding (RB) is the result of binding a PropagationPolicy to a resource object. It records:

  • Resource reference: points at the original resource object (Deployment, StatefulSet, etc.)
  • Replica information: the replica count and resource requirements extracted from the resource
  • Scheduling result: the list of target clusters, filled in by the Scheduler
  • Policy information: the policy configuration copied from the PropagationPolicy
The relationship between ResourceBinding and PropagationPolicy
User creates a Deployment + PropagationPolicy
    ↓
ResourceDetector detects the match
    ↓
ResourceBinding created (binding the Deployment to the PropagationPolicy)
    ↓
Scheduler fills in ResourceBinding.Spec.Clusters (the scheduling result)
Important: an RB is the binding of a policy to a resource; a resource is bound by at most one policy (chosen by priority).
0.4 Work

A Work is the actual resource object delivered to a member cluster. One RB can produce multiple Work objects (one per target cluster).
Definition
// Work defines a list of resources to be deployed on the member cluster.
type Work struct {
        metav1.TypeMeta   `json:",inline"`
        metav1.ObjectMeta `json:"metadata,omitempty"`
        // Spec represents the desired behavior of Work.
        Spec WorkSpec `json:"spec"`
        // Status represents the status of PropagationStatus.
        // +optional
        Status WorkStatus `json:"status,omitempty"`
}

// WorkSpec defines the desired state of Work.
type WorkSpec struct {
        // Workload represents the manifest workload to be deployed on managed cluster.
        Workload WorkloadTemplate `json:"workload,omitempty"`
        // SuspendDispatching controls whether dispatching should
        // be suspended, nil means not suspend.
        // Note: true means stop propagating to the corresponding member cluster, and
        // does not prevent status collection.
        // +optional
        SuspendDispatching *bool `json:"suspendDispatching,omitempty"`
        // PreserveResourcesOnDeletion controls whether resources should be preserved on the
        // member cluster when the Work object is deleted.
        // If set to true, resources will be preserved on the member cluster.
        // Default is false, which means resources will be deleted along with the Work object.
        // +optional
        PreserveResourcesOnDeletion *bool `json:"preserveResourcesOnDeletion,omitempty"`
}
The relationship between Work and ResourceBinding
ResourceBinding (control plane)
    ↓ converted into
Work (execution space: karmada-es-{cluster-name})
    ↓ dispatched to
Member cluster
    ↓ applied as
Actual resources (Deployment, Service, etc.)
Execution space: every member cluster has a dedicated namespace on the control plane, named karmada-es-{cluster-name}; every Work destined for that cluster is placed in this namespace.
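A minimal sketch of this naming helper, assuming the `karmada-es-` prefix used by `names.GenerateExecutionSpaceName` in current Karmada releases (this is a simplified illustration, not the actual implementation):

```go
package main

import "fmt"

// generateExecutionSpaceName mirrors the execution-space naming convention:
// each member cluster gets a dedicated control-plane namespace with the
// "karmada-es-" prefix.
func generateExecutionSpaceName(clusterName string) string {
	return "karmada-es-" + clusterName
}

func main() {
	// All Work objects destined for cluster "member1" live in this namespace.
	fmt.Println(generateExecutionSpaceName("member1")) // karmada-es-member1
}
```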
0.5 ResourceInterpreter

ResourceInterpreter is the core Karmada component for interpreting resources. It understands different kinds of resources (Deployment, StatefulSet, custom CRDs, etc.) and extracts the key information from them.
Responsibilities

ResourceInterpreter provides the following capabilities:

  • GetReplicas: extracts the replica count and per-replica resource requirements from a resource (invoked during RB creation; a known performance hot spot)
  • ReviseReplica: rewrites a resource's replica count (used to apply the scheduling result to a Work)
  • Retain: preserves cluster-specific fields (to avoid overwriting them)
  • AggregateStatus: aggregates status across multiple clusters
  • InterpretHealth: determines whether a resource is healthy
  • GetDependencies: returns a resource's dependent resources
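For a custom CRD, GetReplicas can be taught to Karmada declaratively via a Lua script in a ResourceInterpreterCustomization (handled by the ConfigurableInterpreter described below). A sketch for a hypothetical `Workload` CRD; the `example.com/v1` group, the field paths inside the script, and the object name are all illustrative:

```yaml
apiVersion: config.karmada.io/v1alpha1
kind: ResourceInterpreterCustomization
metadata:
  name: workload-interpreter-example
spec:
  target:
    apiVersion: example.com/v1     # the CRD this customization applies to
    kind: Workload
  customizations:
    replicaResource:
      luaScript: |
        function GetReplicas(obj)
          -- return the replica count and the per-replica requirements
          replica = obj.spec.replicas
          requirement = {}
          requirement.resourceRequest = obj.spec.template.spec.containers[1].resources.limits
          return replica, requirement
        end
```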
Implementation layers
// ResourceInterpreter manages both default and customized webhooks to interpret custom resource structure.
type ResourceInterpreter interface {
        // Start initializes the resource interpreter and performs cache synchronization.
        Start(ctx context.Context) (err error)
        // HookEnabled tells if any hook exist for specific resource type and operation.
        HookEnabled(objGVK schema.GroupVersionKind, operationType configv1alpha1.InterpreterOperation) bool
        // GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
        GetReplicas(object *unstructured.Unstructured) (replica int32, replicaRequires *workv1alpha2.ReplicaRequirements, err error)
        // ReviseReplica revises the replica of the given object.
        ReviseReplica(object *unstructured.Unstructured, replica int64) (*unstructured.Unstructured, error)
        // GetComponents extracts the resource requirements for multiple components from the given object.
        // This hook is designed for CRDs with multiple components (e.g., FlinkDeployment), but can
        // also be used for single-component resources like Deployment.
        // If implemented, the controller will use this hook to obtain per-component replica and resource
        // requirements, and will not call GetReplicas.
        // If not implemented, the controller will fall back to GetReplicas for backward compatibility.
        // This hook will only be called when the feature gate 'MultiplePodTemplatesScheduling' is enabled.
        GetComponents(object *unstructured.Unstructured) (components []workv1alpha2.Component, err error)
        // Retain returns the objects that based on the "desired" object but with values retained from the "observed" object.
        Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (retained *unstructured.Unstructured, err error)
        // AggregateStatus returns the objects that based on the 'object' but with status aggregated.
        AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error)
        // GetDependencies returns the dependent resources of the given object.
        // ... (remaining methods omitted)
}
Interpreter types

ResourceInterpreter has four implementations, consulted in priority order:

  • ConfigurableInterpreter (highest priority): declarative configuration via Lua scripts
  • CustomizedInterpreter: customization via webhooks
  • ThirdPartyInterpreter: built-in interpretation rules for third-party resources
  • DefaultInterpreter (lowest priority): Karmada's built-in default interpreter
Call order:
// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
func (i *customResourceInterpreterImpl) GetReplicas(object *unstructured.Unstructured) (replica int32, requires *workv1alpha2.ReplicaRequirements, err error) {
        var hookEnabled bool
        replica, requires, hookEnabled, err = i.configurableInterpreter.GetReplicas(object)
        if err != nil {
                return
        }
        if hookEnabled {
                return
        }
        replica, requires, hookEnabled, err = i.customizedInterpreter.GetReplicas(context.TODO(), &request.Attributes{
                Operation: configv1alpha1.InterpreterOperationInterpretReplica,
                Object:    object,
        })
        if err != nil {
                return
        }
        if hookEnabled {
                return
        }
        replica, requires, hookEnabled, err = i.thirdpartyInterpreter.GetReplicas(object)
        if err != nil {
                return
        }
        if hookEnabled {
                return
        }
        replica, requires, err = i.defaultInterpreter.GetReplicas(object)
        return
}
0.6 ResourceDetector

ResourceDetector is part of the Karmada controller and is responsible for:

  • Watching resources: watches Kubernetes resources (Deployment, Service, etc.) for create/update/delete events
  • Matching policies: finds the PropagationPolicy that matches a resource
  • Creating RBs: creates or updates a ResourceBinding according to the policy
  • Policy management: handles the PropagationPolicy lifecycle
0.7 Scheduler

Scheduler is Karmada's scheduling component, responsible for:

  • Watching RBs: watches ResourceBinding create and update events
  • Selecting clusters: chooses suitable member clusters according to the Placement rules
  • Allocating replicas: distributes replicas across clusters according to the strategy
  • Updating RBs: writes the scheduling result into ResourceBinding.Spec.Clusters
Important: the Scheduler only fills in Spec.Clusters; it never modifies other fields.
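After scheduling, an RB's spec.clusters might look like the following sketch (the Deployment name `nginx`, cluster names, and replica split are illustrative):

```yaml
apiVersion: work.karmada.io/v1alpha2
kind: ResourceBinding
metadata:
  name: nginx-deployment
  namespace: default
spec:
  resource:                 # reference to the original resource template
    apiVersion: apps/v1
    kind: Deployment
    namespace: default
    name: nginx
  replicas: 3               # extracted via ResourceInterpreter.GetReplicas
  clusters:                 # filled in by the Scheduler, untouched by others
    - name: member1
      replicas: 2
    - name: member2
      replicas: 1
```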
0.8 Concept Relationship Diagram
┌───────────────────────────────────────────────────┐
│             Control plane (Karmada)               │
│                                                   │
│   ┌─────────────┐       ┌────────────────────┐    │
│   │ Deployment  │       │ PropagationPolicy  │    │
│   │ (resource)  │       │      (policy)      │    │
│   └──────┬──────┘       └─────────┬──────────┘    │
│          │                        │               │
│          └───────────┬────────────┘               │
│                      ▼                            │
│         ┌────────────────────────┐                │
│         │    ResourceDetector    │                │
│         │ (detects and creates)  │                │
│         └───────────┬────────────┘                │
│                     ▼                             │
│         ┌────────────────────────┐                │
│         │    ResourceBinding     │                │
│         │ (binds policy+resource)│                │
│         └───────────┬────────────┘                │
│                     ▼                             │
│         ┌────────────────────────┐                │
│         │       Scheduler        │                │
│         │(selects target clusters│                │
│         └───────────┬────────────┘                │
│                     ▼                             │
│         ┌────────────────────────┐                │
│         │    ResourceBinding     │                │
│         │ (Spec.Clusters filled) │                │
│         └───────────┬────────────┘                │
│                     ▼                             │
│         ┌────────────────────────┐                │
│         │   BindingController    │                │
│         │   (converts to Work)   │                │
│         └───────────┬────────────┘                │
└─────────────────────┼─────────────────────────────┘
                      │
           ┌──────────┴──────────┐
           ▼                     ▼
   ┌───────────────┐     ┌───────────────┐
   │     Work      │     │     Work      │
   │ (karmada-es-  │     │ (karmada-es-  │
   │  cluster-1)   │     │  cluster-2)   │
   └───────┬───────┘     └───────┬───────┘
           ▼                     ▼
   ┌───────────────┐     ┌───────────────┐
   │ Member        │     │ Member        │
   │ cluster 1     │     │ cluster 2     │
   │               │     │               │
   │  Deployment   │     │  Deployment   │
   │  (running)    │     │  (running)    │
   └───────────────┘     └───────────────┘
1. The RB Lifecycle

1.1 Overall Flow
User creates a resource + PropagationPolicy
    ↓
ResourceDetector detects it and creates/updates the RB
    ↓
Scheduler selects target clusters for the RB
    ↓
ResourceBindingController converts the RB into Work objects
    ↓
Works are dispatched to member clusters
1.2 Key Components

  • ResourceDetector (pkg/detector/detector.go): detects resources and creates RBs
  • Scheduler (pkg/scheduler/): selects target clusters for RBs
  • ResourceBindingController (pkg/controllers/binding/): converts RBs into Works
  • ResourceInterpreter (pkg/resourceinterpreter/): interprets resources, extracting replica counts and resource requirements
2. How an RB Is Created

2.1 Trigger Conditions

When a user creates or updates:

  • a resource object (e.g., a Deployment)
  • a PropagationPolicy or ClusterPropagationPolicy
ResourceDetector detects the change and triggers creation or update of the RB.
2.2 The Creation Flow in Detail

Step 1: Detect matching resources and policies

When a resource object changes, ResourceDetector does the following:
// BuildResourceBinding builds a desired ResourceBinding for object.
func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructured, policySpec *policyv1alpha1.PropagationSpec, policyID string, policyMeta metav1.ObjectMeta, claimFunc func(object metav1.Object, policyId string, objectMeta metav1.ObjectMeta)) (*workv1alpha2.ResourceBinding, error) {
        bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
        propagationBinding := &workv1alpha2.ResourceBinding{
                ObjectMeta: metav1.ObjectMeta{
                        Name:      bindingName,
                        Namespace: object.GetNamespace(),
                        OwnerReferences: []metav1.OwnerReference{
                                *metav1.NewControllerRef(object, object.GroupVersionKind()),
                        },
                        Finalizers: []string{util.BindingControllerFinalizer},
                },
                Spec: workv1alpha2.ResourceBindingSpec{
                        PropagateDeps:               policySpec.PropagateDeps,
                        SchedulerName:               policySpec.SchedulerName,
                        Placement:                   &policySpec.Placement,
                        Failover:                    policySpec.Failover,
                        ConflictResolution:          policySpec.ConflictResolution,
                        PreserveResourcesOnDeletion: policySpec.PreserveResourcesOnDeletion,
                        Resource: workv1alpha2.ObjectReference{
                                APIVersion:      object.GetAPIVersion(),
                                Kind:            object.GetKind(),
                                Namespace:       object.GetNamespace(),
                                Name:            object.GetName(),
                                UID:             object.GetUID(),
                                ResourceVersion: object.GetResourceVersion(),
                        },
                },
        }
        if policySpec.Suspension != nil {
                propagationBinding.Spec.Suspension = &workv1alpha2.Suspension{Suspension: *policySpec.Suspension}
        }
        claimFunc(propagationBinding, policyID, policyMeta)
        if err := d.applyReplicaInterpretation(object, &propagationBinding.Spec); err != nil {
                return nil, err
        }
        if features.FeatureGate.Enabled(features.PriorityBasedScheduling) && policySpec.SchedulePriority != nil {
                // ... handle scheduling priority
        }
        return propagationBinding, nil
}
Key points

  • names.GenerateBindingName() generates the RB name (format: {resource-name}-{kind}, lowercased)
  • An OwnerReference links the resource object to the RB
  • A Finalizer ensures proper cleanup on deletion
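The naming convention (resource name joined with the lowercased kind, e.g. a Deployment named `nginx` yields the RB `nginx-deployment`) can be sketched as follows; this is a simplified illustration, not the actual Karmada helper, which also sanitizes characters that are invalid in object names:

```go
package main

import (
	"fmt"
	"strings"
)

// generateBindingName sketches the ResourceBinding naming convention:
// "{resource-name}-{kind}", lowercased.
func generateBindingName(kind, name string) string {
	return strings.ToLower(name + "-" + kind)
}

func main() {
	fmt.Println(generateBindingName("Deployment", "nginx")) // nginx-deployment
}
```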
Step 2: Apply replica interpretation

This is the key step when creating an RB:
// applyReplicaInterpretation handles the logic for interpreting replicas or components from an object.
func (d *ResourceDetector) applyReplicaInterpretation(object *unstructured.Unstructured, spec *workv1alpha2.ResourceBindingSpec) error {
        gvk := object.GroupVersionKind()
        name := object.GetName()
        // Prioritize component interpretation if the feature and GetComponents are enabled.
        if features.FeatureGate.Enabled(features.MultiplePodTemplatesScheduling) && d.ResourceInterpreter.HookEnabled(gvk, configv1alpha1.InterpreterOperationInterpretComponent) {
                components, err := d.ResourceInterpreter.GetComponents(object)
                if err != nil {
                        klog.Errorf("Failed to get components for %s(%s): %v", gvk, name, err)
                        return err
                }
                spec.Components = components
                return nil
        }
        // GetReplicas is executed if the MultiplePodTemplatesScheduling feature gate is disabled, or if GetComponents is not implemented.
        if d.ResourceInterpreter.HookEnabled(gvk, configv1alpha1.InterpreterOperationInterpretReplica) {
                replicas, replicaRequirements, err := d.ResourceInterpreter.GetReplicas(object)
                if err != nil {
                        klog.Errorf("Failed to customize replicas for %s(%s): %v", gvk, name, err)
                        return err
                }
                spec.Replicas = replicas
                spec.ReplicaRequirements = replicaRequirements
        }
        return nil
}
Call chain
applyReplicaInterpretation
    ↓
ResourceInterpreter.GetReplicas(object)
    ↓
ConfigurableInterpreter.GetReplicas(object)
    ↓
LuaVM.GetReplicas(object, script)
    ↓
LuaVM.RunScript(script, "GetReplicas", 2, object)  ← needs a VM instance from the pool
    ↓
VM.Pool.Get()  ← lock contention point
    ↓
Lua.DoString(script)  ← script compilation point
    ↓
Lua.CallByParam(...)  ← executes the GetReplicas function
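The VM-pool step in the chain above is where contention shows up: Lua VM instances are expensive to create, so they are pooled and reused across GetReplicas calls. The general pattern can be sketched with Go's sync.Pool (a generic sketch of the pooling idea, not Karmada's actual pool implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// luaVM stands in for a Lua interpreter instance, which is costly to
// construct (compiled state, registered functions, ...).
type luaVM struct{}

var vmPool = sync.Pool{
	// New only runs when the pool is empty: this is the expensive path
	// the pool exists to avoid.
	New: func() any { return &luaVM{} },
}

// runScript borrows a VM from the pool, "executes" the script, and
// returns the VM for reuse. Under heavy concurrency, Get/Put on the
// shared pool is the contention point noted in the call chain.
func runScript(script string) string {
	vm := vmPool.Get().(*luaVM)
	defer vmPool.Put(vm) // return the VM for reuse instead of discarding it
	return "ran " + script
}

func main() {
	fmt.Println(runScript("GetReplicas"))
	fmt.Println(runScript("ReviseReplica")) // typically reuses the pooled VM
}
```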
Step 3: Create or update the RB

CreateOrUpdate is used to ensure the RB exists:
        binding, err := d.BuildResourceBinding(object, &policy.Spec, policyID, policy.ObjectMeta, AddPPClaimMetadata)
        if err != nil {
                klog.Errorf("Failed to build resourceBinding for object: %s. error: %v", objectKey, err)
                return err
        }
        bindingCopy := binding.DeepCopy()
        err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
                operationResult, err = controllerutil.CreateOrUpdate(context.TODO(), d.Client, bindingCopy, func() error {
                        // If this binding exists and its owner is not the input object, return error and let garbage collector
                        // delete this binding and try again later. See https://github.com/karmada-io/karmada/issues/2090.
                        if ownerRef := metav1.GetControllerOfNoCopy(bindingCopy); ownerRef != nil && ownerRef.UID != object.GetUID() {
                                return fmt.Errorf("failed to update binding due to different owner reference UID, will " +
                                        "try again later after binding is garbage collected, see https://github.com/karmada-io/karmada/issues/2090")
                        }
                        // Just update necessary fields, especially avoid modifying Spec.Clusters which is scheduling result, if already exists.
                        bindingCopy.Annotations = util.DedupeAndMergeAnnotations(bindingCopy.Annotations, binding.Annotations)
                        bindingCopy.Labels = util.DedupeAndMergeLabels(bindingCopy.Labels, binding.Labels)
                        bindingCopy.OwnerReferences = binding.OwnerReferences
                        bindingCopy.Spec.Placement = binding.Spec.Placement
                        bindingCopy.Spec.Resource = binding.Spec.Resource
                        bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
                        if binding.Spec.Suspension != nil {
                                if bindingCopy.Spec.Suspension == nil {
                                        bindingCopy.Spec.Suspension = &workv1alpha2.Suspension{}
                                }
                                bindingCopy.Spec.Suspension.Suspension = binding.Spec.Suspension.Suspension
                        }
                        return nil
                })
                if err != nil {
                        return err
                }
                return nil
        })
Key points

  • RetryOnConflict handles concurrent update conflicts
  • Spec.Clusters is never modified here; it is the scheduler's scheduling result
  • Only policy-related fields are updated
3. Converting an RB into Works

3.1 Conversion Trigger

Once an RB has been created or updated, ResourceBindingController observes the change:
// syncBinding will sync resourceBinding to Works.
func (c *ResourceBindingController) syncBinding(ctx context.Context, binding *workv1alpha2.ResourceBinding) (controllerruntime.Result, error) {
        if err := c.removeOrphanWorks(ctx, binding); err != nil {
                return controllerruntime.Result{}, err
        }
        needWaitForCleanup, err := c.checkDirectPurgeOrphanWorks(ctx, binding)
        if err != nil {
                return controllerruntime.Result{}, err
        }
        if needWaitForCleanup {
                msg := fmt.Sprintf("There are works in clusters with PurgeMode 'Directly' not deleted for ResourceBinding(%s/%s), skip syncing works",
                        binding.Namespace, binding.Name)
                klog.V(4).InfoS(msg, "namespace", binding.GetNamespace(), "binding", binding.GetName())
                return controllerruntime.Result{RequeueAfter: requeueIntervalForDirectlyPurge}, nil
        }
        workload, err := helper.FetchResourceTemplate(ctx, c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
        if err != nil {
                if apierrors.IsNotFound(err) {
                        // It might happen when the resource template has been removed but the garbage collector hasn't removed
                        // the ResourceBinding which dependent on resource template.
                        // So, just return without retry(requeue) would save unnecessary loop.
                        return controllerruntime.Result{}, nil
                }
                klog.ErrorS(err, "Failed to fetch workload for ResourceBinding", "namespace", binding.GetNamespace(), "binding", binding.GetName())
                return controllerruntime.Result{}, err
        }
        start := time.Now()
        err = ensureWork(ctx, c.Client, c.ResourceInterpreter, workload, c.OverrideManager, binding, apiextensionsv1.NamespaceScoped)
        metrics.ObserveSyncWorkLatency(err, start)
        if err != nil {
                klog.ErrorS(err, "Failed to transform ResourceBinding to works", "namespace", binding.GetNamespace(), "binding", binding.GetName())
                c.EventRecorder.Event(binding, corev1.EventTypeWarning, events.EventReasonSyncWorkFailed, err.Error())
                c.EventRecorder.Event(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkFailed, err.Error())
                return controllerruntime.Result{}, err
        }
        msg := fmt.Sprintf("Sync work of ResourceBinding(%s/%s) successful.",
3.2 The ensureWork Function in Detail

This is the core function that converts an RB into Work objects:
// ensureWork ensure Work to be created or updated.
func ensureWork(
        ctx context.Context, c client.Client, resourceInterpreter resourceinterpreter.ResourceInterpreter, workload *unstructured.Unstructured,
        overrideManager overridemanager.OverrideManager, binding metav1.Object, scope apiextensionsv1.ResourceScope,
) error {
        bindingSpec := getBindingSpec(binding, scope)
        targetClusters := mergeTargetClusters(bindingSpec.Clusters, bindingSpec.RequiredBy)
        var err error
        var errs []error
        var jobCompletions []workv1alpha2.TargetCluster
        if workload.GetKind() == util.JobKind && needReviseJobCompletions(bindingSpec.Replicas, bindingSpec.Placement) {
                jobCompletions, err = divideReplicasByJobCompletions(workload, targetClusters)
                if err != nil {
                        return err
                }
        }
        for i := range targetClusters {
                targetCluster := targetClusters[i]
                clonedWorkload := workload.DeepCopy()
                workNamespace := names.GenerateExecutionSpaceName(targetCluster.Name)
                // When syncing workloads to member clusters, the controller MUST strictly adhere to the scheduling results
                // specified in bindingSpec.Clusters for replica allocation, rather than using the replicas declared in the
                // workload's resource template.
                // This rule applies regardless of whether the workload distribution mode is "Divided" or "Duplicated".
                // Failing to do so could allow workloads to bypass the quota checks performed by the scheduler
                // (especially during scale-up operations) or skip queue validation when scheduling is suspended.
                if bindingSpec.IsWorkload() {
                        if resourceInterpreter.HookEnabled(clonedWorkload.GroupVersionKind(), configv1alpha1.InterpreterOperationReviseReplica) {
                                clonedWorkload, err = resourceInterpreter.ReviseReplica(clonedWorkload, int64(targetCluster.Replicas))
                                if err != nil {
                                        klog.ErrorS(err, "Failed to revise replica for workload in cluster.", "workloadKind", workload.GetKind(),
                                                "workloadNamespace", workload.GetNamespace(), "workloadName", workload.GetName(), "cluster", targetCluster.Name)
                                        errs = append(errs, err)
                                        continue
                                }
                        }
                }
                // jobSpec.Completions specifies the desired number of successfully finished pods the job should be run with.
                // When the replica scheduling policy is set to "divided", jobSpec.Completions should also be divided accordingly.
                // The weight assigned to each cluster roughly equals that cluster's jobSpec.Parallelism value. This approach helps
                // balance the execution time of the job across member clusters.
                if len(jobCompletions) > 0 {
                        // Set allocated completions for Job only when the '.spec.completions' field not omitted from resource template.
                        // For jobs running with a 'work queue' usually leaves '.spec.completions' unset, in that case we skip
                        // setting this field as well.
                        // Refer to: https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs.
                        if err = helper.ApplyReplica(clonedWorkload, int64(jobCompletions[i].Replicas), util.CompletionsField); err != nil {
                                klog.ErrorS(err, "Failed to apply Completions for workload in cluster.",
                                        "workloadKind", clonedWorkload.GetKind(), "workloadNamespace", clonedWorkload.GetNamespace(),
                                        "workloadName", clonedWorkload.GetName(), "cluster", targetCluster.Name)
                                errs = append(errs, err)
                                continue
                        }
                }
                // We should call ApplyOverridePolicies last, as override rules have the highest priority
                cops, ops, err := overrideManager.ApplyOverridePolicies(clonedWorkload, targetCluster.Name)
                if err != nil {
                        klog.ErrorS(err, "Failed to apply overrides for workload in cluster.",
                                "workloadKind", clonedWorkload.GetKind(), "workloadNamespace", clonedWorkload.GetNamespace(),
                                "workloadName", clonedWorkload.GetName(), "cluster", targetCluster.Name)
                        errs = append(errs, err)
                        continue
                }
                workLabel := mergeLabel(clonedWorkload, binding, scope)
                annotations := mergeAnnotations(clonedWorkload, binding, scope)
                annotations = mergeConflictResolution(clonedWorkload, bindingSpec.ConflictResolution, annotations)
                annotations, err = RecordAppliedOverrides(cops, ops, annotations)
                if err != nil {
                        klog.ErrorS(err, "Failed to record appliedOverrides in cluster.", "cluster", targetCluster.Name)
                        errs = append(errs, err)
                        continue
                }
                if features.FeatureGate.Enabled(features.StatefulFailoverInjection) {
                        // we need to figure out if the targetCluster is in the cluster we are going to migrate application to.
                        // If yes, we have to inject the preserved label state to the clonedWorkload.
                        clonedWorkload = injectReservedLabelState(bindingSpec, targetCluster, clonedWorkload, len(targetClusters))
                }
                workMeta := metav1.ObjectMeta{
                        Name:        names.GenerateWorkName(clonedWorkload.GetKind(), clonedWorkload.GetName(), clonedWorkload.GetNamespace()),
                        Namespace:   workNamespace,
                        Finalizers:  []string{util.ExecutionControllerFinalizer},
                        Labels:      workLabel,
                        Annotations: annotations,
                }
                if err = ctrlutil.CreateOrUpdateWork(
                        ctx,
                        c,
                        workMeta,
                        clonedWorkload,
                        ctrlutil.WithSuspendDispatching(shouldSuspendDispatching(bindingSpec.Suspension, targetCluster)),
  91.                         ctrlutil.WithPreserveResourcesOnDeletion(ptr.Deref(bindingSpec.PreserveResourcesOnDeletion, false)),
  92.                 ); err != nil {
  93.                         errs = append(errs, err)
  94.                         continue
  95.                 }
  96.         }
  97.         return errors.NewAggregate(errs)
  98. }
Key steps

  • Get the target cluster list: read the scheduling result from bindingSpec.Clusters
  • Create a Work for each cluster:

    • Clone the workload
    • Adjust the replica count according to the scheduling result (ReviseReplica)
    • Apply OverridePolicies (these carry the highest priority, so they are applied last)
    • Create or update the Work object
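The ReviseReplica step only matters under a "Divided" replica-scheduling strategy. As a rough illustration of what dividing replicas by static weight involves, here is a self-contained sketch using largest-remainder rounding; the function name `divideReplicas` and the rounding rule are assumptions of this sketch, not karmada's actual scheduler code.

```go
package main

import (
	"fmt"
	"sort"
)

// divideReplicas splits total replicas across clusters in proportion to their
// static weights, handing leftover replicas to the largest remainders first.
// A simplified sketch of what a "Divided" ReplicaScheduling strategy computes;
// karmada's real logic lives in its scheduler/estimator packages.
func divideReplicas(total int32, weights map[string]int64) map[string]int32 {
	var sum int64
	names := make([]string, 0, len(weights))
	for name, w := range weights {
		sum += w
		names = append(names, name)
	}
	sort.Strings(names) // deterministic iteration order

	type frac struct {
		name string
		rem  int64
	}
	result := make(map[string]int32, len(weights))
	fracs := make([]frac, 0, len(names))
	var assigned int32
	for _, name := range names {
		share := int64(total) * weights[name] / sum
		result[name] = int32(share)
		assigned += int32(share)
		fracs = append(fracs, frac{name, int64(total) * weights[name] % sum})
	}
	// Distribute the leftover replicas to clusters with the largest remainders.
	sort.SliceStable(fracs, func(i, j int) bool { return fracs[i].rem > fracs[j].rem })
	for i := int32(0); i < total-assigned; i++ {
		result[fracs[i].name]++
	}
	return result
}

func main() {
	fmt.Println(divideReplicas(5, map[string]int64{"member1": 2, "member2": 1}))
}
```

With weights 2:1 and 5 replicas, member1 gets 3 and member2 gets 2; the remainders never drop replicas on the floor.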

4. RB Data Structures

4.1 ResourceBindingSpec
  1. type ResourceBindingSpec struct {
  2.     // Reference to the resource template
  3.     Resource workv1alpha2.ObjectReference
  4.    
  5.     // Replica information (obtained from the ResourceInterpreter)
  6.     Replicas *int32
  7.     ReplicaRequirements *ReplicaRequirements
  8.     Components []Component
  9.    
  10.     // Scheduling-related fields
  11.     Placement *Placement
  12.     Clusters []TargetCluster  // scheduling result, filled in by the Scheduler
  13.     SchedulerName string
  14.    
  15.     // Others
  16.     PropagateDeps bool
  17.     Failover *FailoverBehavior
  18.     ConflictResolution ConflictResolution
  19.     // ...
  20. }
4.2 Key Fields

  • Resource: points to the original resource object (Deployment, StatefulSet, etc.)
  • Replicas/ReplicaRequirements: replica count and resource requirements extracted from the resource
  • Clusters: filled in by the Scheduler; contains the target clusters and the replicas allocated to each
  • Placement: the scheduling policy (where to schedule)
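To make the relationship between Replicas and Clusters concrete, the sketch below uses simplified mirrors of the API types (the real ones live in karmada's work/v1alpha2 API package) to show that, once a Divided scheduling succeeds, the scheduler-assigned replicas across Clusters add up to Spec.Replicas. The helper name `scheduledReplicas` is hypothetical.

```go
package main

import "fmt"

// Simplified mirrors of the karmada API types, shown here only to
// illustrate the shape of a scheduled binding.
type TargetCluster struct {
	Name     string
	Replicas int32
}

type ResourceBindingSpec struct {
	Replicas int32
	Clusters []TargetCluster // filled in by the Scheduler
}

// scheduledReplicas sums what the scheduler actually assigned; for a
// "Divided" strategy this should equal Spec.Replicas after scheduling.
func scheduledReplicas(spec ResourceBindingSpec) int32 {
	var sum int32
	for _, c := range spec.Clusters {
		sum += c.Replicas
	}
	return sum
}

func main() {
	spec := ResourceBindingSpec{
		Replicas: 3,
		Clusters: []TargetCluster{{Name: "member1", Replicas: 2}, {Name: "member2", Replicas: 1}},
	}
	fmt.Println(scheduledReplicas(spec)) // prints 3
}
```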
5. Quick Index of Key Concepts

5.1 Core Resource Objects

| Concept | Definition | Scope | Key fields |
| --- | --- | --- | --- |
| PropagationPolicy | Policy defining how resources are propagated | Namespace | ResourceSelectors, Placement, Priority |
| ClusterPropagationPolicy | Cluster-scoped propagation policy | Cluster | Same as PropagationPolicy |
| ResourceBinding | Binding between a resource object and a policy | Namespace | Resource, Replicas, Clusters |
| ClusterResourceBinding | Cluster-scoped resource binding | Cluster | Same as ResourceBinding |
| Work | The workload actually sent to a member cluster | Namespace | Workload.Manifests, Status |

5.2 Core Components

| Component | Responsibility | Key operations |
| --- | --- | --- |
| ResourceDetector | Detects resources and creates RBs | ApplyPolicy, BuildResourceBinding |
| Scheduler | Selects target clusters for an RB | Fills in Spec.Clusters |
| ResourceBindingController | Converts an RB into Works | syncBinding, ensureWork |
| ResourceInterpreter | Interprets resource structure | GetReplicas, ReviseReplica |
| ExecutionController | Executes Works on member clusters | syncToClusters |

5.3 Key Data Structures

Placement (placement rules)
  1. type Placement struct {
  2.         // ClusterAffinity represents scheduling restrictions to a certain set of clusters.
  3.         // Note:
  4.         //   1. ClusterAffinity can not co-exist with ClusterAffinities.
  5.         //   2. If both ClusterAffinity and ClusterAffinities are not set, any cluster
  6.         //      can be scheduling candidates.
  7.         // +optional
  8.         ClusterAffinity *ClusterAffinity `json:"clusterAffinity,omitempty"`
  9.         // ClusterAffinities represents scheduling restrictions to multiple cluster
  10.         // groups that indicated by ClusterAffinityTerm.
  11.         //
  12.         // The scheduler will evaluate these groups one by one in the order they
  13.         // appear in the spec, the group that does not satisfy scheduling restrictions
  14.         // will be ignored which means all clusters in this group will not be selected
  15.         // unless it also belongs to the next group(a cluster could belong to multiple
  16.         // groups).
  17.         //
  18.         // If none of the groups satisfy the scheduling restrictions, then scheduling
  19.         // fails, which means no cluster will be selected.
  20.         //
  21.         // Note:
  22.         //   1. ClusterAffinities can not co-exist with ClusterAffinity.
  23.         //   2. If both ClusterAffinity and ClusterAffinities are not set, any cluster
  24.         //      can be scheduling candidates.
  25.         //
  26.         // Potential use case 1:
  27.         // The private clusters in the local data center could be the main group, and
  28.         // the managed clusters provided by cluster providers could be the secondary
  29.         // group. So that the Karmada scheduler would prefer to schedule workloads
  30.         // to the main group and the second group will only be considered in case of
  31.         // the main group does not satisfy restrictions(like, lack of resources).
  32.         //
  33.         // Potential use case 2:
  34.         // For the disaster recovery scenario, the clusters could be organized to
  35.         // primary and backup groups, the workloads would be scheduled to primary
  36.         // clusters firstly, and when primary cluster fails(like data center power off),
  37.         // Karmada scheduler could migrate workloads to the backup clusters.
  38.         //
  39.         // +optional
  40.         ClusterAffinities []ClusterAffinityTerm `json:"clusterAffinities,omitempty"`
  41.         // ClusterTolerations represents the tolerations.
  42.         // +optional
  43.         ClusterTolerations []corev1.Toleration `json:"clusterTolerations,omitempty"`
  44.         // SpreadConstraints represents a list of the scheduling constraints.
  45.         // +optional
  46.         SpreadConstraints []SpreadConstraint `json:"spreadConstraints,omitempty"`
  47.         // ReplicaScheduling represents the scheduling policy on dealing with the number of replicas
  48.         // when propagating resources that have replicas in spec (e.g. deployments, statefulsets) to member clusters.
  49.         // +optional
  50.         ReplicaScheduling *ReplicaSchedulingStrategy `json:"replicaScheduling,omitempty"`
  51. }
Field notes

  • ClusterAffinity: which clusters are preferred (selected by labels, fields, or cluster names)
  • ClusterTolerations: cluster tolerations (analogous to Pod Tolerations)
  • SpreadConstraints: spread constraints (e.g. at most 3 clusters, at least 1 replica per cluster)
  • ReplicaScheduling: replica scheduling strategy (Duplicated or Divided)
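A minimal sketch of how a cluster-scoped spread constraint could be evaluated. The MinGroups/MaxGroups fields mirror karmada's SpreadConstraint, but the function name `satisfiesSpreadConstraint` and the convention of treating MaxGroups=0 as "unbounded" are assumptions of this sketch, not karmada's actual validation code.

```go
package main

import "fmt"

// SpreadConstraint mirrors the MinGroups/MaxGroups pair of karmada's
// cluster-scoped spread constraint.
type SpreadConstraint struct {
	MinGroups int
	MaxGroups int
}

// satisfiesSpreadConstraint checks that the number of selected clusters
// falls within [MinGroups, MaxGroups]; MaxGroups <= 0 is treated as
// "no upper bound" in this sketch.
func satisfiesSpreadConstraint(selectedClusters int, c SpreadConstraint) bool {
	if selectedClusters < c.MinGroups {
		return false
	}
	if c.MaxGroups > 0 && selectedClusters > c.MaxGroups {
		return false
	}
	return true
}

func main() {
	fmt.Println(satisfiesSpreadConstraint(2, SpreadConstraint{MinGroups: 1, MaxGroups: 3}))
}
```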
ResourceBindingSpec

The RB Spec contains:

  • Resource: reference to the resource (APIVersion, Kind, Name, Namespace, UID)
  • Replicas: replica count (obtained from the ResourceInterpreter)
  • ReplicaRequirements: resource requirements (CPU, memory, etc.)
  • Clusters: target cluster list (filled in by the Scheduler)
  • Placement: placement rules (copied from the PropagationPolicy)
  • Failover: failover behavior

Important: Spec.Clusters is the scheduling result; neither the ResourceDetector nor the BindingController modifies it.
5.4 Execution Space

Each member cluster has a corresponding namespace on the Karmada control plane:

  • Naming rule: karmada-es-{cluster-name}
  • Purpose: holds all the Works destined for that cluster
  • Examples:

    • Cluster member1 → namespace karmada-es-member1
    • Cluster member2 → namespace karmada-es-member2
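The naming rule can be restated as a pair of helpers. In the karmada codebase the prefix is `karmada-es-` (see names.GenerateExecutionSpaceName); the helper names `executionSpaceName` and `clusterFromExecutionSpace` below are hypothetical, and only restate that convention.

```go
package main

import (
	"fmt"
	"strings"
)

// executionSpacePrefix restates karmada's execution-namespace prefix.
const executionSpacePrefix = "karmada-es-"

// executionSpaceName maps a member cluster name to its execution namespace.
func executionSpaceName(cluster string) string {
	return executionSpacePrefix + cluster
}

// clusterFromExecutionSpace inverts the mapping, reporting whether the
// namespace is an execution space at all.
func clusterFromExecutionSpace(ns string) (string, bool) {
	if !strings.HasPrefix(ns, executionSpacePrefix) {
		return "", false
	}
	return strings.TrimPrefix(ns, executionSpacePrefix), true
}

func main() {
	fmt.Println(executionSpaceName("member1")) // prints karmada-es-member1
}
```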

5.5 Failover

FailoverBehavior defines the behavior on application or cluster failure:
  1. // FailoverBehavior indicates failover behaviors in case of an application or
  2. // cluster failure.
  3. type FailoverBehavior struct {
  4.         // Application indicates failover behaviors in case of application failure.
  5.         // If this value is nil, failover is disabled.
  6.         // If set, the PropagateDeps should be true so that the dependencies could
  7.         // be migrated along with the application.
  8.         // +optional
  9.         Application *ApplicationFailoverBehavior `json:"application,omitempty"`
  10.         // Cluster indicates failover behaviors in case of cluster failure.
  11.         // If this value is nil, the failover behavior in case of cluster failure
  12.         // will be controlled by the controller's no-execute-taint-eviction-purge-mode
  13.         // parameter.
  14.         // If set, the failover behavior in case of cluster failure will be defined
  15.         // by this value.
  16.         // +optional
  17.         Cluster *ClusterFailoverBehavior `json:"cluster,omitempty"`
  18. }
  19. // ApplicationFailoverBehavior indicates application failover behaviors.
  20. type ApplicationFailoverBehavior struct {
  21.         // DecisionConditions indicates the decision conditions of performing the failover process.
  22.         // Only when all conditions are met can the failover process be performed.
  23.         // Currently, DecisionConditions includes several conditions:
  24.         // - TolerationSeconds (optional)
  25.         // +required
  26.         DecisionConditions DecisionConditions `json:"decisionConditions"`
  27.         // PurgeMode represents how to deal with the legacy applications on the
  28.         // cluster from which the application is migrated.
  29.         // Valid options are "Directly", "Gracefully", "Never", "Immediately"(deprecated),
  30.         // and "Graciously"(deprecated).
  31.         // Defaults to "Gracefully".
  32.         // +kubebuilder:validation:Enum=Directly;Gracefully;Never;Immediately;Graciously
  33.         // +kubebuilder:default=Gracefully
  34.         // +optional
  35.         PurgeMode PurgeMode `json:"purgeMode,omitempty"`
PurgeMode values

  • Directly: delete the application on the old cluster immediately (for applications that cannot tolerate two instances running at once, such as Flink)
  • Gracefully: wait until the application is healthy on the new cluster before deleting (the default)
  • Never: do not delete; clean up manually
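The three modes boil down to a small decision table: may the legacy copy be removed, given the purge mode and whether the migrated copy is healthy yet? A hedged sketch of that table (the function name `shouldRemoveLegacy` is hypothetical; this is not karmada's controller code):

```go
package main

import "fmt"

type PurgeMode string

const (
	PurgeDirectly   PurgeMode = "Directly"
	PurgeGracefully PurgeMode = "Gracefully"
	PurgeNever      PurgeMode = "Never"
)

// shouldRemoveLegacy decides whether the old cluster's copy may be deleted,
// given the purge mode and whether the migrated application is already
// healthy on the new cluster.
func shouldRemoveLegacy(mode PurgeMode, newCopyHealthy bool) bool {
	switch mode {
	case PurgeDirectly:
		return true
	case PurgeGracefully:
		return newCopyHealthy
	default: // "Never" and unknown modes: leave cleanup to the operator
		return false
	}
}

func main() {
	fmt.Println(shouldRemoveLegacy(PurgeGracefully, false), shouldRemoveLegacy(PurgeGracefully, true)) // prints "false true"
}
```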
6. Summary

6.1 Key Flow Recap

  • Resource created → detected by the ResourceDetector
  • Policy matched → a matching PropagationPolicy is found
  • RB created → BuildResourceBinding (a performance hotspot: Lua script execution)
  • Scheduled → the Scheduler fills in Spec.Clusters
  • Converted to Works → the ResourceBindingController turns the RB into Works
  • Dispatched and executed → Works are sent to member clusters for execution

Source: contributed by a 程序园 user; contact the site admin for removal in case of infringement.