NodePipeline/NodePipeline.Engine/Execution/PipelineRegistry.cs
2026-01-02 20:55:25 +03:00

171 lines
7.7 KiB
C#

using System.Collections.Concurrent;
using System.Globalization;
using NodePipeline.Abstractions;
using NodePipeline.Abstractions.Exceptions;
using NodePipeline.Abstractions.Interfaces.Nodes;
using NodePipeline.Abstractions.Models.Validation;
using NodePipeline.Configuration.Abstractions.Models.Execute;
using NodePipeline.Engine.Abstractions;
using NodePipeline.Engine.Exceptions.PipelineRegistry.Construction;
using NodePipeline.Engine.Exceptions.PipelineRegistry.Registration;
using NodePipeline.Engine.Graph;
namespace NodePipeline.Engine.Execution;
public class PipelineRegistry(CultureInfo? cultureInfo)
{
private readonly ConcurrentDictionary<string, CachedPipeline> _cache = [];
private readonly CultureInfo _cultureInfo = cultureInfo ?? CultureInfo.InvariantCulture;
private void RegisterPipeline(INodeFactory nodeFactory, IPipelineNodeValidator nodeValidator, string pipelineId,
PipelineConfig config, out CachedPipeline cachedPipeline,
out Dictionary<string, NodeValidationResult> nodeValidationResults, CultureInfo cultureInfo)
{
if (string.IsNullOrWhiteSpace(pipelineId))
throw new NullPipelineIdException();
if (_cache.ContainsKey(pipelineId)) throw new PipelineAlreadyRegisteredException(pipelineId);
var duplicateIds = config.Nodes
.GroupBy(n => n.Id)
.Where(g => g.Count() > 1).Select(q => q.Key)
.ToList();
if (duplicateIds.Count > 0) throw new DuplicateNodeIdsException(pipelineId, duplicateIds);
var graph = PipelineGraphBuilder.Build(config);
if (CycleDetector.HasCycle(graph)) throw new CycleFoundException(pipelineId);
var order = TopologicalSorter.Sort(graph);
var nodeParameterCache = new Dictionary<string, Dictionary<string, object?>>();
var nodeConfigCache = new Dictionary<string, NodeConfig>();
nodeValidationResults = new Dictionary<string, NodeValidationResult>();
foreach (var nodeCfg in config.Nodes)
{
var paramDict = new Dictionary<string, object?>();
// var realParams = _nodeFactory.GetParameterCodes(nodeCfg.Type);
var realParams = nodeFactory.GetParameterCodes(nodeCfg.Type);
foreach (var paramCode in realParams)
if (nodeCfg.Parameters.TryGetValue(paramCode, out var rawValue))
// paramDict[paramCode] = _nodeFactory.ReadParameterValue(pipelineId, nodeCfg.Id, nodeCfg.Type, paramCode, rawValue);
paramDict[paramCode] =
nodeFactory.ReadParameterValue(pipelineId, nodeCfg.Id, nodeCfg.Type, paramCode, rawValue);
else
// paramDict[paramCode] = _nodeFactory.GetParameterDefaultValue(nodeCfg.Type, paramCode);
paramDict[paramCode] = nodeFactory.GetParameterDefaultValue(nodeCfg.Type, paramCode);
// var validationResult = _nodeValidator.ValidateNode(pipelineId, _nodeFactory, nodeCfg, config.Nodes, paramDict, cultureInfo);
var validationResult =
nodeValidator.ValidateNode(pipelineId, nodeFactory, nodeCfg, config.Nodes, paramDict, cultureInfo);
nodeValidationResults[nodeCfg.Id] = validationResult;
nodeParameterCache[nodeCfg.Id] = paramDict;
nodeConfigCache[nodeCfg.Id] = nodeCfg;
}
cachedPipeline = new CachedPipeline(nodeConfigCache, nodeParameterCache, order);
}
public bool TryRegister(INodeFactory nodeFactory, IPipelineNodeValidator nodeValidator, string pipelineId,
PipelineConfig config,
out Dictionary<string, NodeValidationResult> nodeValidationResults, bool treatWarningsAsErrors = false,
CultureInfo? cultureInfo = null)
{
cultureInfo ??= _cultureInfo;
// RegisterPipeline(pipelineId, config, out var cachedPipeline, out nodeValidationResults, cultureInfo);
RegisterPipeline(nodeFactory, nodeValidator, pipelineId, config, out var cachedPipeline,
out nodeValidationResults, cultureInfo);
var result = ValidatePipeline(pipelineId, nodeValidationResults, treatWarningsAsErrors, false);
if (result.IsRegistered) _cache[pipelineId] = cachedPipeline;
return result.IsRegistered;
}
public PipelineRegistrationResult Register(INodeFactory nodeFactory, IPipelineNodeValidator nodeValidator,
string pipelineId, PipelineConfig config, bool treatWarningsAsErrors = false,
CultureInfo? cultureInfo = null)
{
cultureInfo ??= _cultureInfo;
// RegisterPipeline(pipelineId, config, out var cachedPipeline, out var validationResults, cultureInfo);
RegisterPipeline(nodeFactory, nodeValidator, pipelineId, config, out var cachedPipeline,
out var validationResults, cultureInfo);
var result = ValidatePipeline(pipelineId, validationResults, treatWarningsAsErrors);
_cache[pipelineId] = cachedPipeline;
return result;
}
private static PipelineRegistrationResult ValidatePipeline(string pipelineId,
Dictionary<string, NodeValidationResult> nodeValidationResults,
bool treatWarningsAsErrors, bool throwException = true)
{
var hasErrors = nodeValidationResults.Any(q => !IsValid(q.Value.Result, treatWarningsAsErrors));
if (hasErrors && throwException)
throw new PipelineValidationException(pipelineId, nodeValidationResults, treatWarningsAsErrors);
return new PipelineRegistrationResult(pipelineId, !hasErrors, nodeValidationResults, treatWarningsAsErrors);
}
private static bool IsValid(ValidationResult validationResult, bool treatWarningsAsErrors)
{
return validationResult == ValidationResult.Valid
|| (!treatWarningsAsErrors && validationResult == ValidationResult.HasWarnings);
}
public bool Remove(string name)
{
return _cache.TryRemove(name, out _);
}
public bool Contains(string name)
{
return _cache.ContainsKey(name);
}
public IEnumerable<string> ListNames()
{
return _cache.Keys;
}
public PipelineExecutor Build(INodeFactory nodeFactory, string pipelineId)
{
if (!_cache.TryGetValue(pipelineId, out var cached)) throw new PipelineNotFoundException(pipelineId);
// if (_nodeFactory == null) throw new NodeFactoryNotRegisteredException();
var createdNodes = new Dictionary<string, INode>();
var executeDelegates = new List<Action>();
foreach (var nodeId in cached.TopologicalOrder)
{
var nodeCfg = cached.NodeConfigCache[nodeId];
// var node = _nodeFactory.CreateNode(pipelineId, nodeCfg);
var node = nodeFactory.CreateNode(pipelineId, nodeCfg);
// _nodeFactory.SetNodeParametersValues(pipelineId, node, nodeCfg.Id, cached.NodeParameterCache[nodeId]);
nodeFactory.SetNodeParametersValues(pipelineId, node, nodeCfg.Id, cached.NodeParameterCache[nodeId]);
createdNodes[nodeId] = node;
executeDelegates.Add(node.Execute);
}
foreach (var nodeId in cached.TopologicalOrder)
{
var nodeCfg = cached.NodeConfigCache[nodeId];
if (nodeCfg.Inputs.Count == 0) continue;
var node = createdNodes[nodeId];
// _nodeFactory.ConnectPorts(pipelineId, node, nodeId, nodeCfg.Inputs, createdNodes);
nodeFactory.ConnectPorts(pipelineId, node, nodeId, nodeCfg.Inputs, createdNodes);
}
var executor = new PipelineExecutor();
foreach (var step in executeDelegates)
executor.AddStep(step);
return executor;
}
private record CachedPipeline(
Dictionary<string, NodeConfig> NodeConfigCache,
Dictionary<string, Dictionary<string, object?>> NodeParameterCache,
List<string> TopologicalOrder
);
}