修复批量下载
This commit is contained in:
194
src/imagetar.go
194
src/imagetar.go
@@ -247,6 +247,11 @@ func (is *ImageStreamer) streamImageLayers(ctx context.Context, img v1.Image, wr
|
|||||||
|
|
||||||
// streamDockerFormat 生成Docker格式
|
// streamDockerFormat 生成Docker格式
|
||||||
func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.Writer, img v1.Image, layers []v1.Layer, configFile *v1.ConfigFile, imageRef string) error {
|
func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.Writer, img v1.Image, layers []v1.Layer, configFile *v1.ConfigFile, imageRef string) error {
|
||||||
|
return is.streamDockerFormatWithReturn(ctx, tarWriter, img, layers, configFile, imageRef, nil, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// streamDockerFormatWithReturn 生成Docker格式并返回manifest和repositories信息
|
||||||
|
func (is *ImageStreamer) streamDockerFormatWithReturn(ctx context.Context, tarWriter *tar.Writer, img v1.Image, layers []v1.Layer, configFile *v1.ConfigFile, imageRef string, manifestOut *map[string]interface{}, repositoriesOut *map[string]map[string]string) error {
|
||||||
configDigest, err := img.ConfigName()
|
configDigest, err := img.ConfigName()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -332,7 +337,8 @@ func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
manifest := []map[string]interface{}{{
|
// 构建单个镜像的manifest信息
|
||||||
|
singleManifest := map[string]interface{}{
|
||||||
"Config": configDigest.String() + ".json",
|
"Config": configDigest.String() + ".json",
|
||||||
"RepoTags": []string{imageRef},
|
"RepoTags": []string{imageRef},
|
||||||
"Layers": func() []string {
|
"Layers": func() []string {
|
||||||
@@ -342,7 +348,26 @@ func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.
|
|||||||
}
|
}
|
||||||
return layers
|
return layers
|
||||||
}(),
|
}(),
|
||||||
}}
|
}
|
||||||
|
|
||||||
|
// 构建repositories信息
|
||||||
|
repositories := make(map[string]map[string]string)
|
||||||
|
parts := strings.Split(imageRef, ":")
|
||||||
|
if len(parts) == 2 {
|
||||||
|
repoName := parts[0]
|
||||||
|
tag := parts[1]
|
||||||
|
repositories[repoName] = map[string]string{tag: configDigest.String()}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果是批量下载,返回信息而不写入文件
|
||||||
|
if manifestOut != nil && repositoriesOut != nil {
|
||||||
|
*manifestOut = singleManifest
|
||||||
|
*repositoriesOut = repositories
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 单镜像下载,直接写入manifest.json
|
||||||
|
manifest := []map[string]interface{}{singleManifest}
|
||||||
|
|
||||||
manifestData, err := json.Marshal(manifest)
|
manifestData, err := json.Marshal(manifest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -359,11 +384,98 @@ func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tarWriter.Write(manifestData)
|
if _, err := tarWriter.Write(manifestData); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 写入repositories文件
|
||||||
|
repositoriesData, err := json.Marshal(repositories)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
repositoriesHeader := &tar.Header{
|
||||||
|
Name: "repositories",
|
||||||
|
Size: int64(len(repositoriesData)),
|
||||||
|
Mode: 0644,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tarWriter.WriteHeader(repositoriesHeader); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tarWriter.Write(repositoriesData)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// streamSingleImageForBatch 为批量下载流式处理单个镜像
|
||||||
|
func (is *ImageStreamer) streamSingleImageForBatch(ctx context.Context, tarWriter *tar.Writer, imageRef string, options *StreamOptions) (map[string]interface{}, map[string]map[string]string, error) {
|
||||||
|
ref, err := name.ParseReference(imageRef)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("解析镜像引用失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
contextOptions := append(is.remoteOptions, remote.WithContext(ctx))
|
||||||
|
|
||||||
|
desc, err := is.getImageDescriptor(ref, contextOptions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("获取镜像描述失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var manifest map[string]interface{}
|
||||||
|
var repositories map[string]map[string]string
|
||||||
|
|
||||||
|
switch desc.MediaType {
|
||||||
|
case types.OCIImageIndex, types.DockerManifestList:
|
||||||
|
return nil, nil, fmt.Errorf("批量下载暂不支持多架构镜像")
|
||||||
|
case types.OCIManifestSchema1, types.DockerManifestSchema2:
|
||||||
|
img, err := desc.Image()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("获取镜像失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
layers, err := img.Layers()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("获取镜像层失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
configFile, err := img.ConfigFile()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("获取镜像配置失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("镜像包含 %d 层", len(layers))
|
||||||
|
|
||||||
|
err = is.streamDockerFormatWithReturn(ctx, tarWriter, img, layers, configFile, imageRef, &manifest, &repositories)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
img, err := desc.Image()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("获取镜像失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
layers, err := img.Layers()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("获取镜像层失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
configFile, err := img.ConfigFile()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("获取镜像配置失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("镜像包含 %d 层", len(layers))
|
||||||
|
|
||||||
|
err = is.streamDockerFormatWithReturn(ctx, tarWriter, img, layers, configFile, imageRef, &manifest, &repositories)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return manifest, repositories, nil
|
||||||
|
}
|
||||||
|
|
||||||
var globalImageStreamer *ImageStreamer
|
var globalImageStreamer *ImageStreamer
|
||||||
|
|
||||||
@@ -551,6 +663,10 @@ func (is *ImageStreamer) StreamMultipleImages(ctx context.Context, imageRefs []s
|
|||||||
tarWriter := tar.NewWriter(finalWriter)
|
tarWriter := tar.NewWriter(finalWriter)
|
||||||
defer tarWriter.Close()
|
defer tarWriter.Close()
|
||||||
|
|
||||||
|
var allManifests []map[string]interface{}
|
||||||
|
var allRepositories = make(map[string]map[string]string)
|
||||||
|
|
||||||
|
// 流式处理每个镜像
|
||||||
for i, imageRef := range imageRefs {
|
for i, imageRef := range imageRefs {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -560,26 +676,66 @@ func (is *ImageStreamer) StreamMultipleImages(ctx context.Context, imageRefs []s
|
|||||||
|
|
||||||
log.Printf("处理镜像 %d/%d: %s", i+1, len(imageRefs), imageRef)
|
log.Printf("处理镜像 %d/%d: %s", i+1, len(imageRefs), imageRef)
|
||||||
|
|
||||||
dirName := fmt.Sprintf("image_%d_%s/", i, strings.ReplaceAll(imageRef, "/", "_"))
|
manifest, repositories, err := is.streamSingleImageForBatch(ctx, tarWriter, imageRef, options)
|
||||||
|
if err != nil {
|
||||||
dirHeader := &tar.Header{
|
log.Printf("下载镜像 %s 失败: %v", imageRef, err)
|
||||||
Name: dirName,
|
return fmt.Errorf("下载镜像 %s 失败: %w", imageRef, err)
|
||||||
Typeflag: tar.TypeDir,
|
|
||||||
Mode: 0755,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := tarWriter.WriteHeader(dirHeader); err != nil {
|
|
||||||
return fmt.Errorf("创建镜像目录失败: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := is.StreamImageToWriter(ctx, imageRef, tarWriter, &StreamOptions{
|
// 收集manifest信息
|
||||||
Platform: options.Platform,
|
allManifests = append(allManifests, manifest)
|
||||||
Compression: false,
|
|
||||||
}); err != nil {
|
// 合并repositories信息
|
||||||
log.Printf("下载镜像 %s 失败: %v", imageRef, err)
|
for repo, tags := range repositories {
|
||||||
continue
|
if allRepositories[repo] == nil {
|
||||||
|
allRepositories[repo] = make(map[string]string)
|
||||||
|
}
|
||||||
|
for tag, digest := range tags {
|
||||||
|
allRepositories[repo][tag] = digest
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 写入合并的manifest.json
|
||||||
|
manifestData, err := json.Marshal(allManifests)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("序列化manifest失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
manifestHeader := &tar.Header{
|
||||||
|
Name: "manifest.json",
|
||||||
|
Size: int64(len(manifestData)),
|
||||||
|
Mode: 0644,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tarWriter.WriteHeader(manifestHeader); err != nil {
|
||||||
|
return fmt.Errorf("写入manifest header失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tarWriter.Write(manifestData); err != nil {
|
||||||
|
return fmt.Errorf("写入manifest数据失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 写入合并的repositories文件
|
||||||
|
repositoriesData, err := json.Marshal(allRepositories)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("序列化repositories失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
repositoriesHeader := &tar.Header{
|
||||||
|
Name: "repositories",
|
||||||
|
Size: int64(len(repositoriesData)),
|
||||||
|
Mode: 0644,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tarWriter.WriteHeader(repositoriesHeader); err != nil {
|
||||||
|
return fmt.Errorf("写入repositories header失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tarWriter.Write(repositoriesData); err != nil {
|
||||||
|
return fmt.Errorf("写入repositories数据失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("批量下载完成,共处理 %d 个镜像", len(imageRefs))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user