本地事件总线和事务
  9JriZXzjEcmS 2024年01月05日 19 0

本地事件总线和事务

通过重写Ef CoreSaveChanges/SaveChangesAsync来实现事务。当然,如果您愿意实现仓储层,可以在仓储层实现展开对应实体包含的事件,并且调整事件的处理顺序。

Github仓库地址:soda-event-bus

实现AggregateRoot

AggregateRoot类主要通过一个集合来记录本次事务的所有事件,到保存前再展开读取,在Abp中采用的ICollection记录的本地事件,通过实现一个排序器来保证顺序问题,我这里直接采用了ConcurrentQueue,保证原子操作的同时保证了顺序性,实现更简单一些。

public abstract class AggregateRoot
{
    public ConcurrentQueue<object> LocalEvents { get; } = new();

    public void AddLocalEvent<TEvent>(TEvent eventData) where TEvent : IEvent
    {
        LocalEvents.Enqueue(eventData);
    }

    public bool GetLocalEvent(out object? @event)
    {
        LocalEvents.TryDequeue(out var eventData);

        @event = eventData;
        return @event is not null;
    }

    public void ClearLocalEvents()
    {
        LocalEvents.Clear();
    }
}

重写DbContext

主要是从ServiceProvider中获取对应实体类包含的事件,并且找到对应的Handler进行处理,然后再当作一个事务提交。

public class EventBusDbContext<TDbContext> : DbContext
    where TDbContext : DbContext
{
    private readonly IServiceProvider _serviceProvider;

    public EventBusDbContext(DbContextOptions<TDbContext> options, IServiceProvider serviceProvider) : base(options)
    {
        _serviceProvider = serviceProvider;
    }

    public override int SaveChanges()
    {
        return base.SaveChanges();
    }

    public override int SaveChanges(bool acceptAllChangesOnSuccess)
    {
        return base.SaveChanges(acceptAllChangesOnSuccess);
    }

    public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
    {
        await HandleEventsAsync();

        return await base.SaveChangesAsync(cancellationToken);
    }

    public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)
    {
        await HandleEventsAsync();

        return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
    }

    private async Task HandleEventsAsync()
    {
        foreach (var entityEntry in ChangeTracker.Entries<AggregateRoot>())
        {
            while (entityEntry.Entity.GetLocalEvent(out var @event))
            {
                if (@event is null) break;

                await HandleEventAsync(@event);
            }

            entityEntry.Entity.ClearLocalEvents();
        }
    }

    private async Task HandleEventAsync(object @event)
    {
        var eventHandlerType = typeof(IAsyncEventHandler<>).MakeGenericType(@event.GetType());
        var eventHandler = _serviceProvider.GetRequiredService(eventHandlerType);

        var method = eventHandler.GetType().GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleAsync));
        var exceptionHandleMethod = eventHandlerType.GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleException));

        try
        {
            await (Task)method!.Invoke(eventHandler, new[] { @event })!;
        }
        catch (Exception ex)
        {
            exceptionHandleMethod!.Invoke(eventHandler, new[] { @event, ex });
        }
    }
}

分布式事件总线和事务

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2024年01月05日 0

暂无评论

推荐阅读
  NPQODODLqddb   2024年05月17日   69   0   0 .NET
  mVIyUuLhKsxa   2024年05月17日   52   0   0 .NET
  XkHDHG7Y62UM   2024年05月17日   45   0   0 .NET
  f18CFixvrKz8   2024年05月18日   85   0   0 .NET
  rBgzkhl6abbw   2024年05月18日   77   0   0 .NET
  MYrYhn3ObP4r   2024年05月17日   41   0   0 .NET
  S34pIcuyyIVd   2024年05月17日   60   0   0 .NET
  gKJ2xtp6I8Y7   2024年05月17日   50   0   0 .NET
  MYrYhn3ObP4r   2024年05月17日   39   0   0 .NET
9JriZXzjEcmS