// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;
using static Microsoft.Extensions.DataIngestion.DiagnosticsConstants;

namespace Microsoft.Extensions.DataIngestion;

#pragma warning disable IDE0058 // Expression value is never used
#pragma warning disable IDE0063 // Use simple 'using' statement
#pragma warning disable CA1031 // Do not catch general exception types

/// <summary>
/// Represents a pipeline for ingesting data from documents and processing it into chunks.
/// </summary>
/// <remarks>
/// Each file is read into a document, passed through <see cref="DocumentProcessors"/>, split into
/// chunks, passed through <see cref="ChunkProcessors"/>, and finally handed to the chunk writer.
/// </remarks>
/// <typeparam name="T">The type of the chunk content.</typeparam>
public sealed class IngestionPipeline<T> : IDisposable
{
    private readonly IngestionDocumentReader _reader;
    private readonly IngestionChunker<T> _chunker;
    private readonly IngestionChunkWriter<T> _writer;
    private readonly ActivitySource _activitySource;
    private readonly ILogger? _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="IngestionPipeline{T}"/> class.
    /// </summary>
    /// <param name="reader">The reader for ingestion documents.</param>
    /// <param name="chunker">The chunker to split documents into chunks.</param>
    /// <param name="writer">The writer for processing chunks. It is disposed together with the pipeline.</param>
    /// <param name="options">The options for the ingestion pipeline. Default options are used when <see langword="null"/>.</param>
    /// <param name="loggerFactory">The logger factory for creating loggers. No logging is performed when <see langword="null"/>.</param>
    public IngestionPipeline(
        IngestionDocumentReader reader,
        IngestionChunker<T> chunker,
        IngestionChunkWriter<T> writer,
        IngestionPipelineOptions? options = default,
        ILoggerFactory? loggerFactory = default)
    {
        _reader = Throw.IfNull(reader);
        _chunker = Throw.IfNull(chunker);
        _writer = Throw.IfNull(writer);
        _activitySource = new((options ?? new()).ActivitySourceName);
        _logger = loggerFactory?.CreateLogger<IngestionPipeline<T>>();
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        try
        {
            _writer.Dispose();
        }
        finally
        {
            // Release the activity source even when disposing the writer throws.
            _activitySource.Dispose();
        }
    }

    /// <summary>
    /// Gets the document processors in the pipeline.
    /// </summary>
    /// <remarks>Processors are applied to each document in list order, before chunking.</remarks>
    public IList<IngestionDocumentProcessor> DocumentProcessors { get; } = [];

    /// <summary>
    /// Gets the chunk processors in the pipeline.
    /// </summary>
    /// <remarks>Processors are chained over the chunk stream in list order, before writing.</remarks>
    public IList<IngestionChunkProcessor<T>> ChunkProcessors { get; } = [];

    /// <summary>
    /// Processes all files in the specified directory that match the given search pattern and option.
    /// </summary>
    /// <param name="directory">The directory to process.</param>
    /// <param name="searchPattern">The search pattern for file selection.</param>
    /// <param name="searchOption">The search option for directory traversal.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    /// <returns>An asynchronous sequence that yields one <see cref="IngestionResult"/> per enumerated file.</returns>
    /// <remarks>
    /// This method is an async iterator, so argument validation and all processing run when the
    /// returned sequence is enumerated, not when the method is called.
    /// </remarks>
    public async IAsyncEnumerable<IngestionResult> ProcessAsync(
        DirectoryInfo directory,
        string searchPattern = "*.*",
        SearchOption searchOption = SearchOption.TopDirectoryOnly,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        Throw.IfNull(directory);
        Throw.IfNullOrEmpty(searchPattern);
        Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);

