kubectl源码分析之wait
  TEZNKK3IfmPf 2023年11月15日 34 0

//创建wait命令
func NewCmdWait(restClientGetter genericclioptions.RESTClientGetter, streams genericclioptions.IOStreams) *cobra.Command {
  flags := NewWaitFlags(restClientGetter, streams)//初始化wait flag

  cmd := &cobra.Command{//创建cobra命令
    Use:     "wait ([-f FILENAME] | resource.group/resource.name | resource.group [(-l label | --all)]) [--for=delete|--for condition=available]",
    Short:   "Experimental: Wait for a specific condition on one or many resources.",
    Long:    waitLong,
    Example: waitExample,

    DisableFlagsInUseLine: true,
    Run: func(cmd *cobra.Command, args []string) {
      o, err := flags.ToOptions(args)//wait flag转waitOption
      cmdutil.CheckErr(err)
      err = o.RunWait()//运行
      cmdutil.CheckErr(err)
    },
    SuggestFor: []string{"list", "ps"},
  }

  flags.AddFlags(cmd)//添加选项

  return cmd
}

// AddFlags registers flags for a cli
func (flags *WaitFlags) AddFlags(cmd *cobra.Command) {//选项
  flags.PrintFlags.AddFlags(cmd)//打印选项
  flags.ResourceBuilderFlags.AddFlags(cmd.Flags())//resource builder选项

  cmd.Flags().DurationVar(&flags.Timeout, "timeout", flags.Timeout, "The length of time to wait before giving up.  Zero means check once and don't wait, negative means wait for a week.")//timeout选项
  cmd.Flags().StringVar(&flags.ForCondition, "for", flags.ForCondition, "The condition to wait on: [delete|condition=condition-name].")//for选项
}

