Persisting Projections

The in-memory projection store is perfect for getting started, but real read models need to survive restarts and scale beyond one process. You get that by implementing IProjectionDataStore<TProjection, TId> against a real database, and by giving the observer a durable checkpoint store to match.

The Contract

EventSourcingKit's projector writes through IProjectionDataStore. Your repository implements that contract, and usually a read interface as well, so the same class serves both the projector (writes) and your application (queries):

public class ProjectionRepository<TProjection, TId>(MyDbContext dbContext)
    : IProjectionDataStore<TProjection, TId>,
        IProjectionRepository<TProjection, TId>
    where TProjection : Projection<TId>, new()
{
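    // Returns null when no projection with this id exists yet.
    // AsNoTracking keeps the lookup read-only, so the context does not
    // hold on to the returned entity.
    public async Task<TProjection?> GetByIdIfExisting(
        TId id,
        CancellationToken cancellationToken
    ) =>
        await dbContext.Set<TProjection>().AsNoTracking()
            .FirstOrDefaultAsync(p => p.Id!.Equals(id), cancellationToken);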

    public async Task Create(
        TProjection projection,
        CancellationToken cancellationToken
    )
    {
        dbContext.Set<TProjection>().Add(projection);
        await dbContext.SaveChangesAsync(cancellationToken);
    }

    // Update and DeleteById complete IProjectionDataStore;
    // GetById and GetAll serve your read interface.
}

Here IProjectionRepository is the read interface you query through; it is the fourth type argument to AddProjection. You can use the built-in IProjectionRepository<TProjection, TId> or define your own.
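
The two write members elided from the repository above could be filled in along the following lines. This is a minimal sketch, not the library's reference implementation: the Update signature is taken from the MongoDB example on this page, while the DeleteById signature (an id plus a CancellationToken) is an assumption, and ExecuteDeleteAsync requires EF Core 7 or later. Both methods belong inside the ProjectionRepository class, where dbContext is the primary-constructor parameter.

```csharp
// Fragment: place inside ProjectionRepository<TProjection, TId>.
// Needs: using Microsoft.EntityFrameworkCore;

public async Task Update(
    TProjection projection,
    CancellationToken cancellationToken
)
{
    // Attaches the instance and marks all of its properties as modified.
    dbContext.Set<TProjection>().Update(projection);
    await dbContext.SaveChangesAsync(cancellationToken);
}

// Assumed signature: check it against IProjectionDataStore before use.
public async Task DeleteById(
    TId id,
    CancellationToken cancellationToken
)
{
    // Issues a single DELETE without loading the entity first (EF Core 7+).
    await dbContext.Set<TProjection>()
        .Where(p => p.Id!.Equals(id))
        .ExecuteDeleteAsync(cancellationToken);
}
```

Update assumes the context is not already tracking another instance with the same key; a scoped DbContext per unit of work normally satisfies that.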

PostgreSQL with Entity Framework Core

With EF Core, the repository works against a DbContext and dbContext.Set<TProjection>(). Register the context, then the projection, then a durable checkpoint store so the observer resumes where it left off:

builder.Services.AddDbContext<MyDbContext>(options =>
    options.UseNpgsql(connectionString));

builder.Services.AddEventSourcingKit(builder.Configuration, [assembly])
    .AddCheckpointStore<PostgresCheckpointStore>()
    .AddProjection<
        Book,
        Guid,
        ProjectionRepository<Book, Guid>,
        IProjectionRepository<Book, Guid>
    >();

The checkpoint store is a small class that implements ICheckpointStore over the same database, so projection progress and read models stay consistent across restarts.

MongoDB

With MongoDB, the repository works against an IMongoCollection<TProjection>. An upsert keeps the write path simple, since the observer may revisit a projection while catching up:

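public async Task Update(
    TProjection projection,
    CancellationToken cancellationToken
)
{
    // Match the stored document by projection id.
    var filter = Builders<TProjection>.Filter.Eq(p => p.Id, projection.Id);

    // IsUpsert inserts the document when no match exists, so a replayed
    // event cannot fail on a missing projection.
    var options = new ReplaceOptions { IsUpsert = true };

    // ReplaceOneAsync swaps the whole document without returning the old
    // one, which the projector does not need.
    await _collection.ReplaceOneAsync(
        filter,
        projection,
        options,
        cancellationToken
    );
}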

Registration mirrors the PostgreSQL case: register your IMongoDatabase, then call AddProjection with your Mongo-backed repository.
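
As a rough sketch, the MongoDB registration might look like the fragment below. MongoProjectionRepository and MongoCheckpointStore are hypothetical names for classes you would write yourself, and mongoConnectionString and the "read_models" database name are placeholders. Only MongoClient, GetDatabase, and the AddEventSourcingKit, AddCheckpointStore, and AddProjection calls shown earlier on this page are taken as given.

```csharp
using MongoDB.Driver;

// Share one MongoClient for the whole application; the driver manages
// its own connection pool.
builder.Services.AddSingleton<IMongoClient>(
    _ => new MongoClient(mongoConnectionString));   // placeholder connection string

builder.Services.AddSingleton<IMongoDatabase>(sp =>
    sp.GetRequiredService<IMongoClient>()
        .GetDatabase("read_models"));               // placeholder database name

builder.Services.AddEventSourcingKit(builder.Configuration, [assembly])
    // Hypothetical ICheckpointStore implementation backed by MongoDB.
    .AddCheckpointStore<MongoCheckpointStore>()
    .AddProjection<
        Book,
        Guid,
        MongoProjectionRepository<Book, Guid>,      // hypothetical Mongo-backed repository
        IProjectionRepository<Book, Guid>
    >();
```

The repository can then take IMongoDatabase in its constructor and resolve its IMongoCollection<TProjection> from it.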

For More Information