        using (Activity? rootActivity = _activitySource.StartActivity(ProcessDirectory.ActivityName))
        {
            rootActivity?.SetTag(ProcessDirectory.DirectoryPathTagName, directory.FullName)
                .SetTag(ProcessDirectory.SearchPatternTagName, searchPattern)
                .SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString());
            _logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption);

            await foreach (var ingestionResult in ProcessAsync(directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false))
            {
                yield return ingestionResult;
            }
        }
    }

    /// <summary>
    /// Processes the specified files.
    /// </summary>
    /// <param name="files">The collection of files to process.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    /// <returns>An asynchronous sequence that yields one <see cref="IngestionResult"/> per file.</returns>
    /// <remarks>
    /// This method is an async iterator, so argument validation and all processing run when the
    /// returned sequence is enumerated, not when the method is called.
    /// </remarks>
    public async IAsyncEnumerable<IngestionResult> ProcessAsync(
        IEnumerable<FileInfo> files,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        Throw.IfNull(files);

        using (Activity? rootActivity = _activitySource.StartActivity(ProcessFiles.ActivityName))
        {
            await foreach (var ingestionResult in ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
            {
                yield return ingestionResult;
            }
        }
    }

    /// <summary>Gets the simple type name of <paramref name="any"/>, used in log messages.</summary>
    private static string GetShortName(object any) => any.GetType().Name;

    /// <summary>
    /// Records <paramref name="ex"/> on <paramref name="activity"/> by tagging the exception type
    /// and setting the activity status to <see cref="ActivityStatusCode.Error"/>.
    /// </summary>
    private static void TraceException(Activity? activity, Exception ex)
    {
        activity?.SetTag(ErrorTypeTagName, ex.GetType().FullName)
            .SetStatus(ActivityStatusCode.Error, ex.Message);
    }

    /// <summary>
    /// Reads and ingests each file in turn, yielding one result per file.
    /// </summary>
    /// <param name="files">The files to process.</param>
    /// <param name="rootActivity">The activity of the calling public method; used as parent of the per-file activities.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    /// <returns>An asynchronous sequence with one <see cref="IngestionResult"/> per file, successful or failed.</returns>
    private async IAsyncEnumerable<IngestionResult> ProcessAsync(
        IEnumerable<FileInfo> files,
        Activity? rootActivity,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Report the file count only when it is available without enumerating the sequence.
#if NET
        if (System.Linq.Enumerable.TryGetNonEnumeratedCount(files, out int count))
#else
        if (files is IReadOnlyCollection<FileInfo> { Count: int count })
#endif
        {
            rootActivity?.SetTag(ProcessFiles.FileCountTagName, count);
            _logger?.LogFileCount(count);
        }

        foreach (FileInfo fileInfo in files)
        {
            using (Activity? processFileActivity = _activitySource.StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parentContext: rootActivity?.Context ?? default))
            {
                processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName);
                _logger?.ReadingFile(fileInfo.FullName, GetShortName(_reader));

                IngestionDocument? document = null;
                Exception? failure = null;
                try
                {
                    document = await _reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false);

                    processFileActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
                    _logger?.ReadDocument(document.Identifier);

                    document = await IngestAsync(document, processFileActivity, cancellationToken).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // Deliberately catch everything: a failing file is reported through its
                    // IngestionResult and the loop moves on to the next file. This includes
                    // exceptions thrown in response to cancellation.
                    TraceException(processFileActivity, ex);
                    _logger?.IngestingFailed(ex, document?.Identifier ?? fileInfo.FullName);

                    failure = ex;
                }

                // When reading failed there is no document, so the file path serves as the identifier.
                string documentId = document?.Identifier ?? fileInfo.FullName;
                yield return new IngestionResult(documentId, document, failure);
            }
        }
    }

    /// <summary>
    /// Runs a single document through the document processors, the chunker, the chunk processors and the writer.
    /// </summary>
    /// <param name="document">The document produced by the reader.</param>
    /// <param name="parentActivity">The per-file activity whose document ID tag is kept up to date.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    /// <returns>The document as returned by the last document processor.</returns>
    private async Task<IngestionDocument> IngestAsync(IngestionDocument document, Activity? parentActivity, CancellationToken cancellationToken)
    {
        foreach (IngestionDocumentProcessor processor in DocumentProcessors)
        {
            document = await processor.ProcessAsync(document, cancellationToken).ConfigureAwait(false);

            // A DocumentProcessor might change the document identifier (for example by extracting it from its content), so update the ID tag.
            parentActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
        }

        // Each chunk processor wraps the stream returned by the previous stage.
        IAsyncEnumerable<IngestionChunk<T>> chunks = _chunker.ProcessAsync(document, cancellationToken);
        foreach (var processor in ChunkProcessors)
        {
            chunks = processor.ProcessAsync(chunks, cancellationToken);
        }

        _logger?.WritingChunks(GetShortName(_writer));
        await _writer.WriteAsync(chunks, cancellationToken).ConfigureAwait(false);
        _logger?.WroteChunks(document.Identifier);

        return document;
    }
}