修复批量下载

This commit is contained in:
user123456
2025-06-13 09:33:49 +08:00
parent e1920edbd7
commit e305dd27a7

View File

@@ -247,6 +247,11 @@ func (is *ImageStreamer) streamImageLayers(ctx context.Context, img v1.Image, wr
// streamDockerFormat 生成Docker格式
func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.Writer, img v1.Image, layers []v1.Layer, configFile *v1.ConfigFile, imageRef string) error {
return is.streamDockerFormatWithReturn(ctx, tarWriter, img, layers, configFile, imageRef, nil, nil)
}
// streamDockerFormatWithReturn 生成Docker格式并返回manifest和repositories信息
func (is *ImageStreamer) streamDockerFormatWithReturn(ctx context.Context, tarWriter *tar.Writer, img v1.Image, layers []v1.Layer, configFile *v1.ConfigFile, imageRef string, manifestOut *map[string]interface{}, repositoriesOut *map[string]map[string]string) error {
configDigest, err := img.ConfigName()
if err != nil {
return err
@@ -332,7 +337,8 @@ func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.
}
manifest := []map[string]interface{}{{
// 构建单个镜像的manifest信息
singleManifest := map[string]interface{}{
"Config": configDigest.String() + ".json",
"RepoTags": []string{imageRef},
"Layers": func() []string {
@@ -342,7 +348,26 @@ func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.
}
return layers
}(),
}}
}
// 构建repositories信息
repositories := make(map[string]map[string]string)
parts := strings.Split(imageRef, ":")
if len(parts) == 2 {
repoName := parts[0]
tag := parts[1]
repositories[repoName] = map[string]string{tag: configDigest.String()}
}
// 如果是批量下载,返回信息而不写入文件
if manifestOut != nil && repositoriesOut != nil {
*manifestOut = singleManifest
*repositoriesOut = repositories
return nil
}
// 单镜像下载直接写入manifest.json
manifest := []map[string]interface{}{singleManifest}
manifestData, err := json.Marshal(manifest)
if err != nil {
@@ -359,11 +384,98 @@ func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.
return err
}
_, err = tarWriter.Write(manifestData)
if _, err := tarWriter.Write(manifestData); err != nil {
return err
}
// 写入repositories文件
repositoriesData, err := json.Marshal(repositories)
if err != nil {
return err
}
repositoriesHeader := &tar.Header{
Name: "repositories",
Size: int64(len(repositoriesData)),
Mode: 0644,
}
if err := tarWriter.WriteHeader(repositoriesHeader); err != nil {
return err
}
_, err = tarWriter.Write(repositoriesData)
return err
}
// streamSingleImageForBatch 为批量下载流式处理单个镜像
func (is *ImageStreamer) streamSingleImageForBatch(ctx context.Context, tarWriter *tar.Writer, imageRef string, options *StreamOptions) (map[string]interface{}, map[string]map[string]string, error) {
ref, err := name.ParseReference(imageRef)
if err != nil {
return nil, nil, fmt.Errorf("解析镜像引用失败: %w", err)
}
contextOptions := append(is.remoteOptions, remote.WithContext(ctx))
desc, err := is.getImageDescriptor(ref, contextOptions)
if err != nil {
return nil, nil, fmt.Errorf("获取镜像描述失败: %w", err)
}
var manifest map[string]interface{}
var repositories map[string]map[string]string
switch desc.MediaType {
case types.OCIImageIndex, types.DockerManifestList:
return nil, nil, fmt.Errorf("批量下载暂不支持多架构镜像")
case types.OCIManifestSchema1, types.DockerManifestSchema2:
img, err := desc.Image()
if err != nil {
return nil, nil, fmt.Errorf("获取镜像失败: %w", err)
}
layers, err := img.Layers()
if err != nil {
return nil, nil, fmt.Errorf("获取镜像层失败: %w", err)
}
configFile, err := img.ConfigFile()
if err != nil {
return nil, nil, fmt.Errorf("获取镜像配置失败: %w", err)
}
log.Printf("镜像包含 %d 层", len(layers))
err = is.streamDockerFormatWithReturn(ctx, tarWriter, img, layers, configFile, imageRef, &manifest, &repositories)
if err != nil {
return nil, nil, err
}
default:
img, err := desc.Image()
if err != nil {
return nil, nil, fmt.Errorf("获取镜像失败: %w", err)
}
layers, err := img.Layers()
if err != nil {
return nil, nil, fmt.Errorf("获取镜像层失败: %w", err)
}
configFile, err := img.ConfigFile()
if err != nil {
return nil, nil, fmt.Errorf("获取镜像配置失败: %w", err)
}
log.Printf("镜像包含 %d 层", len(layers))
err = is.streamDockerFormatWithReturn(ctx, tarWriter, img, layers, configFile, imageRef, &manifest, &repositories)
if err != nil {
return nil, nil, err
}
}
return manifest, repositories, nil
}
var globalImageStreamer *ImageStreamer
@@ -551,6 +663,10 @@ func (is *ImageStreamer) StreamMultipleImages(ctx context.Context, imageRefs []s
tarWriter := tar.NewWriter(finalWriter)
defer tarWriter.Close()
var allManifests []map[string]interface{}
var allRepositories = make(map[string]map[string]string)
// 流式处理每个镜像
for i, imageRef := range imageRefs {
select {
case <-ctx.Done():
@@ -560,26 +676,66 @@ func (is *ImageStreamer) StreamMultipleImages(ctx context.Context, imageRefs []s
log.Printf("处理镜像 %d/%d: %s", i+1, len(imageRefs), imageRef)
dirName := fmt.Sprintf("image_%d_%s/", i, strings.ReplaceAll(imageRef, "/", "_"))
dirHeader := &tar.Header{
Name: dirName,
Typeflag: tar.TypeDir,
Mode: 0755,
}
if err := tarWriter.WriteHeader(dirHeader); err != nil {
return fmt.Errorf("创建镜像目录失败: %w", err)
manifest, repositories, err := is.streamSingleImageForBatch(ctx, tarWriter, imageRef, options)
if err != nil {
log.Printf("下载镜像 %s 失败: %v", imageRef, err)
return fmt.Errorf("下载镜像 %s 失败: %w", imageRef, err)
}
if err := is.StreamImageToWriter(ctx, imageRef, tarWriter, &StreamOptions{
Platform: options.Platform,
Compression: false,
}); err != nil {
log.Printf("下载镜像 %s 失败: %v", imageRef, err)
continue
// 收集manifest信息
allManifests = append(allManifests, manifest)
// 合并repositories信息
for repo, tags := range repositories {
if allRepositories[repo] == nil {
allRepositories[repo] = make(map[string]string)
}
for tag, digest := range tags {
allRepositories[repo][tag] = digest
}
}
}
// 写入合并的manifest.json
manifestData, err := json.Marshal(allManifests)
if err != nil {
return fmt.Errorf("序列化manifest失败: %w", err)
}
manifestHeader := &tar.Header{
Name: "manifest.json",
Size: int64(len(manifestData)),
Mode: 0644,
}
if err := tarWriter.WriteHeader(manifestHeader); err != nil {
return fmt.Errorf("写入manifest header失败: %w", err)
}
if _, err := tarWriter.Write(manifestData); err != nil {
return fmt.Errorf("写入manifest数据失败: %w", err)
}
// 写入合并的repositories文件
repositoriesData, err := json.Marshal(allRepositories)
if err != nil {
return fmt.Errorf("序列化repositories失败: %w", err)
}
repositoriesHeader := &tar.Header{
Name: "repositories",
Size: int64(len(repositoriesData)),
Mode: 0644,
}
if err := tarWriter.WriteHeader(repositoriesHeader); err != nil {
return fmt.Errorf("写入repositories header失败: %w", err)
}
if _, err := tarWriter.Write(repositoriesData); err != nil {
return fmt.Errorf("写入repositories数据失败: %w", err)
}
log.Printf("批量下载完成,共处理 %d 个镜像", len(imageRefs))
return nil
}