还是event sourcing唯一性验证问题

来源:8-8 聚合命令事件(上)

慕粉1750262393

2018-08-10

老师,您好,之前提过这个问题。我按照你给的意见做了测试,关键代码如下:

@Configuration
public class CustomerConfig {

    @Autowired
    EventStore eventStore;

    @Bean
    public AggregateFactory<Customer> customerFactory(){
        SpringPrototypeAggregateFactory<Customer> aggregateFactory = new SpringPrototypeAggregateFactory<>();
        aggregateFactory.setPrototypeBeanName("customer");
        return aggregateFactory;
    }

    @Bean
    public Repository<Customer> customerRepository(){
        return new EventSourcingRepository<>(
                customerFactory(),
                eventStore
        );
    }
}
@Component
public class CustomerCommandHandler {

    @Autowired
    Repository<Customer> customerRepository;

    @Autowired
    CustomerEntityRepository customerEntityRepository;

    @CommandHandler
    public void handle(CustomerCreateCommand command) throws Exception {
        int count = customerEntityRepository.countByUsername(command.getName());
        System.out.println("username count:" + count);
        Thread.sleep(10 * 1000);
        System.out.println("waiting......");
        if (count > 0) {
            throw new RuntimeException("用户名已存在");
        } else {
            customerRepository.newInstance(() -> new Customer(command));
        }
    }
}

经过测试,在并发请求时,不能有效保证username的唯一性。按理说,在axon-spring-boot-starter默认配置下,配置的是SimpleCommandBus,理论上来说对于同一个聚合上的command和其引发的saga以及event应该是线性执行的,但是,实测结果却与所期待的不同,推测是因为创建聚合这样的命令实际上在处理时,还没有聚合创建出来,所以无法保证command的顺序执行吗?

因为我之前对您的代码做过另外一个测试,在下面代码中我对CustomerDepositedEvent 的响应代码做了并发测试,也就是我同时发起两个请求来减少同一个账号的余额,测试显示。对于同一个聚合customer上的两次并发余额操作,axon在单体应用下可以保证对两次事件的顺序响应,不会出现错误。所以当您提示我的时候,我觉得在command的处理中应该也是这样,单结果却不是。不知道是我的代码有问题,还是axon就是这样处理的。另外,像下面这段代码,在分布式环境下如果query端部署了 多个节点,那多个节点之间会有并发冲突吗?比如:command端出发两次减余额操作,发出两个CustomerDepositedEvent 事件,query节点1响应一个,query节点2响应一个,那这两个节点用您例子中给出的这种更新视图的方式,会不会出现并发错误呢?这个我还没有测试过,不知道您有没有实测过。

@Service
public class CustomerProjector {

    @Autowired
    private CustomerEntityRepository repository;

    @EventHandler
    public void on(CustomerCreatedEvent event) {
        CustomerEntity customer = new CustomerEntity(event.getCustomerId(), event.getName(), event.getPassword(), 0d);
        repository.save(customer);
    }

