Paillave.EtlNet.Core 2.0.47

There is a newer prerelease version of this package available.
See the version list below for details.
.NET CLI:
dotnet add package Paillave.EtlNet.Core --version 2.0.47

Package Manager:
NuGet\Install-Package Paillave.EtlNet.Core -Version 2.0.47
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.

PackageReference:
<PackageReference Include="Paillave.EtlNet.Core" Version="2.0.47" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.

Paket CLI:
paket add Paillave.EtlNet.Core --version 2.0.47

Script & Interactive:
#r "nuget: Paillave.EtlNet.Core, 2.0.47"
The #r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.

Cake:
// Install Paillave.EtlNet.Core as a Cake Addin
#addin nuget:?package=Paillave.EtlNet.Core&version=2.0.47

// Install Paillave.EtlNet.Core as a Cake Tool
#tool nuget:?package=Paillave.EtlNet.Core&version=2.0.47

Etl.Net


Presentation

ETL.NET is a mass-processing engine that is used in a way similar to LINQ, with every SSIS feature and much more. Its reactive implementation provides parallelized multi-stream processing, high performance, and a low memory footprint, even with millions of rows to process.

ETL.NET is written entirely in .NET, so it runs cross-platform and integrates straightforwardly into any application.

Extending it takes 5 minutes... literally.

Packages:
Paillave.EtlNet.Core
Paillave.EtlNet.Autofac
Paillave.EtlNet.Dropbox
Paillave.EtlNet.EntityFrameworkCore
Paillave.EtlNet.ExcelFile
Paillave.EtlNet.ExecutionToolkit
Paillave.EtlNet.FileSystem
Paillave.EtlNet.FromConfigurationConnectors
Paillave.EtlNet.Ftp
Paillave.EtlNet.Mail
Paillave.EtlNet.Sftp
Paillave.EtlNet.SqlServer
Paillave.EtlNet.TextFile
Paillave.EtlNet.Bloomberg
Paillave.EtlNet.XmlFile
Paillave.EtlNet.Zip
Paillave.EtlNet.Pdf
Paillave.EtlNet.Scheduler
Paillave.EntityFrameworkCoreExtension

Examples

Unzip it, read it, save it, report it

Read all zip files from a folder, unzip the CSV files they contain, parse them, exclude duplicates, upsert the rows into a database, and report the new or pre-existing Id corresponding to each email.

dotnet new console -o SimpleTutorial
cd SimpleTutorial
dotnet add package Paillave.EtlNet.Core
dotnet add package Paillave.EtlNet.FileSystem
dotnet add package Paillave.EtlNet.Zip
dotnet add package Paillave.EtlNet.TextFile
dotnet add package Paillave.EtlNet.SqlServer

using System;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.FileSystem;
using Paillave.Etl.Zip;
using Paillave.Etl.TextFile;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;
using System.Linq;

namespace SimpleTutorial
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            processRunner.DebugNodeStream += (sender, e) => { /* place a conditional breakpoint here for debug */ };
            using (var cnx = new SqlConnection(args[1]))
            {
                cnx.Open();
                var executionOptions = new ExecutionOptions<string>
                {
                    Resolver = new SimpleDependencyResolver().Register(cnx),
                };
                var res = await processRunner.ExecuteAsync(args[0], executionOptions);
                // WriteLine keeps the status and the error details on separate lines.
                Console.WriteLine(res.Failed ? "Failed" : "Succeeded");
                if (res.Failed)
                    Console.WriteLine($"{res.ErrorTraceEvent.NodeName}({res.ErrorTraceEvent.NodeTypeName}):{res.ErrorTraceEvent.Content.Message}");
            }
        }
        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            contextStream
                .CrossApplyFolderFiles("list all required files", "*.zip", true)
                .CrossApplyZipFiles("extract files from zip", "*.csv")
                .CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new Person
                {
                    Email = i.ToColumn("email"),
                    FirstName = i.ToColumn("first name"),
                    LastName = i.ToColumn("last name"),
                    DateOfBirth = i.ToDateColumn("date of birth", "yyyy-MM-dd"),
                    Reputation = i.ToNumberColumn<int?>("reputation", ".")
                }).IsColumnSeparated(','))
                .Distinct("exclude duplicates based on the Email", i => i.Email)
                .SqlServerSave("upsert using Email as key and ignore the Id", o => o
                    .ToTable("dbo.Person")
                    .SeekOn(p => p.Email)
                    .DoNotSave(p => p.Id))
                .Select("define row to report", i => new { i.Email, i.Id })
                .ToTextFileValue("write summary to file", "report.csv", FlatFileDefinition.Create(i => new
                {
                    Email = i.ToColumn("Email"),
                    Id = i.ToNumberColumn<int>("new or existing Id", ".")
                }).IsColumnSeparated(','))
                .WriteToFile("save log file", i => i.Name);
        }
        private class Person
        {
            public int Id { get; set; }
            public string Email { get; set; }
            public string FirstName { get; set; }
            public string LastName { get; set; }
            public DateTime DateOfBirth { get; set; }
            public int? Reputation { get; set; }
        }
    }
}
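To try the process above, the input folder needs at least one zip file containing a CSV. The sketch below is a hypothetical helper (not part of ETL.NET) that builds such a file using only the .NET base class library. It assumes the parser expects a header row whose names match the ToColumn arguments; the rows themselves are made-up sample data, with one duplicated email so that the Distinct step has something to exclude.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Hypothetical helper for illustration only; it is not part of ETL.NET.
static class SampleInput
{
    // Header names mirror the columns mapped in DefineProcess (assumed layout).
    private static readonly string[] Lines =
    {
        "email,first name,last name,date of birth,reputation",
        "jane@example.com,Jane,Doe,1985-04-12,42",
        "john@example.com,John,Smith,1990-11-30,7",
        "jane@example.com,Jane,Doe,1985-04-12,42" // duplicate email on purpose
    };

    // Creates <folder>/people.zip containing a single people.csv entry
    // and returns the path of the zip file.
    public static string Create(string folder)
    {
        Directory.CreateDirectory(folder);
        var zipPath = Path.Combine(folder, "people.zip");
        File.Delete(zipPath); // no-op when the file does not exist yet

        using (var archive = ZipFile.Open(zipPath, ZipArchiveMode.Create))
        using (var writer = new StreamWriter(archive.CreateEntry("people.csv").Open()))
        {
            foreach (var line in Lines)
                writer.WriteLine(line);
        }
        return zipPath;
    }
}
```

Calling SampleInput.Create("input") and then running the tutorial with the input folder as the first argument and a SQL Server connection string as the second should give the pipeline something to process, presumably because the first argument is the context value that the folder listing step starts from.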

Run it, debug it, track it, log it

Execute an ETL process, debug it by tracking debug events with the IDE debugger, catch execution events, and log them into a database.

using System;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.FileSystem;
using Paillave.Etl.Zip;
using Paillave.Etl.TextFile;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;

namespace SimpleTutorial
{
  class Program
  {
    static async Task Main(string[] args)
    {
      var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
      processRunner.DebugNodeStream += (sender, e) => { /* PLACE A CONDITIONAL BREAKPOINT HERE FOR DEBUG */ };
      using (var cnx = new SqlConnection(args[1]))
      {
        cnx.Open();
        var executionOptions = new ExecutionOptions<string>
        {
          Resolver = new SimpleDependencyResolver().Register(cnx),
          TraceProcessDefinition = DefineTraceProcess,
          // UseDetailedTraces = true // activate only if per row traces are meant to be caught
        };
        var res = await processRunner.ExecuteAsync(args[0], executionOptions);
        // WriteLine keeps the status and the error details on separate lines.
        Console.WriteLine(res.Failed ? "Failed" : "Succeeded");
        if (res.Failed)
          Console.WriteLine($"{res.ErrorTraceEvent.NodeName}({res.ErrorTraceEvent.NodeTypeName}):{res.ErrorTraceEvent.Content.Message}");
      }
    }
    private static void DefineProcess(ISingleStream<string> contextStream)
    {
      // TODO: define your ETL process here
    }
    private static void DefineTraceProcess(IStream<TraceEvent> traceStream, ISingleStream<string> contentStream)
    {
      traceStream
        .Where("keep only summary of node and errors", i => i.Content is CounterSummaryStreamTraceContent || i.Content is UnhandledExceptionStreamTraceContent)
        .Select("create log entry", i => new ExecutionLog
          {
            DateTime = i.DateTime,
            ExecutionId = i.ExecutionId,
            EventType = i.Content switch
            {
              CounterSummaryStreamTraceContent => "EndOfNode",
              UnhandledExceptionStreamTraceContent => "Error",
              _ => "Unknown"
            },
            Message = i.Content switch
            {
              CounterSummaryStreamTraceContent counterSummary => $"{i.NodeName}: {counterSummary.Counter}",
              UnhandledExceptionStreamTraceContent unhandledException => $"{i.NodeName}({i.NodeTypeName}): [{unhandledException.Level.ToString()}] {unhandledException.Message}",
              _ => "Unknown"
            }
          })
        .SqlServerSave("save traces", o => o.ToTable("dbo.ExecutionTrace"));
    }
    private class ExecutionLog
    {
      public DateTime DateTime { get; set; }
      public Guid ExecutionId { get; set; }
      public string EventType { get; set; }
      public string Message { get; set; }
    }
  }
}

Normalize it

Dispatch rows from a flat file into several tables to normalize the data, using the correlation mechanism.

private static void DefineProcess(ISingleStream<string> contextStream)
{
  var rowStream = contextStream
    .CrossApplyFolderFiles("list all required files", "*.csv", true)
    .CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new
    {
      Author = i.ToColumn("author"),
      Email = i.ToColumn("email"),
      TimeSpan = i.ToDateColumn("timestamp", "yyyyMMddHHmmss"),
      Category = i.ToColumn("category"),
      Link = i.ToColumn("link"),
      Post = i.ToColumn("post"),
      Title = i.ToColumn("title"),
    }).IsColumnSeparated(','))
    .SetForCorrelation("set correlation for row");

  var authorStream = rowStream
    .Distinct("remove author duplicates based on emails", i => i.Email)
    .Select("create author instance", i => new Author { Email = i.Email, Name = i.Author })
    .EfCoreSaveCorrelated("save or update authors", o => o
      .SeekOn(i => i.Email)
      .AlternativelySeekOn(i => i.Name));

  var categoryStream = rowStream
    .Distinct("remove category duplicates", i => i.Category)
    .Select("create category instance", i => new Category { Code = i.Category, Name = i.Category })
    .EfCoreSaveCorrelated("insert categories if doesn't exist, get it otherwise", o => o
      .SeekOn(i => i.Code)
      .DoNotUpdateIfExists());

  var postStream = rowStream
    .CorrelateToSingle("get related category", categoryStream, (l, r) => new { Row = l, Category = r })
    .CorrelateToSingle("get related author", authorStream, (l, r) => new { l.Row, l.Category, Author = r })
    .Select("create post instance", i => string.IsNullOrWhiteSpace(i.Row.Post)
      ? new LinkPost
      {
        AuthorId = i.Author.Id,
        CategoryId = i.Category.Id,
        DateTime = i.Row.TimeSpan,
        Title = i.Row.Title,
        Url = new Uri(i.Row.Link)
      } as Post
      : new TextPost
      {
        AuthorId = i.Author.Id,
        CategoryId = i.Category.Id,
        DateTime = i.Row.TimeSpan,
        Title = i.Row.Title,
        Text = i.Row.Post
      })
    .EfCoreSaveCorrelated("save or update posts", o => o
      .SeekOn(i => new { i.AuthorId, i.DateTime }));
}
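The process above refers to Author, Category, Post, LinkPost and TextPost without showing them. A minimal sketch of what these entity classes might look like follows, inferred only from the properties the example reads and writes. The key types (int), the Id property on Post, and Post being abstract are assumptions; the real model and its EF Core mapping may differ.

```csharp
using System;

// Hypothetical entity classes inferred from the example; not the library's own types.
public class Author
{
    public int Id { get; set; }        // assumed to be generated on save
    public string Email { get; set; }  // primary seek key in the example
    public string Name { get; set; }   // alternative seek key in the example
}

public class Category
{
    public int Id { get; set; }        // assumed to be generated on save
    public string Code { get; set; }   // seek key in the example
    public string Name { get; set; }
}

// Common base type: both branches of the conditional expression yield a Post.
public abstract class Post
{
    public int Id { get; set; }        // assumption, not used by the example
    public int AuthorId { get; set; }
    public int CategoryId { get; set; }
    public DateTime DateTime { get; set; }
    public string Title { get; set; }
}

// Created when the row has no post text.
public class LinkPost : Post
{
    public Uri Url { get; set; }
}

// Created when the row carries post text.
public class TextPost : Post
{
    public string Text { get; set; }
}
```

Sharing a base class is what lets the example cast the LinkPost branch with `as Post` and save both kinds of post through a single stream keyed on AuthorId and DateTime.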

Compatible and additional computed target framework versions

.NET: net6.0 is compatible. The following were computed: net6.0-android, net6.0-ios, net6.0-maccatalyst, net6.0-macos, net6.0-tvos, net6.0-windows, net7.0, net7.0-android, net7.0-ios, net7.0-maccatalyst, net7.0-macos, net7.0-tvos, net7.0-windows, net8.0, net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows.

Dependencies

  • net6.0: No dependencies.

NuGet packages (18)

Showing the top 5 NuGet packages that depend on Paillave.EtlNet.Core:

Paillave.EtlNet.TextFile: Extensions for Etl.Net to read and write csv and fixed column width files
Paillave.EtlNet.EntityFrameworkCore: EntityFrameworkCore extensions for Etl.Net
Paillave.EtlNet.FileSystem: File system extensions for Etl.Net
Paillave.EtlNet.ExcelFile: Extensions for Etl.Net to deal with excel files
Paillave.EtlNet.Zip: Extensions for Etl.Net to extract zip

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
2.1.9-beta 3,973 6/29/2023
2.1.8-beta 1,889 6/28/2023
2.1.7-beta 1,843 6/28/2023
2.1.6-beta 2,589 5/19/2023
2.1.3-beta 2,106 3/21/2023
2.1.2-beta 2,390 1/29/2023
2.1.0-beta 2,142 1/29/2023
2.0.52-beta 2,154 1/20/2023
2.0.47 23,879 8/19/2022
2.0.46 3,748 8/5/2022
2.0.23 8,474 3/8/2022
2.0.22 3,340 2/27/2022
2.0.21 3,276 2/27/2022
2.0.16 3,357 2/2/2022
2.0.4 3,276 9/15/2021