Kubernetes CRI 分析 - kubelet 创建 Pod 分析

发布时间:2023-01-27 18:00

kubelet CRI 创建 Pod 调用流程

本文以 kubelet dockershim 创建 Pod 调用流程为例做分析。
kubelet 通过调用 dockershim 来创建并启动容器,而 dockershim 则调用 Docker 来创建并启动容器,并调用 CNI 来构建 Pod 网络。

Kubernetes CRI 分析 - kubelet 创建 Pod 分析_第1张图片
kubelet dockershim 创建 Pod 调用流程图

dockershim 属于 kubelet 内置 CRI shim,其余的 remote CRI shim 创建 Pod 调用流程其实与 dockershim 调用基本一致,只不过是调用了不同的容器引擎来操作容器,但一样由 CRI shim 调用 CNI 来构建 Pod 网络。

下面是详细的源码分析。

kubeGenericRuntimeManager 的 SyncPod 方法,调用 CRI 创建 Pod 的逻辑将在该方法里触发。

从该方法代码也可以看出,kubelet 创建一个 Pod 的逻辑为:

  1. 先创建并启动 Pod sandbox 容器,并构建好 Pod 网络。

  2. 创建并启动 ephemeral containers。

  3. 创建并启动 init containers。

  4. 最后创建并启动 normal containers(即普通业务容器)。

这里对调用 m.createPodSandbox 来创建 Pod sandbox 进行分析,用 m.startContainer 等调用分析可以参照该分析自动进行分析,调用流程几乎一致。

	// pkg/kubelet/kuberuntime/kuberuntime_manager.go
	// SyncPod syncs the running pod into the desired pod by executing following steps:
	//
	//  1. Compute sandbox and container changes.
	//  2. Kill pod sandbox if necessary.
	//  3. Kill any containers that should not be running.
	//  4. Create sandbox if necessary.
	//  5. Create ephemeral containers.
	//  6. Create init containers.
	//  7. Create normal containers.
	func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	 ...
	 // Step 4: Create a sandbox for the pod if necessary.
	 podSandboxID := podContainerChanges.SandboxID
	 if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		klog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		...
	}

m.createPodSandbox

**m.createPodSandbox **方法主要是调用 m.runtimeService.RunPodSandbox

runtimeService 即 RemoteRuntimeService,实现了 CRI shim 客户端-容器运行时接口 RuntimeService interface,持有与 CRI shim 容器运行时服务端通信的客户端。所以调用 m.runtimeService.RunPodSandbox,实际上等于调用了 CRI shim 服务端的 RunPodSandbox 方法,来进行 Pod sandbox 的创建。

	// pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
	// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
	func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	 podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
	 if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		klog.Error(message)
		return "", message, err
	 }

	 // Create pod logs directory
	 err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
	 if err != nil {
		message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
		klog.Errorf(message)
		return "", message, err
	 }

	 runtimeHandler := ""
	 if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil {
		runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
		if err != nil {
		 message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
		 return "", message, err
		}
		if runtimeHandler != "" {
		 klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler)
		}
	 }

	 podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
	 if err != nil {
		message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
		klog.Error(message)
		return "", message, err
	 }

	 return podSandBoxID, "", nil
	}

m.runtimeService.RunPodSandbox

m.runtimeService.RunPodSandbox 方法,会调用 r.runtimeClient.RunPodSandbox,即利用 CRI shim 客户端,调用 CRI shim 服务端来进行 Pod sandbox 的创建。

分析到这里,kubelet 中的 CRI 相关调用就分析完毕了,接下来将会进入到 CRI shim(以 kubelet 内置 CRI shim-dockershim 为例)里进行创建 Pod sandbox 的分析。

	// pkg/kubelet/remote/remote_runtime.go
	// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
	// the sandbox is in ready state.
	func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
	 // Use 2 times longer timeout for sandbox operation (4 mins by default)
	 // TODO: Make the pod sandbox timeout configurable.
	 ctx, cancel := getContextWithTimeout(r.timeout * 2)
	 defer cancel()

	 resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
		Config:         config,
		RuntimeHandler: runtimeHandler,
	 })
	 if err != nil {
		klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
		return "", err
	 }

	 if resp.PodSandboxId == "" {
		errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
		klog.Errorf("RunPodSandbox failed: %s", errorMessage)
		return "", errors.New(errorMessage)
	 }

	 return resp.PodSandboxId, nil
	}

r.runtimeClient.RunPodSandbox

接下来以 dockershim 为例,进入到 CRI shim 来进行创建 Pod sandbox 的分析。

前面 kubelet 调用 r.runtimeClient.RunPodSandbox,会进入到 dockershim 下面的 RunPodSandbox 方法。