    @EventHandler
    public void on(CustomerDepositedEvent event) {
        String customerId = event.getCustomerId();
        CustomerEntity accountView = repository.getOne(customerId);
        try {
            System.out.println(accountView.getDeposit());
            System.out.println("waiting......");
            Thread.sleep(30*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Double newDeposit = accountView.getDeposit() + event.getAmount();
        accountView.setDeposit(newDeposit);
        repository.save(accountView);
    }

    @EventHandler
    public void on(CustomerChargedEvent event) {
        String customerId = event.getCustomerId();
        CustomerEntity customer = repository.getOne(customerId);

        Double newDeposit = customer.getDeposit() - event.getAmount();
        customer.setDeposit(newDeposit);
        repository.save(customer);
    }

    @EventHandler
    public void on(OrderPaidEvent event) {
        String customerId = event.getCustomerId();
        CustomerEntity customer = repository.getOne(customerId);

        Double newDeposit = customer.getDeposit() - event.getAmount();
        customer.setDeposit(newDeposit);
        repository.save(customer);
    }
}

最后,您视频提到axon的文档和社区很活跃,可是我看官方文档很简单啊http://docs.axonframework.org/v/3.2/

还有社区地址又是多少呢?

最后,如果我所说的都成立,也就是说,我像上面这样写代码,不能保证并发冲突,或者说不能在分布式环境下保证并发冲突,还是需要再在countByUsername这样的地方加锁,减余额还是需要利用sql   update set deposit=deposit-amount where id=?这样的形式,那我们使用axon或者事件来保证分布式一致性的意义又体现在哪里呢?


写回答

2回答

大漠风

2018-08-10

再回答第二个问题:CustomerProjector并发处理event的问题。

你说的这种并发问题还真的存在,如果是一个query服务,我们可以通过读消息处理消息的并发数来控制。但是,在有多个query服务的时候,他们都从队列上读取消息并处理,应该是会有问题的。那这种情况下,还是要通过合理的方法设计来避免。

比如在更新余额的时候,不要get - set,而是 update,例如使用spring data的repository上的自定义query,写一个update的query。

当然,我们也可以通过数据库的事务隔离机制来避免,比如这种情况下就使用“序列化”的隔离方式。


最后,axon的社区是在google groups上,然后,如果有bug,在github上的确认和更新也是很快的。


0
8
大漠风
回复
慕粉1750262393
jgroup也是一样,同一个聚合的命令发到同一个实例
2018-08-11
共8条回复

大漠风

2018-08-10

非常高质量的问题,回答之前先手动点个赞。

先说第一个问题,command并发处理的问题。在使用聚合对象上的CommandHandler处理方法时,我们的command上面关联id,也就是聚合对象Id,axon框架在处理该command的时候,根据Id找到(或创建)聚合对象,并对同样Id的command依次处理。这在我们的实例中也演示过,也就是在并发测试(性能测试)的时候。

然后,在默认情况下,axon处理command,再处理event,是同步调用执行,也就是说,command handler处理方法在执行的时候,最终会调用event处理方法,在event处理完以后,这个command的处理方法才执行完成。

所以,在你测试event处理的并发性,也就是:“因为我之前对您的代码做过另外一个测试,在下面代码中我对CustomerDepositedEvent 的响应代码做了并发测试,也就是我同时发起两个请求来减少同一个账号的余额,测试显示。对于同一个聚合customer上的两次并发余额操作,axon在单体应用下可以保证对两次事件的顺序响应,不会出现错误。”

这时候,你的2个command依次执行,只有在第一个执行完以后,第二个才会执行。所以并非不会有问题。

然后,再说使用CustomerCommandHandler类的并发问题,就不一样了。这个处理类,在处理一个command的时候,并没有关联聚合对象,而是我们自己在代码里从 聚合资源库中取出聚合对象,再验证、处理、再触发相应的event。所以这时候的并发就不一样了,就不能在开始处理的时候就依次处理。

我反复想了一下,在你使用的测试情况下,即使是在单服务中,也无法简单的保证并发时不出错,因为判断总会有先后,如果在用户名字段上有唯一索引,后执行的方法总会报错。我们只要能保证在出错时数据的一致性,对于这种小概率事件,还是应该允许。如果为了这个弄得过于复杂就得不偿失了。

但是呢,借助于axon的这种机制:“同一个聚合对象的处理方法依次处理同一个聚合对象上的事件”,我们可以创建一个UserNameAggregate,对他来说,username就是聚合id,当创建用户时,由UserNameAggregate上的方法处理UserNameCreateCommnad,这样同样的username始终依次处理;然后生成UserNameCreatedEvent,它再产生一个UserCreateCommnad,去出发原先的用户创建流程。这有点像saga流程一样,但是又没那么复杂,因为就2步,而且第一步只是为了验证。

0
2
大漠风
用userNameAggregate,在一个业务请求里面处理两个command,他们在一个事务里,即使第二个失败了,第一个事件也回滚了。所以不会有你说的情况。 反而是用saga的话,他会保存一个未完成的saga流程,第一步的事件也保存了。
2018-08-11
共2条回复

分布式事务实践,从原理到实例,解决数据一致性

掌握分布式事务实现技术,是架构师必备技能。

1149 学习 · 153 问题

查看课程