using System.Collections.Concurrent; using System.Globalization; using NodePipeline.Abstractions; using NodePipeline.Abstractions.Exceptions; using NodePipeline.Abstractions.Interfaces.Nodes; using NodePipeline.Abstractions.Models.Validation; using NodePipeline.Configuration.Abstractions.Models.Execute; using NodePipeline.Engine.Abstractions; using NodePipeline.Engine.Exceptions.PipelineRegistry.Construction; using NodePipeline.Engine.Exceptions.PipelineRegistry.Registration; using NodePipeline.Engine.Graph; namespace NodePipeline.Engine.Execution; public class PipelineRegistry(CultureInfo? cultureInfo) { private readonly ConcurrentDictionary _cache = []; private readonly CultureInfo _cultureInfo = cultureInfo ?? CultureInfo.InvariantCulture; private void RegisterPipeline(INodeFactory nodeFactory, IPipelineNodeValidator nodeValidator, string pipelineId, PipelineConfig config, out CachedPipeline cachedPipeline, out Dictionary nodeValidationResults, CultureInfo cultureInfo) { if (string.IsNullOrWhiteSpace(pipelineId)) throw new NullPipelineIdException(); if (_cache.ContainsKey(pipelineId)) throw new PipelineAlreadyRegisteredException(pipelineId); var duplicateIds = config.Nodes .GroupBy(n => n.Id) .Where(g => g.Count() > 1).Select(q => q.Key) .ToList(); if (duplicateIds.Count > 0) throw new DuplicateNodeIdsException(pipelineId, duplicateIds); var graph = PipelineGraphBuilder.Build(config); if (CycleDetector.HasCycle(graph)) throw new CycleFoundException(pipelineId); var order = TopologicalSorter.Sort(graph); var nodeParameterCache = new Dictionary>(); var nodeConfigCache = new Dictionary(); nodeValidationResults = new Dictionary(); foreach (var nodeCfg in config.Nodes) { var paramDict = new Dictionary(); // var realParams = _nodeFactory.GetParameterCodes(nodeCfg.Type); var realParams = nodeFactory.GetParameterCodes(nodeCfg.Type); foreach (var paramCode in realParams) if (nodeCfg.Parameters.TryGetValue(paramCode, out var rawValue)) // paramDict[paramCode] = _nodeFactory.ReadParameterValue(pipelineId, nodeCfg.Id, nodeCfg.Type, paramCode, rawValue); paramDict[paramCode] = nodeFactory.ReadParameterValue(pipelineId, nodeCfg.Id, nodeCfg.Type, paramCode, rawValue); else // paramDict[paramCode] = _nodeFactory.GetParameterDefaultValue(nodeCfg.Type, paramCode); paramDict[paramCode] = nodeFactory.GetParameterDefaultValue(nodeCfg.Type, paramCode); // var validationResult = _nodeValidator.ValidateNode(pipelineId, _nodeFactory, nodeCfg, config.Nodes, paramDict, cultureInfo); var validationResult = nodeValidator.ValidateNode(pipelineId, nodeFactory, nodeCfg, config.Nodes, paramDict, cultureInfo); nodeValidationResults[nodeCfg.Id] = validationResult; nodeParameterCache[nodeCfg.Id] = paramDict; nodeConfigCache[nodeCfg.Id] = nodeCfg; } cachedPipeline = new CachedPipeline(nodeConfigCache, nodeParameterCache, order); } public bool TryRegister(INodeFactory nodeFactory, IPipelineNodeValidator nodeValidator, string pipelineId, PipelineConfig config, out Dictionary nodeValidationResults, bool treatWarningsAsErrors = false, CultureInfo? cultureInfo = null) { cultureInfo ??= _cultureInfo; // RegisterPipeline(pipelineId, config, out var cachedPipeline, out nodeValidationResults, cultureInfo); RegisterPipeline(nodeFactory, nodeValidator, pipelineId, config, out var cachedPipeline, out nodeValidationResults, cultureInfo); var result = ValidatePipeline(pipelineId, nodeValidationResults, treatWarningsAsErrors, false); if (result.IsRegistered) _cache[pipelineId] = cachedPipeline; return result.IsRegistered; } public PipelineRegistrationResult Register(INodeFactory nodeFactory, IPipelineNodeValidator nodeValidator, string pipelineId, PipelineConfig config, bool treatWarningsAsErrors = false, CultureInfo? cultureInfo = null) { cultureInfo ??= _cultureInfo; // RegisterPipeline(pipelineId, config, out var cachedPipeline, out var validationResults, cultureInfo); RegisterPipeline(nodeFactory, nodeValidator, pipelineId, config, out var cachedPipeline, out var validationResults, cultureInfo); var result = ValidatePipeline(pipelineId, validationResults, treatWarningsAsErrors); _cache[pipelineId] = cachedPipeline; return result; } private static PipelineRegistrationResult ValidatePipeline(string pipelineId, Dictionary nodeValidationResults, bool treatWarningsAsErrors, bool throwException = true) { var hasErrors = nodeValidationResults.Any(q => !IsValid(q.Value.Result, treatWarningsAsErrors)); if (hasErrors && throwException) throw new PipelineValidationException(pipelineId, nodeValidationResults, treatWarningsAsErrors); return new PipelineRegistrationResult(pipelineId, !hasErrors, nodeValidationResults, treatWarningsAsErrors); } private static bool IsValid(ValidationResult validationResult, bool treatWarningsAsErrors) { return validationResult == ValidationResult.Valid || (!treatWarningsAsErrors && validationResult == ValidationResult.HasWarnings); } public bool Remove(string name) { return _cache.TryRemove(name, out _); } public bool Contains(string name) { return _cache.ContainsKey(name); } public IEnumerable ListNames() { return _cache.Keys; } public PipelineExecutor Build(INodeFactory nodeFactory, string pipelineId) { if (!_cache.TryGetValue(pipelineId, out var cached)) throw new PipelineNotFoundException(pipelineId); // if (_nodeFactory == null) throw new NodeFactoryNotRegisteredException(); var createdNodes = new Dictionary(); var executeDelegates = new List(); foreach (var nodeId in cached.TopologicalOrder) { var nodeCfg = cached.NodeConfigCache[nodeId]; // var node = _nodeFactory.CreateNode(pipelineId, nodeCfg); var node = nodeFactory.CreateNode(pipelineId, nodeCfg); // _nodeFactory.SetNodeParametersValues(pipelineId, node, nodeCfg.Id, cached.NodeParameterCache[nodeId]); nodeFactory.SetNodeParametersValues(pipelineId, node, nodeCfg.Id, cached.NodeParameterCache[nodeId]); createdNodes[nodeId] = node; executeDelegates.Add(node.Execute); } foreach (var nodeId in cached.TopologicalOrder) { var nodeCfg = cached.NodeConfigCache[nodeId]; if (nodeCfg.Inputs.Count == 0) continue; var node = createdNodes[nodeId]; // _nodeFactory.ConnectPorts(pipelineId, node, nodeId, nodeCfg.Inputs, createdNodes); nodeFactory.ConnectPorts(pipelineId, node, nodeId, nodeCfg.Inputs, createdNodes); } var executor = new PipelineExecutor(); foreach (var step in executeDelegates) executor.AddStep(step); return executor; } private record CachedPipeline( Dictionary NodeConfigCache, Dictionary> NodeParameterCache, List TopologicalOrder ); }