创建 Pod sandbox 主要有 5 个步骤:

  1. 调用 docker,拉取 pod sandbox 的镜像。

  2. 调用 docker,创建 pod sandbox 容器。

  3. 创建 pod sandbox 的 Checkpoint。

  4. 调用 docker,启动 pod sandbox 容器。

  5. 调用 CNI,给 pod sandbox 构建网络。

     // pkg/kubelet/dockershim/docker_sandbox.go
     // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
     // the sandbox is in ready state.
     // For docker, PodSandbox is implemented by a container holding the network
     // namespace for the pod.
     // Note: docker doesn't use LogDirectory (yet).
     func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
      config := r.GetConfig()
    
      // Step 1: Pull the image for the sandbox.
      image := defaultSandboxImage
      podSandboxImage := ds.podSandboxImage
      if len(podSandboxImage) != 0 {
     	image = podSandboxImage
      }
    
      // NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.
      // see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
      // Only pull sandbox image when it's not present - v1.PullIfNotPresent.
      if err := ensureSandboxImageExists(ds.client, image); err != nil {
     	return nil, err
      }
    
      // Step 2: Create the sandbox container.
      if r.GetRuntimeHandler() != "" && r.GetRuntimeHandler() != runtimeName {
     	return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler())
      }
      createConfig, err := ds.makeSandboxDockerConfig(config, image)
      if err != nil {
     	return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
      }
      createResp, err := ds.client.CreateContainer(*createConfig)
      if err != nil {
     	createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
      }
    
      if err != nil || createResp == nil {
     	return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
      }
      resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}
    
      ds.setNetworkReady(createResp.ID, false)
      defer func(e *error) {
     	// Set networking ready depending on the error return of
     	// the parent function
     	if *e == nil {
     	 ds.setNetworkReady(createResp.ID, true)
     	}
      }(&err)
    
      // Step 3: Create Sandbox Checkpoint.
      if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
     	return nil, err
      }
    
      // Step 4: Start the sandbox container.
      // Assume kubelet's garbage collector would remove the sandbox later, if
      // startContainer failed.
      err = ds.client.StartContainer(createResp.ID)
      if err != nil {
     	return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
      }
    
      // Rewrite resolv.conf file generated by docker.
      // NOTE: cluster dns settings aren't passed anymore to docker api in all cases,
      // not only for pods with host network: the resolver conf will be overwritten
      // after sandbox creation to override docker's behaviour. This resolv.conf
      // file is shared by all containers of the same pod, and needs to be modified
      // only once per pod.
      if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
     	containerInfo, err := ds.client.InspectContainer(createResp.ID)
     	if err != nil {
     	 return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
     	}
    
     	if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
     	 return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
     	}
      }
    
      // Do not invoke network plugins if in hostNetwork mode.
      if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
     	return resp, nil
      }
    
      // Step 5: Setup networking for the sandbox.
      // All pod networking is setup by a CNI plugin discovered at startup time.
      // This plugin assigns the pod ip, sets up routes inside the sandbox,
      // creates interfaces etc. In theory, its jurisdiction ends with pod
      // sandbox networking, but it might insert iptables rules or open ports
      // on the host as well, to satisfy parts of the pod spec that aren't
      // recognized by the CNI standard yet.
      cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
      networkOptions := make(map[string]string)
      if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
     	// Build DNS options.
     	dnsOption, err := json.Marshal(dnsConfig)
     	if err != nil {
     	 return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err)
     	}
     	networkOptions["dns"] = string(dnsOption)
      }
      err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
      if err != nil {
     	errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)}
    
     	// Ensure network resources are cleaned up even if the plugin
     	// succeeded but an error happened between that success and here.
     	err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
     	if err != nil {
     	 errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err))
     	}
    
     	err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod)
     	if err != nil {
     	 errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err))
     	}
    
     	return resp, utilerrors.NewAggregate(errList)
      }
    
      return resp, nil
     }
    

接下来以 ds.client.CreateContainer 调用为例,分析下dockershim是如何调用docker的。

ds.client.CreateContainer 主要是调用 d.client.ContainerCreate

	// pkg/kubelet/dockershim/libdocker/kube_docker_client.go
	func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
	 ctx, cancel := d.getTimeoutContext()
	 defer cancel()
	 // we provide an explicit default shm size as to not depend on docker daemon.
	 // TODO: evaluate exposing this as a knob in the API
	 if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
		opts.HostConfig.ShmSize = defaultShmSize
	 }
	 createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
	 if ctxErr := contextError(ctx); ctxErr != nil {
		return nil, ctxErr
	 }
	 if err != nil {
		return nil, err
	 }
	 return &createResp, nil
	}

ds.client.ContainerCreate 构建请求参数,向 Docker 指定的 url 发送 http 请求,创建 Pod sandbox 容器。

	// vendor/github.com/docker/docker/client/container_create.go
	// ContainerCreate creates a new container based in the given configuration.
	// It can be associated with a name, but it's not mandatory.
	func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error) {
	 var response container.ContainerCreateCreatedBody

	 if err := cli.NewVersionError("1.25", "stop timeout"); config != nil && config.StopTimeout != nil && err != nil {
		return response, err
	 }

	 // When using API 1.24 and under, the client is responsible for removing the container
	 if hostConfig != nil && versions.LessThan(cli.ClientVersion(), "1.25") {
		hostConfig.AutoRemove = false
	 }

	 query := url.Values{}
	 if containerName != "" {
		query.Set("name", containerName)
	 }

	 body := configWrapper{
		Config:           config,
		HostConfig:       hostConfig,
		NetworkingConfig: networkingConfig,
	 }

	 serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
	 defer ensureReaderClosed(serverResp)
	 if err != nil {
		return response, err
	 }

	 err = json.NewDecoder(serverResp.body).Decode(&response)
	 return response, err
	}
	
	// vendor/github.com/docker/docker/client/request.go
	// post sends an http request to the docker API using the method POST with a specific Go context.
	func (cli *Client) post(ctx context.Context, path string, query url.Values, obj interface{}, headers map[string][]string) (serverResponse, error) {
	 body, headers, err := encodeBody(obj, headers)
	 if err != nil {
		return serverResponse{}, err
	 }
	 return cli.sendRequest(ctx, "POST", path, query, body, headers)
	}

总结

CRI 架构图

在 CRI 之下,包括两种类型的容器运行时的实现:

  • kubelet 内置的 dockershim,实现了 Docker 容器引擎的支持以及 CNI 网络插件(包括 kubenet)的支持。dockershim 代码内置于 kubelet,被 kubelet 调用,让 dockershim 起独立的 server 来建立 CRI shim,向 kubelet 暴露 grpc server。

  • 外部的容器运行时,用来支持 rkt、containerd 等容器引擎的外部容器运行时。

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号