// ToOptions converts from CLI inputs to runtime inputs
func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {//wait flag转option
  printer, err := flags.PrintFlags.ToPrinter()//print flag转printer
  if err != nil {
    return nil, err
  }
  builder := flags.ResourceBuilderFlags.ToBuilder(flags.RESTClientGetter, args)//创建builder
  clientConfig, err := flags.RESTClientGetter.ToRESTConfig()//获取restconfig
  if err != nil {
    return nil, err
  }
  dynamicClient, err := dynamic.NewForConfig(clientConfig)//获取dynamicClient
  if err != nil {
    return nil, err
  }
  conditionFn, err := conditionFuncFor(flags.ForCondition, flags.ErrOut)//获取condition函数
  if err != nil {
    return nil, err
  }

  effectiveTimeout := flags.Timeout
  if effectiveTimeout < 0 {//设置超时时间
    effectiveTimeout = 168 * time.Hour
  }

  o := &WaitOptions{//构造wait option
    ResourceFinder: builder,
    DynamicClient:  dynamicClient,
    Timeout:        effectiveTimeout,

    Printer:     printer,
    ConditionFn: conditionFn,
    IOStreams:   flags.IOStreams,
  }

  return o, nil
}
//获取condition函数
func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) {
  if strings.ToLower(condition) == "delete" {//如果条件是delete,则返回IsDeleted函数
    return IsDeleted, nil
  }
  if strings.HasPrefix(condition, "condition=") {//如果有condition=前缀
    conditionName := condition[len("condition="):]//获取condition名称
    conditionValue := "true"//设置conditionvalue
    if equalsIndex := strings.Index(conditionName, "="); equalsIndex != -1 {
      conditionValue = conditionName[equalsIndex+1:]
      conditionName = conditionName[0:equalsIndex]
    }

    return ConditionalWait{//构造conditionWait,返回isConditionMet函数
      conditionName:   conditionName,
      conditionStatus: conditionValue,
      errOut:          errOut,
    }.IsConditionMet, nil
  }

  return nil, fmt.Errorf("unrecognized condition: %q", condition)
}
//运行
func (o *WaitOptions) RunWait() error {
  visitCount := 0
  err := o.ResourceFinder.Do().Visit(func(info *resource.Info, err error) error {// visit result
    if err != nil {
      return err
    }

    visitCount++
    finalObject, success, err := o.ConditionFn(info, o)//返回condition状态
    if success {//如果是success,打印对象
      o.Printer.PrintObj(finalObject, o.Out)
      return nil
    }
    if err == nil {
      return fmt.Errorf("%v unsatisified for unknown reason", finalObject)
    }
    return err
  })
  if err != nil {
    return err
  }
  if visitCount == 0 {
    return errNoMatchingResources
  }
  return err
}
//删除condition函数
func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
  endTime := time.Now().Add(o.Timeout)//设置结束时间
  for {
    if len(info.Name) == 0 {//名称为空返回错误
      return info.Object, false, fmt.Errorf("resource name must be provided")
    }

    nameSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()//构造field-selector

    // List with a name field selector to get the current resourceVersion to watch from (not the object's resourceVersion)
    gottenObjList, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(metav1.ListOptions{FieldSelector: nameSelector})//删选资源
    if apierrors.IsNotFound(err) {//如果未找到资源返回true
      return info.Object, true, nil
    }
    if err != nil {//有错误返回错误
      // TODO this could do something slightly fancier if we wish
      return info.Object, false, err
    }
    if len(gottenObjList.Items) != 1 {//items不为一个,返回true
      return info.Object, true, nil
    }
    gottenObj := &gottenObjList.Items[0]
    resourceLocation := ResourceLocation{//构造resourceLocation
      GroupResource: info.Mapping.Resource.GroupResource(),
      Namespace:     gottenObj.GetNamespace(),
      Name:          gottenObj.GetName(),
    }
    if uid, ok := o.UIDMap[resourceLocation]; ok {
      if gottenObj.GetUID() != uid {//uid不等,返回true
        return gottenObj, true, nil
      }
    }

    watchOptions := metav1.ListOptions{}//构造wait结构体
    watchOptions.FieldSelector = nameSelector
    watchOptions.ResourceVersion = gottenObjList.GetResourceVersion()
    objWatch, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(watchOptions)//获取watch对象
    if err != nil {
      return gottenObj, false, err
    }

    timeout := endTime.Sub(time.Now())
    errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
    if timeout < 0 {//超时,返回错误
      // we're out of time
      return gottenObj, false, errWaitTimeoutWithName
    }

    ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)//获取ctx和cancel
    watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, Wait{errOut: o.ErrOut}.IsDeleted)//等待条件满足
    cancel()
    switch {
    case err == nil:// 错误为nil返回true
      return watchEvent.Object, true, nil
    case err == watchtools.ErrWatchClosed:
      continue
    case err == wait.ErrWaitTimeout:
      if watchEvent != nil {
        return watchEvent.Object, false, errWaitTimeoutWithName
      }
      return gottenObj, false, errWaitTimeoutWithName
    default:
      return gottenObj, false, err
    }
  }
}
//判断是否否删除完成
func (w Wait) IsDeleted(event watch.Event) (bool, error) {
  switch event.Type {//判断事件类型
  case watch.Error://有错误打印错误,返回false
    // keep waiting in the event we see an error - we expect the watch to be closed by
    // the server if the error is unrecoverable.
    err := apierrors.FromObject(event.Object)
    fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
    return false, nil
  case watch.Deleted://delete,返回true
    return true, nil
  default:
    return false, nil//默认返回false
  }
}
//条件函数
func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
  endTime := time.Now().Add(o.Timeout)//设置超时时间
  for {
    if len(info.Name) == 0 {//name为空返回错误
      return info.Object, false, fmt.Errorf("resource name must be provided")
    }

    nameSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()//构造字段选择器

    var gottenObj *unstructured.Unstructured
    // List with a name field selector to get the current resourceVersion to watch from (not the object's resourceVersion)
    gottenObjList, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(metav1.ListOptions{FieldSelector: nameSelector})//删选资源

    resourceVersion := ""
    switch {
    case err != nil://err不为空返回错误
      return info.Object, false, err
    case len(gottenObjList.Items) != 1://items不为1个
      resourceVersion = gottenObjList.GetResourceVersion()
    default:
      gottenObj = &gottenObjList.Items[0]//获取obj
      conditionMet, err := w.checkCondition(gottenObj)//判断条件是否满足
      if conditionMet {//如果满足返回true
        return gottenObj, true, nil
      }
      if err != nil {//有错误返回错误
        return gottenObj, false, err
      }
      resourceVersion = gottenObjList.GetResourceVersion()//设置resourceVersion
    }

    watchOptions := metav1.ListOptions{}//构造waitoption
    watchOptions.FieldSelector = nameSelector
    watchOptions.ResourceVersion = resourceVersion
    objWatch, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(watchOptions)//获取watch对象
    if err != nil {
      return gottenObj, false, err
    }

    timeout := endTime.Sub(time.Now())
    errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
    if timeout < 0 {//超时返回错误
      // we're out of time
      return gottenObj, false, errWaitTimeoutWithName
    }

    ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)//获取ctx和cancle
    watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, w.isConditionMet)//等待直到条件满足
    cancel()
    switch {
    case err == nil://错误为nil返回true
      return watchEvent.Object, true, nil
    case err == watchtools.ErrWatchClosed:
      continue
    case err == wait.ErrWaitTimeout:
      if watchEvent != nil {
        return watchEvent.Object, false, errWaitTimeoutWithName
      }
      return gottenObj, false, errWaitTimeoutWithName
    default:
      return gottenObj, false, err
    }
  }
}
//判断条件是否满足
func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
  conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")//获取conditions
  if err != nil {
    return false, err
  }
  if !found {
    return false, nil
  }
  for _, conditionUncast := range conditions {//遍历conditions
    condition := conditionUncast.(map[string]interface{})//获取condition
    name, found, err := unstructured.NestedString(condition, "type")//获取condition名称
    if !found || err != nil || strings.ToLower(name) != strings.ToLower(w.conditionName) {//condition名称不是我们要找的名称跳过
      continue
    }
    status, found, err := unstructured.NestedString(condition, "status")//获取状态
    if !found || err != nil {
      continue
    }
    return strings.ToLower(status) == strings.ToLower(w.conditionStatus), nil//判断状态是否是我们指定的状态
  }

  return false, nil
}

//判断条件是否满足
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
  if event.Type == watch.Error {//如果事件类型是错误,打印错误,返回false
    // keep waiting in the event we see an error - we expect the watch to be closed by
    // the server
    err := apierrors.FromObject(event.Object)
    fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
    return false, nil
  }
  if event.Type == watch.Deleted {//如果事件类型是delete,返回false
    // this will chain back out, result in another get and an return false back up the chain
    return false, nil
  }
  obj := event.Object.(*unstructured.Unstructured)
  return w.checkCondition(obj)//检查状态是否满足
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月15日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2024年03月30日   24   0   0 ios
  TEZNKK3IfmPf   2024年04月19日   70   0   0 idepython
TEZNKK3IfmPf