Persisting Projections
The in-memory projection store is fine for getting started, but production read models need to survive restarts and be shared by more than one process. You get that by implementing IProjectionDataStore<TProjection, TId> against a real database, and by giving the observer a durable checkpoint store so that its position survives restarts as well.
The Contract
EventSourcingKit's projector writes through IProjectionDataStore. Your repository implements that contract, and usually a read interface as well, so the same class serves both the projector (writes) and your application (queries):
public class ProjectionRepository<TProjection, TId>(MyDbContext dbContext)
    : IProjectionDataStore<TProjection, TId>,
        IProjectionRepository<TProjection, TId>
    where TProjection : Projection<TId>, new()
{
    public async Task<TProjection?> GetByIdIfExisting(
        TId id,
        CancellationToken cancellationToken
    ) =>
        await dbContext.Set<TProjection>().AsNoTracking()
            .FirstOrDefaultAsync(p => p.Id!.Equals(id), cancellationToken);

    public async Task Create(
        TProjection projection,
        CancellationToken cancellationToken
    )
    {
        dbContext.Set<TProjection>().Add(projection);
        await dbContext.SaveChangesAsync(cancellationToken);
    }

    // Update and DeleteById complete IProjectionDataStore;
    // GetById and GetAll serve your read interface.
}
Here IProjectionRepository is the read interface you query through; it is the fourth type argument to AddProjection. You can use the built-in IProjectionRepository<TProjection, TId> or define your own.
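The two write members that the closing comment in the class leaves out could be filled in along these lines. This is a minimal sketch rather than the library's reference implementation: the Update signature is the one used in the MongoDB example on this page, while the DeleteById signature (an id plus a CancellationToken) is an assumption to check against IProjectionDataStore. Both members belong inside ProjectionRepository<TProjection, TId>.

```csharp
using Microsoft.EntityFrameworkCore;

// Members to place inside ProjectionRepository<TProjection, TId>.
// The DeleteById signature is assumed, not taken from the library.

public async Task Update(
    TProjection projection,
    CancellationToken cancellationToken
)
{
    // Drop any instance with the same key that an earlier Create or Update
    // left tracked in this DbContext; attaching a second one would throw.
    dbContext.ChangeTracker.Clear();

    // The projection was loaded with AsNoTracking, so attach it as Modified
    // and write it back.
    dbContext.Set<TProjection>().Update(projection);
    await dbContext.SaveChangesAsync(cancellationToken);
}

public async Task DeleteById(
    TId id,
    CancellationToken cancellationToken
)
{
    var existing = await dbContext.Set<TProjection>()
        .FirstOrDefaultAsync(p => p.Id!.Equals(id), cancellationToken);
    if (existing is null)
    {
        return; // Already gone, so a repeated delete is a no-op.
    }

    dbContext.Set<TProjection>().Remove(existing);
    await dbContext.SaveChangesAsync(cancellationToken);
}
```

Clearing the change tracker is one way to avoid identity conflicts when a single scoped DbContext handles several events for the same projection. Whether that step is needed depends on how the projector scopes its services, which this page does not specify.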
PostgreSQL with Entity Framework Core
With EF Core, the repository works against a DbContext and dbContext.Set<TProjection>(). Register the context, then the projection, then a durable checkpoint store so the observer resumes where it left off:
builder.Services.AddDbContext<MyDbContext>(options =>
    options.UseNpgsql(connectionString));

builder.Services.AddEventSourcingKit(builder.Configuration, [assembly])
    .AddCheckpointStore<PostgresCheckpointStore>()
    .AddProjection<
        Book,
        Guid,
        ProjectionRepository<Book, Guid>,
        IProjectionRepository<Book, Guid>
    >();
The checkpoint store is a small class that implements ICheckpointStore over the same database, so the observer's progress and the read models stay consistent across restarts.
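To give a feel for how small such a class can be, here is a sketch of the persistence logic a PostgreSQL-backed checkpoint store might contain. The members of ICheckpointStore are not shown on this page, so the method names Load and Store, the string subscription id, the long position, and the checkpoints table are all assumptions made for illustration. The real class would declare the interface and match its actual signatures.

```csharp
using Npgsql;

// Sketch only. The real class would also declare ": ICheckpointStore";
// the member names and parameter types below are assumptions, not the
// library's documented contract.
public sealed class PostgresCheckpointStore(NpgsqlDataSource dataSource)
{
    // Assumed table, created by a migration:
    //   CREATE TABLE checkpoints (
    //       subscription_id text   PRIMARY KEY,
    //       last_position   bigint NOT NULL
    //   );

    // Returns the last stored position, or null if none was stored yet.
    public async Task<long?> Load(
        string subscriptionId,
        CancellationToken cancellationToken
    )
    {
        await using var command = dataSource.CreateCommand(
            "SELECT last_position FROM checkpoints WHERE subscription_id = @id");
        command.Parameters.AddWithValue("id", subscriptionId);

        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result is long position ? (long?)position : null;
    }

    // Inserts or overwrites the position in a single statement.
    public async Task Store(
        string subscriptionId,
        long position,
        CancellationToken cancellationToken
    )
    {
        await using var command = dataSource.CreateCommand(
            """
            INSERT INTO checkpoints (subscription_id, last_position)
            VALUES (@id, @position)
            ON CONFLICT (subscription_id)
            DO UPDATE SET last_position = EXCLUDED.last_position
            """);
        command.Parameters.AddWithValue("id", subscriptionId);
        command.Parameters.AddWithValue("position", position);

        await command.ExecuteNonQueryAsync(cancellationToken);
    }
}
```

The sketch takes an NpgsqlDataSource, which can be built with NpgsqlDataSource.Create(connectionString) and registered as a singleton. Reusing MyDbContext with a mapped checkpoint entity would work as well and keeps everything on one connection configuration.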
MongoDB
With MongoDB, the repository works against an IMongoCollection<TProjection>. An upsert keeps the write path simple, since the observer may revisit a projection while catching up:
public async Task Update(
    TProjection projection,
    CancellationToken cancellationToken
)
{
    var filter = Builders<TProjection>.Filter.Eq(p => p.Id, projection.Id);
    var options = new FindOneAndReplaceOptions<TProjection> { IsUpsert = true };
    await _collection.FindOneAndReplaceAsync(
        filter,
        projection,
        options,
        cancellationToken
    );
}
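The _collection field used by Update has to come from somewhere. One possible arrangement, sketched here under assumptions, is to inject IMongoDatabase and resolve one collection per projection type. The class name MongoProjectionRepository and the choice to name the collection after the projection type are hypothetical. The interface list and the remaining contract members are left out for brevity.

```csharp
using MongoDB.Driver;

// MongoProjectionRepository is a hypothetical name. The Update method
// shown above would live in this class with the other contract members.
public class MongoProjectionRepository<TProjection, TId>(IMongoDatabase database)
    where TProjection : Projection<TId>, new()
{
    // One collection per projection type, named after the type (an assumption).
    private readonly IMongoCollection<TProjection> _collection =
        database.GetCollection<TProjection>(typeof(TProjection).Name);

    public async Task<TProjection?> GetByIdIfExisting(
        TId id,
        CancellationToken cancellationToken
    )
    {
        // Same filter shape as in Update; yields null when nothing matches.
        var filter = Builders<TProjection>.Filter.Eq(p => p.Id, id);
        return await _collection.Find(filter).FirstOrDefaultAsync(cancellationToken);
    }
}
```

By default the MongoDB driver maps a property named Id to the document's _id field, so the filter on p.Id uses the collection's primary index.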
Registration mirrors the PostgreSQL case: register your IMongoDatabase, then call AddProjection with your Mongo-backed repository.
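As a rough illustration of that registration, the following sketch wires up a database handle and a Mongo-backed repository. The connection string name "Mongo", the database name "readmodels", and the class name MongoProjectionRepository are placeholders, not names defined by the library.

```csharp
using MongoDB.Driver;

// Placeholder names: "Mongo" (connection string key) and "readmodels"
// (database name) are assumptions for this sketch.
builder.Services.AddSingleton<IMongoDatabase>(_ =>
    new MongoClient(builder.Configuration.GetConnectionString("Mongo"))
        .GetDatabase("readmodels"));

builder.Services.AddEventSourcingKit(builder.Configuration, [assembly])
    .AddProjection<
        Book,
        Guid,
        MongoProjectionRepository<Book, Guid>, // hypothetical Mongo-backed repository
        IProjectionRepository<Book, Guid>
    >();
```

A durable checkpoint store is still needed in this setup. Register one with AddCheckpointStore in the same chain, as in the PostgreSQL example.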
For More Information
- Projections and Projectors explains the data-store contract.
- Observing Events and Checkpoints explains why a durable checkpoint store matters.