Skip to content

Commit

Permalink
Download assets.
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastianStehle committed Oct 21, 2021
1 parent 239f6f5 commit 625ba81
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 49 deletions.
74 changes: 72 additions & 2 deletions cli/Squidex.CLI/Squidex.CLI/Commands/App_Assets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// ==========================================================================

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -74,15 +76,15 @@ public async Task Import(ImportArguments arguments)
{
var existing = existings.Items.First();

log.WriteLine($"Updating: {file.FullName}");
log.WriteLine($"Uploading: {file.FullName}");

await assets.PutAssetContentAsync(session.App, existing.Id, fileParameter);

log.StepSuccess();
}
else
{
log.WriteLine($"Uploading: {file.FullName}");
log.WriteLine($"Uploading New: {file.FullName}");

var result = await assets.PostAssetAsync(session.App, parentId, duplicate: arguments.Duplicate, file: fileParameter);

Expand Down Expand Up @@ -110,6 +112,56 @@ public async Task Import(ImportArguments arguments)
}
}

[Command(Name = "export", Description = "Export all files to the source folder.")]
public async Task Export(ImportArguments arguments)
{
var session = configuration.StartSession();

var assets = session.Assets;

using (var fs = FileSystems.Create(arguments.Path))
{
var folderTree = new FolderTree(session);
var folderNames = new HashSet<string>();

var parentId = await folderTree.GetIdAsync(arguments.TargetFolder);

var downloadPipeline = new DownloadPipeline(session, log, fs)
{
FilePathProviderAsync = async asset =>
{
var assetFolder = await folderTree.GetPathAsync(asset.ParentId);
var assetPath = asset.FileName;

if (!string.IsNullOrWhiteSpace(assetFolder))
{
assetPath = Path.Combine(assetFolder, assetPath);
}

if (!folderNames.Add(assetPath))
{
assetPath = Path.Combine(assetFolder, $"{asset.Id}_{asset.FileName}");
}

return FilePath.Create(assetPath);
}
};

await assets.GetAllByQueryAsync(session.App, async asset =>
{
await downloadPipeline.DownloadAsync(asset);
},
new AssetQuery
{
ParentId = parentId
});

await downloadPipeline.CompleteAsync();

log.WriteLine("> Export completed");
}
}

[Validator(typeof(Validator))]
public sealed class ImportArguments : IArgumentModel
{
Expand All @@ -130,6 +182,24 @@ public Validator()
}
}
}

