171 lines
7.7 KiB
C#
171 lines
7.7 KiB
C#
using System.Collections.Concurrent;
|
|
using System.Globalization;
|
|
using NodePipeline.Abstractions;
|
|
using NodePipeline.Abstractions.Exceptions;
|
|
using NodePipeline.Abstractions.Interfaces.Nodes;
|
|
using NodePipeline.Abstractions.Models.Validation;
|
|
using NodePipeline.Configuration.Abstractions.Models.Execute;
|
|
using NodePipeline.Engine.Abstractions;
|
|
using NodePipeline.Engine.Exceptions.PipelineRegistry.Construction;
|
|
using NodePipeline.Engine.Exceptions.PipelineRegistry.Registration;
|
|
using NodePipeline.Engine.Graph;
|
|
|
|
namespace NodePipeline.Engine.Execution;
|
|
|
|
public class PipelineRegistry(CultureInfo? cultureInfo)
|
|
{
|
|
private readonly ConcurrentDictionary<string, CachedPipeline> _cache = [];
|
|
private readonly CultureInfo _cultureInfo = cultureInfo ?? CultureInfo.InvariantCulture;
|
|
|
|
|
|
private void RegisterPipeline(INodeFactory nodeFactory, IPipelineNodeValidator nodeValidator, string pipelineId,
|
|
PipelineConfig config, out CachedPipeline cachedPipeline,
|
|
out Dictionary<string, NodeValidationResult> nodeValidationResults, CultureInfo cultureInfo)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(pipelineId))
|
|
throw new NullPipelineIdException();
|
|
|
|
if (_cache.ContainsKey(pipelineId)) throw new PipelineAlreadyRegisteredException(pipelineId);
|
|
|
|
var duplicateIds = config.Nodes
|
|
.GroupBy(n => n.Id)
|
|
.Where(g => g.Count() > 1).Select(q => q.Key)
|
|
.ToList();
|
|
|
|
if (duplicateIds.Count > 0) throw new DuplicateNodeIdsException(pipelineId, duplicateIds);
|
|
|
|
var graph = PipelineGraphBuilder.Build(config);
|
|
if (CycleDetector.HasCycle(graph)) throw new CycleFoundException(pipelineId);
|
|
|
|
var order = TopologicalSorter.Sort(graph);
|
|
|
|
var nodeParameterCache = new Dictionary<string, Dictionary<string, object?>>();
|
|
var nodeConfigCache = new Dictionary<string, NodeConfig>();
|
|
|
|
nodeValidationResults = new Dictionary<string, NodeValidationResult>();
|
|
|
|
foreach (var nodeCfg in config.Nodes)
|
|
{
|
|
var paramDict = new Dictionary<string, object?>();
|
|
// var realParams = _nodeFactory.GetParameterCodes(nodeCfg.Type);
|
|
var realParams = nodeFactory.GetParameterCodes(nodeCfg.Type);
|
|
foreach (var paramCode in realParams)
|
|
if (nodeCfg.Parameters.TryGetValue(paramCode, out var rawValue))
|
|
// paramDict[paramCode] = _nodeFactory.ReadParameterValue(pipelineId, nodeCfg.Id, nodeCfg.Type, paramCode, rawValue);
|
|
paramDict[paramCode] =
|
|
nodeFactory.ReadParameterValue(pipelineId, nodeCfg.Id, nodeCfg.Type, paramCode, rawValue);
|
|
else
|
|
// paramDict[paramCode] = _nodeFactory.GetParameterDefaultValue(nodeCfg.Type, paramCode);
|
|
paramDict[paramCode] = nodeFactory.GetParameterDefaultValue(nodeCfg.Type, paramCode);
|
|
|
|
// var validationResult = _nodeValidator.ValidateNode(pipelineId, _nodeFactory, nodeCfg, config.Nodes, paramDict, cultureInfo);
|
|
var validationResult =
|
|
nodeValidator.ValidateNode(pipelineId, nodeFactory, nodeCfg, config.Nodes, paramDict, cultureInfo);
|
|
nodeValidationResults[nodeCfg.Id] = validationResult;
|
|
|
|
nodeParameterCache[nodeCfg.Id] = paramDict;
|
|
nodeConfigCache[nodeCfg.Id] = nodeCfg;
|
|
}
|
|
|
|
cachedPipeline = new CachedPipeline(nodeConfigCache, nodeParameterCache, order);
|
|
}
|
|
|
|
public bool TryRegister(INodeFactory nodeFactory, IPipelineNodeValidator nodeValidator, string pipelineId,
|
|
PipelineConfig config,
|
|
out Dictionary<string, NodeValidationResult> nodeValidationResults, bool treatWarningsAsErrors = false,
|
|
CultureInfo? cultureInfo = null)
|
|
{
|
|
cultureInfo ??= _cultureInfo;
|
|
// RegisterPipeline(pipelineId, config, out var cachedPipeline, out nodeValidationResults, cultureInfo);
|
|
RegisterPipeline(nodeFactory, nodeValidator, pipelineId, config, out var cachedPipeline,
|
|
out nodeValidationResults, cultureInfo);
|
|
var result = ValidatePipeline(pipelineId, nodeValidationResults, treatWarningsAsErrors, false);
|
|
if (result.IsRegistered) _cache[pipelineId] = cachedPipeline;
|
|
return result.IsRegistered;
|
|
}
|
|
|
|
public PipelineRegistrationResult Register(INodeFactory nodeFactory, IPipelineNodeValidator nodeValidator,
|
|
string pipelineId, PipelineConfig config, bool treatWarningsAsErrors = false,
|
|
CultureInfo? cultureInfo = null)
|
|
{
|
|
cultureInfo ??= _cultureInfo;
|
|
// RegisterPipeline(pipelineId, config, out var cachedPipeline, out var validationResults, cultureInfo);
|
|
RegisterPipeline(nodeFactory, nodeValidator, pipelineId, config, out var cachedPipeline,
|
|
out var validationResults, cultureInfo);
|
|
var result = ValidatePipeline(pipelineId, validationResults, treatWarningsAsErrors);
|
|
_cache[pipelineId] = cachedPipeline;
|
|
return result;
|
|
}
|
|
|
|
private static PipelineRegistrationResult ValidatePipeline(string pipelineId,
|
|
Dictionary<string, NodeValidationResult> nodeValidationResults,
|
|
bool treatWarningsAsErrors, bool throwException = true)
|
|
{
|
|
var hasErrors = nodeValidationResults.Any(q => !IsValid(q.Value.Result, treatWarningsAsErrors));
|
|
if (hasErrors && throwException)
|
|
throw new PipelineValidationException(pipelineId, nodeValidationResults, treatWarningsAsErrors);
|
|
|
|
return new PipelineRegistrationResult(pipelineId, !hasErrors, nodeValidationResults, treatWarningsAsErrors);
|
|
}
|
|
|
|
private static bool IsValid(ValidationResult validationResult, bool treatWarningsAsErrors)
|
|
{
|
|
return validationResult == ValidationResult.Valid
|
|
|| (!treatWarningsAsErrors && validationResult == ValidationResult.HasWarnings);
|
|
}
|
|
|
|
public bool Remove(string name)
|
|
{
|
|
return _cache.TryRemove(name, out _);
|
|
}
|
|
|
|
public bool Contains(string name)
|
|
{
|
|
return _cache.ContainsKey(name);
|
|
}
|
|
|
|
public IEnumerable<string> ListNames()
|
|
{
|
|
return _cache.Keys;
|
|
}
|
|
|
|
public PipelineExecutor Build(INodeFactory nodeFactory, string pipelineId)
|
|
{
|
|
if (!_cache.TryGetValue(pipelineId, out var cached)) throw new PipelineNotFoundException(pipelineId);
|
|
// if (_nodeFactory == null) throw new NodeFactoryNotRegisteredException();
|
|
|
|
var createdNodes = new Dictionary<string, INode>();
|
|
var executeDelegates = new List<Action>();
|
|
foreach (var nodeId in cached.TopologicalOrder)
|
|
{
|
|
var nodeCfg = cached.NodeConfigCache[nodeId];
|
|
// var node = _nodeFactory.CreateNode(pipelineId, nodeCfg);
|
|
var node = nodeFactory.CreateNode(pipelineId, nodeCfg);
|
|
// _nodeFactory.SetNodeParametersValues(pipelineId, node, nodeCfg.Id, cached.NodeParameterCache[nodeId]);
|
|
nodeFactory.SetNodeParametersValues(pipelineId, node, nodeCfg.Id, cached.NodeParameterCache[nodeId]);
|
|
createdNodes[nodeId] = node;
|
|
executeDelegates.Add(node.Execute);
|
|
}
|
|
|
|
foreach (var nodeId in cached.TopologicalOrder)
|
|
{
|
|
var nodeCfg = cached.NodeConfigCache[nodeId];
|
|
if (nodeCfg.Inputs.Count == 0) continue;
|
|
var node = createdNodes[nodeId];
|
|
// _nodeFactory.ConnectPorts(pipelineId, node, nodeId, nodeCfg.Inputs, createdNodes);
|
|
nodeFactory.ConnectPorts(pipelineId, node, nodeId, nodeCfg.Inputs, createdNodes);
|
|
}
|
|
|
|
var executor = new PipelineExecutor();
|
|
foreach (var step in executeDelegates)
|
|
executor.AddStep(step);
|
|
|
|
return executor;
|
|
}
|
|
|
|
private record CachedPipeline(
|
|
Dictionary<string, NodeConfig> NodeConfigCache,
|
|
Dictionary<string, Dictionary<string, object?>> NodeParameterCache,
|
|
List<string> TopologicalOrder
|
|
);
|
|
} |