还是event sourcing唯一性验证问题
来源:8-8 聚合命令事件(上)

慕粉1750262393
2018-08-10
老师,您好,之前提过这个问题。我按照你给的意见做了测试,关键代码如下:
@Configuration public class CustomerConfig { @Autowired EventStore eventStore; @Bean public AggregateFactory<Customer> customerFactory(){ SpringPrototypeAggregateFactory<Customer> aggregateFactory = new SpringPrototypeAggregateFactory<>(); aggregateFactory.setPrototypeBeanName("customer"); return aggregateFactory; } @Bean public Repository<Customer> customerRepository(){ return new EventSourcingRepository<>( customerFactory(), eventStore ); } }
@Component public class CustomerCommandHandler { @Autowired Repository<Customer> customerRepository; @Autowired CustomerEntityRepository customerEntityRepository; @CommandHandler public void handle(CustomerCreateCommand command) throws Exception { int count = customerEntityRepository.countByUsername(command.getName()); System.out.println("username count:" + count); Thread.sleep(10 * 1000); System.out.println("waiting......"); if (count > 0) { throw new RuntimeException("用户名已存在"); } else { customerRepository.newInstance(() -> new Customer(command)); } } }
经过测试,在并发请求时,不能有效保证username的唯一性。按理说,在axon-spring-boot-starter默认配置下,配置的是SimpleCommandBus,理论上来说对于同一个聚合上的command和其引发的saga以及event应该是线性执行的,但是,实测结果却与所期待的不同,推测是因为创建聚合这样的命令实际上在处理时,还没有聚合创建出来,所以无法保证command的顺序执行吗?
因为我之前对您的代码做过另外一个测试,在下面代码中我对CustomerDepositedEvent 的响应代码做了并发测试,也就是我同时发起两个请求来减少同一个账号的余额,测试显示。对于同一个聚合customer上的两次并发余额操作,axon在单体应用下可以保证对两次事件的顺序响应,不会出现错误。所以当您提示我的时候,我觉得在command的处理中应该也是这样,单结果却不是。不知道是我的代码有问题,还是axon就是这样处理的。另外,像下面这段代码,在分布式环境下如果query端部署了 多个节点,那多个节点之间会有并发冲突吗?比如:command端出发两次减余额操作,发出两个CustomerDepositedEvent 事件,query节点1响应一个,query节点2响应一个,那这两个节点用您例子中给出的这种更新视图的方式,会不会出现并发错误呢?这个我还没有测试过,不知道您有没有实测过。
@Service public class CustomerProjector { @Autowired private CustomerEntityRepository repository; @EventHandler public void on(CustomerCreatedEvent event) { CustomerEntity customer = new CustomerEntity(event.getCustomerId(), event.getName(), event.getPassword(), 0d); repository.save(customer); } @EventHandler public void on(CustomerDepositedEvent event) { String customerId = event.getCustomerId(); CustomerEntity accountView = repository.getOne(customerId); try { System.out.println(accountView.getDeposit()); System.out.println("waiting......"); Thread.sleep(30*1000); } catch (InterruptedException e) { e.printStackTrace(); } Double newDeposit = accountView.getDeposit() + event.getAmount(); accountView.setDeposit(newDeposit); repository.save(accountView); } @EventHandler public void on(CustomerChargedEvent event) { String customerId = event.getCustomerId(); CustomerEntity customer = repository.getOne(customerId); Double newDeposit = customer.getDeposit() - event.getAmount(); customer.setDeposit(newDeposit); repository.save(customer); } @EventHandler public void on(OrderPaidEvent event) { String customerId = event.getCustomerId(); CustomerEntity customer = repository.getOne(customerId); Double newDeposit = customer.getDeposit() - event.getAmount(); customer.setDeposit(newDeposit); repository.save(customer); } }
最后,您视频提到axon的文档和社区很活跃,可是我看官方文档很简单啊http://docs.axonframework.org/v/3.2/
还有社区地址又是多少呢?
最后,如果我所说的都成立,也就是说,我像上面这样写代码,不能保证并发冲突,或者说不能在分布式环境下保证并发冲突,还是需要再在countByUsername这样的地方加锁,减余额还是需要利用sql update set deposit=deposit-amount where id=?这样的形式,那我们使用axon或者事件来保证分布式一致性的意义又体现在哪里呢?
2回答
-
再回答第二个问题:CustomerProjector并发处理event的问题。
你说的这种并发问题还真的存在,如果是一个query服务,我们可以通过读消息处理消息的并发数来控制。但是,在有多个query服务的时候,他们都从队列上读取消息并处理,应该是会有问题的。那这种情况下,还是要通过合理的方法设计来避免。
比如在更新余额的时候,不要get - set,而是 update,例如使用spring data的repository上的自定义query,写一个update的query。
当然,我们也可以通过数据库的事务隔离机制来避免,比如这种情况下就使用“序列化”的隔离方式。
最后,axon的社区是在google groups上,然后,如果有bug,在github上的确认和更新也是很快的。
082018-08-11 -
大漠风
2018-08-10
非常高质量的问题,回答之前先手动点个赞。
先说第一个问题,command并发处理的问题。在使用聚合对象上的CommandHandler处理方法时,我们的command上面关联id,也就是聚合对象Id,axon框架在处理该command的时候,根据Id找到(或创建)聚合对象,并对同样Id的command依次处理。这在我们的实例中也演示过,也就是在并发测试(性能测试)的时候。
然后,在默认情况下,axon处理command,再处理event,是同步调用执行,也就是说,command handler处理方法在执行的时候,最终会调用event处理方法,在event处理完以后,这个command的处理方法才执行完成。
所以,在你测试event处理的并发性,也就是:“因为我之前对您的代码做过另外一个测试,在下面代码中我对CustomerDepositedEvent 的响应代码做了并发测试,也就是我同时发起两个请求来减少同一个账号的余额,测试显示。对于同一个聚合customer上的两次并发余额操作,axon在单体应用下可以保证对两次事件的顺序响应,不会出现错误。”
这时候,你的2个command依次执行,只有在第一个执行完以后,第二个才会执行。所以并非不会有问题。
然后,再说使用CustomerCommandHandler类的并发问题,就不一样了。这个处理类,在处理一个command的时候,并没有关联聚合对象,而是我们自己在代码里从 聚合资源库中取出聚合对象,再验证、处理、再触发相应的event。所以这时候的并发就不一样了,就不能在开始处理的时候就依次处理。
我反复想了一下,在你使用的测试情况下,即使是在单服务中,也无法简单的保证并发时不出错,因为判断总会有先后,如果在用户名字段上有唯一索引,后执行的方法总会报错。我们只要能保证在出错时数据的一致性,对于这种小概率事件,还是应该允许。如果为了这个弄得过于复杂就得不偿失了。
但是呢,借助于axon的这种机制:“同一个聚合对象的处理方法依次处理同一个聚合对象上的事件”,我们可以创建一个UserNameAggregate,对他来说,username就是聚合id,当创建用户时,由UserNameAggregate上的方法处理UserNameCreateCommnad,这样同样的username始终依次处理;然后生成UserNameCreatedEvent,它再产生一个UserCreateCommnad,去出发原先的用户创建流程。这有点像saga流程一样,但是又没那么复杂,因为就2步,而且第一步只是为了验证。
022018-08-11
相似问题