[Validator(typeof(Validator))]
public sealed class ExportArguments : IArgumentModel
{
[Operand(Name = "folder", Description = "The source folder.")]
public string Path { get; set; }

[Option(ShortName = "t", LongName = "target", Description = "Path to the target folder.")]
public string SourceFolder { get; set; }

public sealed class Validator : AbstractValidator<ExportArguments>
{
public Validator()
{
RuleFor(x => x.Path).NotEmpty();
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================

using System.Collections.Generic;
using System.IO;
using System.Linq;

Expand All @@ -15,21 +14,26 @@ public sealed class FilePath
{
public static readonly FilePath Root = new FilePath(string.Empty);

public IEnumerable<string> Elements { get; }
public string[] Elements { get; }

public FilePath(params string[] elements)
{
Elements = elements;
}

public static FilePath Create(string path)
{
return new FilePath(path.Split('/', '\\'));
}

public FilePath Combine(FilePath path)
{
return new FilePath(Elements.Concat(path.Elements).ToArray());
}

public override string ToString()
{
return Path.Combine(Elements.ToArray());
return Path.Combine(Elements);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ public Task CleanupAsync(IFileSystem fs)

public async Task ExportAsync(ISyncService sync, SyncOptions options, ISession session)
{
var downloadPipeline = new DownloadPipeline(session, log, sync.FileSystem);
var downloadPipeline = new DownloadPipeline(session, log, sync.FileSystem)
{
FilePathProvider = asset => asset.Id.GetBlobPath()
};

var assets = new List<AssetModel>();
var assetBatch = 0;
Expand Down Expand Up @@ -95,7 +98,10 @@ public async Task ImportAsync(ISyncService sync, SyncOptions options, ISession s
{
if (model?.Assets?.Count > 0)
{
var uploader = new UploadPipeline(session, log, sync.FileSystem);
var uploader = new UploadPipeline(session, log, sync.FileSystem)
{
FilePathProvider = asset => asset.Id.GetBlobPath()
};

await uploader.UploadAsync(model.Assets);
await uploader.CompleteAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,50 @@ namespace Squidex.CLI.Commands.Implementation.Sync.Assets
{
public sealed class DownloadPipeline
{
private readonly ActionBlock<AssetDto> pipeline;
private readonly ITargetBlock<AssetDto> pipelineStart;
private readonly IDataflowBlock pipelineEnd;

public Func<AssetDto, FilePath> FilePathProvider { get; set; }

public Func<AssetDto, Task<FilePath>> FilePathProviderAsync { get; set; }

public DownloadPipeline(ISession session, ILogger log, IFileSystem fs)
{
pipeline = new ActionBlock<AssetDto>(async asset =>
var fileNameStep = new TransformBlock<AssetDto, (AssetDto, FilePath)>(async asset =>
{
FilePath path;

if (FilePathProvider != null)
{
path = FilePathProvider(asset);
}
else if (FilePathProviderAsync != null)
{
path = await FilePathProviderAsync(asset);
}
else
{
path = new FilePath(asset.Id);
}

return (asset, path);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1,
BoundedCapacity = 1
});

var downloadStep = new ActionBlock<(AssetDto, FilePath)>(async item =>
{
var process = $"Downloading {asset.Id}";
var (asset, path) = item;

var process = $"Downloading {path}";

try
{
var assetFile = fs.GetBlobFile(asset.Id);
var assetFile = fs.GetFile(path);
var assetHash = GetFileHash(assetFile, asset);

if (assetHash == null || !string.Equals(asset.FileHash, assetHash))
Expand All @@ -36,9 +69,9 @@ public DownloadPipeline(ISession session, ILogger log, IFileSystem fs)

await using (response.Stream)
{
await using (var fileStream = assetFile.OpenWrite())
await using (var stream = assetFile.OpenWrite())
{
await response.Stream.CopyToAsync(fileStream);
await response.Stream.CopyToAsync(stream);
}
}

Expand All @@ -59,6 +92,14 @@ public DownloadPipeline(ISession session, ILogger log, IFileSystem fs)
MaxMessagesPerTask = 1,
BoundedCapacity = 16
});

fileNameStep.LinkTo(downloadStep, new DataflowLinkOptions
{
PropagateCompletion = true
});

pipelineStart = fileNameStep;
pipelineEnd = downloadStep;
}

private static string GetFileHash(IFile file, AssetDto asset)
Expand Down Expand Up @@ -101,14 +142,14 @@ private static string GetFileHash(IFile file, AssetDto asset)

public Task DownloadAsync(AssetDto asset)
{
return pipeline.SendAsync(asset);
return pipelineStart.SendAsync(asset);
}

public Task CompleteAsync()
{
pipeline.Complete();
pipelineEnd.Complete();

return pipeline.Completion;
return pipelineEnd.Completion;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ public static class Extensions
{
public static IFile GetBlobFile(this IFileSystem fs, string id)
{
return fs.GetFile(new FilePath("assets", "files", $"{id}.blob"));
return fs.GetFile(GetBlobPath(id));
}

public static FilePath GetBlobPath(this string id)
{
return new FilePath("assets", "files", $"{id}.blob");
}

public static BulkUpdateAssetsJobDto ToMoveJob(this AssetModel model, string parentId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,52 @@ namespace Squidex.CLI.Commands.Implementation.Sync.Assets
{
public sealed class UploadPipeline
{
private readonly ActionBlock<AssetModel> pipeline;
private readonly ITargetBlock<AssetModel> pipelineStart;
private readonly IDataflowBlock pipelineEnd;

public Func<AssetModel, FilePath> FilePathProvider { get; set; }

public Func<AssetModel, Task<FilePath>> FilePathProviderAsync { get; set; }

public UploadPipeline(ISession session, ILogger log, IFileSystem fs)
{
var tree = new FolderTree(session);

pipeline = new ActionBlock<AssetModel>(async asset =>
var fileNameStep = new TransformBlock<AssetModel, (AssetModel, FilePath)>(async asset =>
{
FilePath path;

if (FilePathProvider != null)
{
path = FilePathProvider(asset);
}
else if (FilePathProviderAsync != null)
{
path = await FilePathProviderAsync(asset);
}
else
{
path = new FilePath(asset.Id);
}

return (asset, path);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1,
BoundedCapacity = 1
});

var uploadStep = new ActionBlock<(AssetModel, FilePath)>(async item =>
{
var process = $"Uploading {asset.Id}";
var (asset, path) = item;

var process = $"Uploading {path}";

try
{
var assetFile = fs.GetBlobFile(asset.Id);
var assetFile = fs.GetFile(path);

await using (var stream = assetFile.OpenRead())
{
Expand Down Expand Up @@ -56,21 +89,29 @@ public UploadPipeline(ISession session, ILogger log, IFileSystem fs)
MaxMessagesPerTask = 1,
BoundedCapacity = 16
});

fileNameStep.LinkTo(uploadStep, new DataflowLinkOptions
{
PropagateCompletion = true
});

pipelineStart = fileNameStep;
pipelineEnd = uploadStep;
}

public async Task UploadAsync(IEnumerable<AssetModel> assets)
{
foreach (var asset in assets)
{
await pipeline.SendAsync(asset);
await pipelineStart.SendAsync(asset);
}
}

public Task CompleteAsync()
{
pipeline.Complete();
pipelineEnd.Complete();

return pipeline.Completion;
return pipelineEnd.Completion;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public sealed class ContentsSynchronizer : ISynchronizer
private const string Ref = "../__json/contents";
private readonly ILogger log;

public string Name => "contents";
public string Name => "Contents";

public ContentsSynchronizer(ILogger log)
{
Expand Down
Loading

0 comments on commit 625ba81

Please sign in to comment.