首先,我们先简单解释下,什么是“内存Join”。
相信大家对关系数据库的 join 语句肯定不陌生,其作用就是通过关联关系从多个表中查询数据,关联条件和数据聚合全部由 数据库服务完成。
图片
而 内存 Join,简单来说就是把原本数据库帮我们完成的数据聚合操作迁移到应用服务,在应用服务的内存中完成。
图片
数据库join非常简单,但随着系统的发展,内存join变得越来越重要,其核心驱动力有:
发现变化,封装变化,管理变化,是开发人员的必备技能。
本篇文章从查询订单这个业务场景为入口,针对数据的内存join进行多次抽象和封装,最终实现“内存Join声明化”。
首先,先看下最终的效果,从直观上感受下“抽象”带来的效率提升。
图片
通过抽象,可以达到如下效果:
神秘背后的本质便是“抽象”。让我们以订单查询为线索,层层递进,最终实现“能力声明化”。
能力声明化,是抽象的一种高级表现,无需编写代码,通过配置的方式为特定组件进行能力加强。
在正式开始之前,可以先了解下整体的推演流程:
图片
假设,我们是订单中心的一位研发伙伴,需要开发 “我的订单” 模块,其核心接口包括:
根据需求定义 OrderService 接口如下:
public interface OrderService { // 我的订单 List<OrderListVO> getByUserId(Long userId); // 订单详情 OrderDetailVO getDetailByOrderId(Long orderId);}// 为配合多种实现策略,使用抽象类进行统一public abstract class OrderListVO { public abstract OrderVO getOrder(); public abstract UserVO getUser(); public abstract AddressVO getAddress(); public abstract ProductVO getProduct();}// 为配合多种实现策略,使用抽象类进行统一public abstract class OrderDetailVO { public abstract OrderVO getOrder(); public abstract UserVO getUser(); public abstract AddressVO getAddress(); public abstract ProductVO getProduct(); public abstract List<PayInfoVO> getPayInfo();}
这么简单的需求,那不是信手拈来,很快就提供了一版
图片
代码具体如下:
@Servicepublic class OrderServiceCodingV1 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private AddressRepository addressRepository; @Autowired private ProductRepository productRepository; @Autowired private UserRepository userRepository; @Autowired private PayInfoRepository payInfoRepository; @Override public List<OrderListVO> getByUserId(Long userId) { // 获取用户订单 List<Order> orders = this.orderRepository.getByUserId(userId); // 依次进行数据绑定 return orders.stream() .map(order -> convertToOrderListVO(order)) .collect(toList()); } private OrderListVOCodingV1 convertToOrderListVO(Order order) { OrderVO orderVO = OrderVO.apply(order); OrderListVOCodingV1 orderDetailVO = new OrderListVOCodingV1(orderVO); // 绑定地址信息 Address address = this.addressRepository.getById(order.getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetailVO.setAddress(addressVO); // 绑定用户信息 User user = this.userRepository.getById(order.getUserId()); UserVO userVO = UserVO.apply(user); orderDetailVO.setUser(userVO); // 绑定商品信息 Product product = this.productRepository.getById(order.getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetailVO.setProduct(productVO); return orderDetailVO; } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { // 暂时忽略 Order order = this.orderRepository.getById(orderId); return convertToOrderDetailVO(order); } private OrderDetailVO convertToOrderDetailVO(Order order) { OrderDetailVOCodingV1 orderDetail = new OrderDetailVOCodingV1(OrderVO.apply(order)); // 获取地址并进行绑定 Address address = this.addressRepository.getById(order.getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetail.setAddress(addressVO); // 获取用户并进行绑定 User user = this.userRepository.getById(order.getUserId()); UserVO userVO = UserVO.apply(user); orderDetail.setUser(userVO); // 获取商品并进行绑定 Product product = this.productRepository.getById(order.getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetail.setProduct(productVO); // 获取支付信息并进行绑定 List<PayInfo> payInfos = this.payInfoRepository.getByOrderId(order.getId()); List<PayInfoVO> payInfoVOList = payInfos.stream() .map(PayInfoVO::apply) .collect(toList()); orderDetail.setPayInfo(payInfoVOList); return orderDetail; }}
如果真的这样实现,那你离“被跑路”不远了。
为什么会这么说呢?因为 ==“我的订单”这个接口存在严重的性能问题!==
“我的订单”接口具体实现如下:
单个用户请求,数据库访问总次数 = 1(获取用户订单)+ N(订单数量) * 3(需要抓取的关联数据)
其中,N(订单数量) * 3(关联数据数量) 存在性能隐患,存在严重的==读放大效应==。一旦遇到忠实用户,存在成百上千订单,除了超时别无办法。
“订单详情”接口实现,目前问题不大,最大的问题为:==“订单详情”与“我的订单”两个接口存在大量的重复逻辑!==
首先,我们先来解决 “我的订单”接口的性能问题。从之前的分析可知,性能低下的根本原因在于==“读放大效应”==,数据库请求次数与用户订单数成正比,为了更好的保障性能,最好将数据库操作控制在一个常量。
整体思路为:先批量获取要绑定的数据,然后遍历每一个订单,在内存中完成数据绑定。
图片
实现代码如下:
@Servicepublic class OrderServiceCodingV2 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private AddressRepository addressRepository; @Autowired private ProductRepository productRepository; @Autowired private UserRepository userRepository; @Autowired private PayInfoRepository payInfoRepository; @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOCodingV2> orderDetailVOS = orders.stream() .map(order -> new OrderListVOCodingV2(OrderVO.apply(order))) .collect(toList()); // 批量获取用户,并依次进行绑定 List<Long> userIds = orders.stream() .map(Order::getUserId) .collect(toList()); List<User> users = this.userRepository.getByIds(userIds); Map<Long, User> userMap = users.stream() .collect(toMap(User::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ User user = userMap.get(orderDetailVO.getOrder().getUserId()); UserVO userVO = UserVO.apply(user); orderDetailVO.setUser(userVO); } // 批量获取地址,并依次进行绑定 List<Long> addressIds = orders.stream() .map(Order::getAddressId) .collect(toList()); List<Address> addresses = this.addressRepository.getByIds(addressIds); Map<Long, Address> addressMap = addresses.stream() .collect(toMap(Address::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Address address = addressMap.get(orderDetailVO.getOrder().getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetailVO.setAddress(addressVO); } // 批量获取商品,并依次进行绑定 List<Long> productIds = orders.stream() .map(Order::getProductId) .collect(toList()); List<Product> products = this.productRepository.getByIds(productIds); Map<Long, Product> productMap = products.stream() .collect(toMap(Product::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Product product = productMap.get(orderDetailVO.getOrder().getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetailVO.setProduct(productVO); } return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { // 暂时忽略 Order order = this.orderRepository.getById(orderId); return convertToOrderDetailVO(order); } private OrderDetailVO convertToOrderDetailVO(Order order) { // 暂时忽略 return orderDetail; }}
调整之后,对于“我的订单”接口,单个用户请求==数据库的访问次数变成了常量(4)==。
如果你是这么实现的,那恭喜你,你已步入==合格程序员行列==。
批量查询+内存Join 方案能满足大部分场景,如果要抓取的数据太多,也就是数据库访问这个==常量变大==时,性能也会越来越差。
原因很简单,由于串行执行,==整体耗时 = 获取订单耗时 + sum(抓取数据耗时)==
聪明的同学早就跃跃欲试,这个我会:==多线程并行执行呗。==
是的,基于 Future 的实现如下(还有很多版本,比如 CountDownLatch)
整体设计如下:
图片
示例代码如下:
@Servicepublic class OrderServiceCodingV3 implements OrderService { private ExecutorService executorService; @Autowired private OrderRepository orderRepository; @Autowired private AddressRepository addressRepository; @Autowired private ProductRepository productRepository; @Autowired private UserRepository userRepository; @Autowired private PayInfoRepository payInfoRepository; @PostConstruct public void init(){ // 初始化线程池(不要使用Executors,这里只是演示,需要对资源进行评估) this.executorService = Executors.newFixedThreadPool(20); } @SneakyThrows @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOCodingV2> orderDetailVOS = orders.stream() .map(order -> new OrderListVOCodingV2(OrderVO.apply(order))) .collect(toList()); List<Callable<Void>> callables = Lists.newArrayListWithCapacity(3); // 创建异步任务 callables.add(() -> { // 批量获取用户,并依次进行绑定 List<Long> userIds = orders.stream() .map(Order::getUserId) .collect(toList()); List<User> users = this.userRepository.getByIds(userIds); Map<Long, User> userMap = users.stream() .collect(toMap(User::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ User user = userMap.get(orderDetailVO.getOrder().getUserId()); UserVO userVO = UserVO.apply(user); orderDetailVO.setUser(userVO); } return null; }); // 创建异步任务 callables.add(() ->{ // 批量获取地址,并依次进行绑定 List<Long> addressIds = orders.stream() .map(Order::getAddressId) .collect(toList()); List<Address> addresses = this.addressRepository.getByIds(addressIds); Map<Long, Address> addressMap = addresses.stream() .collect(toMap(Address::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Address address = addressMap.get(orderDetailVO.getOrder().getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetailVO.setAddress(addressVO); } return null; }); // 创建异步任务 callables.add(() -> { // 批量获取商品,并依次进行绑定 List<Long> productIds = orders.stream() .map(Order::getProductId) .collect(toList()); List<Product> products = this.productRepository.getByIds(productIds); Map<Long, Product> productMap = products.stream() .collect(toMap(Product::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Product product = productMap.get(orderDetailVO.getOrder().getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetailVO.setProduct(productVO); } return null; }); // 执行异步任务 this.executorService.invokeAll(callables); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { // 暂时忽略 Order order = this.orderRepository.getById(orderId); return convertToOrderDetailVO(order); } private OrderDetailVO convertToOrderDetailVO(Order order) { // 暂时忽略 }}
多线程并发执行,==整体耗时 = 获取订单耗时 + max(抓取数据耗时)==
如果你能够这样实现的,那恭喜你,你已步入==高级程序员行列==。
然后呢,到此为止了?NO,接下来才是高潮!!!
让我们==打开认知==,开启==“抽象+封装”==之旅。
仔细研究上述代码,寻找里面的==“变与不变”==,你会发现:
找到逻辑中的变化点,接下来便是有针对性的进行封装。
对于 “我的订单” 和 “订单详情” 返回==不同的 VO==,该怎么处理呢?
非常简单,思路如下:
整体设计如下:
图片
简单示例如下:
// 以 UserVO 为例,ProductVO、AddressVO,PayInfoVO 基本一致,不在赘述public interface UserVOFetcherV1 { Long getUserId(); void setUser(UserVO user);}// OrderDetailVO 实现对应的接口,为了突出重点暂时忽略具体实现public class OrderDetailVOFetcherV1 extends OrderDetailVO implements AddressVOFetcherV1, ProductVOFetcherV1, UserVOFetcherV1, PayInfoVOFetcherV1{}// OrderListVO 实现对应接口,为了突出重点暂时忽略具体实现public class OrderListVOFetcherV1 extends OrderListVO implements AddressVOFetcherV1, ProductVOFetcherV1, UserVOFetcherV1 {}
有了统一的操作接口,接下来便是抽取具体的绑定逻辑,以 UserVOFetcherExecutor 为例:
@Componentpublic class UserVOFetcherExecutorV1 { @Autowired private UserRepository userRepository; public void fetch(List<? extends UserVOFetcherV1> fetchers){ List<Long> ids = fetchers.stream() .map(UserVOFetcherV1::getUserId) .distinct() .collect(Collectors.toList()); List<User> users = userRepository.getByIds(ids); Map<Long, User> userMap = users.stream() .collect(toMap(user -> user.getId(), Function.identity())); fetchers.forEach(fetcher -> { Long userId = fetcher.getUserId(); User user = userMap.get(userId); if (user != null){ UserVO userVO = UserVO.apply(user); fetcher.setUser(userVO); } }); }}
实现逻辑没有变化,最重要的变化在于“入参类型”,不在是具体的 VO,而是抽象的 UserVOFetcher 接口。
AddressVOFetcherExecutor、ProductVOFetcherExecutor、PayInfoVOFetcherExecutor 与 UserVOFetcherExecutorV1 逻辑基本一致,篇幅问题不在赘述。
这样一个小小的调整,会给使用方带来什么便利?一起看下使用方的变化:
@Servicepublic class OrderServiceFetcherV1 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private AddressVOFetcherExecutorV1 addressVOFetcherExecutorV1; @Autowired private ProductVOFetcherExecutorV1 productVOFetcherExecutorV1; @Autowired private UserVOFetcherExecutorV1 userVOFetcherExecutorV1; @Autowired private PayInfoVOFetcherExecutorV1 payInfoVOFetcherExecutorV1; @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOFetcherV1> orderDetailVOS = orders.stream() .map(order -> new OrderListVOFetcherV1(OrderVO.apply(order))) .collect(toList()); // 直接使用 FetcherExecutor 完成数据绑定 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOFetcherV1 orderDetail = new OrderDetailVOFetcherV1(OrderVO.apply(order)); List<OrderDetailVOFetcherV1> orderDetailVOS = Arrays.asList(orderDetail); // 直接使用 FetcherExecutor 完成数据绑定 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); this.payInfoVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetail; }}
两个方法直接使用 FetcherExecutor 完成数据抓取和绑定,实现了==绑定逻辑的复用==。
如果再有 VO 需要进行数据绑定,只需:
至此,面对新业务基本上与“绑定逻辑”说再见了。
接下来让我们一起聚焦于绑定逻辑,先对比下上述的UserVOFetcherExecutor 与下面的 AddressVOFetcherExecutor, 找到里面的变化与不变:
@Componentpublic class AddressVOFetcherExecutorV1 { @Autowired private AddressRepository addressRepository; public void fetch(List<? extends AddressVOFetcherV1> fetchers){ // 获取关联信息 List<Long> ids = fetchers.stream() .map(AddressVOFetcherV1::getAddressId) .distinct() .collect(Collectors.toList()); // 查询关联数据 List<Address> addresses = addressRepository.getByIds(ids); // 转为为 Map Map<Long, Address> addressMap = addresses.stream() .collect(toMap(address -> address.getId(), Function.identity())); // 依次进行数据绑定 fetchers.forEach(fetcher -> { Long addressId = fetcher.getAddressId(); Address address = addressMap.get(addressId); if (address != null){ // 转换为 VO AddressVO addressVO = AddressVO.apply(address); // 将数据写回到结果 fetcher.setAddress(addressVO); } }); }}
仔细观察,会发现:
获取关联信息
查询关联数据
将其转换为 Map
讲数据转化为 VO
将 VO 绑定到结果对象
熟悉设计模式的伙伴是否眼前一亮?停顿一下好好回想一下,哪种模式就是用来处理这种问题的?
答案便是:模板方法模式
整体思想为:
整体设计如下:
图片
抽取公共父类如下:
abstract class BaseItemFetcherExecutor<FETCHER extends ItemFetcher, DATA, RESULT> implements ItemFetcherExecutor<FETCHER>{ @Override public void fetch(List<FETCHER> fetchers) { // 获取关联信息 List<Long> ids = fetchers.stream() .map(this::getFetchId) .distinct() .collect(Collectors.toList()); // 查询关联数据 List<DATA> datas = loadData(ids); // 转为为 Map Map<Long, List<DATA>> dataMap = datas.stream() .collect(groupingBy(this::getDataId)); // 依次进行数据绑定 fetchers.forEach(fetcher -> { Long id = getFetchId(fetcher); List<DATA> ds = dataMap.get(id); if (ds != null){ // 转换为 VO List<RESULT> result = ds.stream() .map( data -> convertToVo(data)) .collect(Collectors.toList()); // 将数据写回到结果 setResult(fetcher, result); } }); } protected abstract Long getFetchId(FETCHER fetcher); protected abstract List<DATA> loadData(List<Long> ids); protected abstract Long getDataId(DATA data); protected abstract RESULT convertToVo(DATA data); protected abstract void setResult(FETCHER fetcher, List<RESULT> result);}
基于 BaseItemFetcherExecutor 的 UserFetcherExecutor 如下:
@Componentpublic class UserVOFetcherExecutorV2 extends BaseItemFetcherExecutor<UserVOFetcherV2, User, UserVO>{ @Autowired private UserRepository userRepository; @Override protected Long getFetchId(UserVOFetcherV2 fetcher) { return fetcher.getUserId(); } @Override protected List<User> loadData(List<Long> ids) { return this.userRepository.getByIds(ids); } @Override protected Long getDataId(User user) { return user.getId(); } @Override protected UserVO convertToVo(User user) { return UserVO.apply(user); } @Override protected void setResult(UserVOFetcherV2 fetcher, List<UserVO> userVO) { if (CollectionUtils.isNotEmpty(userVO)) { fetcher.setUser(userVO.get(0)); } } @Override public boolean support(Class<UserVOFetcherV2> cls) { // 暂时忽略,稍后会细讲 return UserVOFetcherV2.class.isAssignableFrom(cls); }}
UserVOFetcherExecutor究竟发生什么变化呢?好像变得更复杂了:
那我们究竟得到了什么好处?可以花几分钟好好思考一下!!!
在说结果之前,让我们看下另一个变化点。回想下 FetcherExecutor 的执行点,如下:
@Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOFetcherV1> orderDetailVOS = orders.stream() .map(order -> new OrderListVOFetcherV1(OrderVO.apply(order))) .collect(toList()); // 手工调用,OrderListVO 实现新接口,需要增加新的依赖和调用 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOFetcherV1 orderDetail = new OrderDetailVOFetcherV1(OrderVO.apply(order)); List<OrderDetailVOFetcherV1> orderDetailVOS = Arrays.asList(orderDetail); // 手工调用,OrderDetailVO 实现新接口,需要增加新的依赖和调用 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); this.payInfoVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetail; }
其实,需要调用哪些 FetcherExecutor 完全可以由 VO 实现的接口来确定。也就是说,==需要绑定新数据,只需 VO 继承并实现新的 Fetcher 接口即可。==
对此,我们需要:
哪个设计模式是用来解决这个问题?花几分钟好好思考一下!
答案是:==责任链模型==
标准的责任链模式用起来比较繁琐,在 Spring 实现中大量使用他的一种变现,及提供一个验证接口,由组件自身完成判断,用于决定是否执行自身逻辑。
整体设计如下:
图片
首先,为了统一 FetcherExecutor 的行为,抽取通用接口:
public interface ItemFetcherExecutor<F extends ItemFetcher> { /** * 该组件是否能处理 cls 类型 * @param cls * @return */ boolean support(Class<F> cls); /** * 执行真正的数据绑定 * @param fetchers */ void fetch(List<F> fetchers);}
具体的实现,可以见 UserVOFetcherExecutorV2 的 support 方法:
@Overridepublic boolean support(Class<UserVOFetcherV2> cls) { return UserVOFetcherV2.class.isAssignableFrom(cls);}
实现逻辑非常简单,只是判断 cls 是否实现了 UserVOFetcherV2 接口。
有了 FetcherExecutor 组件后,接下来就是为其提供统一的访问入口:
@Servicepublic class FetcherService { @Autowired private List<ItemFetcherExecutor> itemFetcherExecutors; public <F extends ItemFetcher> void fetch(Class<F> cls, List<F> fetchers){ if (CollectionUtils.isNotEmpty(fetchers)){ this.itemFetcherExecutors.stream() // 是否能处理该类型 .filter(itemFetcherExecutor -> itemFetcherExecutor.support(cls)) // 执行真正的绑定 .forEach(itemFetcherExecutor -> itemFetcherExecutor.fetch(fetchers)); } }}
逻辑即为简单,依次遍历 FetcherExecutor,根据 support 执行结果,执行 fetch 逻辑。
【小常识】Spring 可以将容器中的全部实现直接注入到 List<Bean>。在上述代码中,将会把所有的 ItemFetcherExecutor 实现注入到 itemFetcherExecutors 属性。因此,在新增 FetcherExecutor 时,只需将其声明为 Spring Bean,无需调整代码逻辑。
OK,我们有了 FetcherService 提供统一的数据绑定能力,原来 OrderServiceFetcher 中 fetch 操作的变化点转移到 FetcherService,自身变得非常稳定。具体如下:
@Servicepublic class OrderServiceFetcherV2 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private FetcherService fetcherService; @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOFetcherV2> orderDetailVOS = orders.stream() .map(order -> new OrderListVOFetcherV2(OrderVO.apply(order))) .collect(toList()); // VO 数据绑定发生变化,只需调整 VO 实现接口,此处无需变化 fetcherService.fetch(OrderListVOFetcherV2.class, orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOFetcherV2 orderDetail = new OrderDetailVOFetcherV2(OrderVO.apply(order)); // VO 数据绑定发生变化,只需调整 VO 实现接口,此处无需变化 fetcherService.fetch(OrderDetailVOFetcherV2.class, Arrays.asList(orderDetail)); return orderDetail; }}
终于,我们将变化收敛到 VO 内,VO 需要绑定新的数据,只需实现对应接口即可。
经过重构,代码结构变得非常清晰,如果想通过多线程并发方式提供性能,需要调整哪些组件呢?好好想想!!!
只需对FetcherService进行调整,让我们来一个并发版本,具体如下:
@Servicepublic class ConcurrentFetcherService { private ExecutorService executorService; @Autowired private List<ItemFetcherExecutor> itemFetcherExecutors; @PostConstruct public void init(){ this.executorService = Executors.newFixedThreadPool(20); } @SneakyThrows public <F extends ItemFetcher> void fetch(Class<F> cls, List<F> fetchers){ if (CollectionUtils.isNotEmpty(fetchers)){ // 创建异步执行任务 List<Callable<Void>> callables = this.itemFetcherExecutors.stream() .filter(itemFetcherExecutor -> itemFetcherExecutor.support(cls)) .map(itemFetcherExecutor -> (Callable<Void>) () -> { itemFetcherExecutor.fetch(fetchers); return null; }).collect(Collectors.toList()); // 线程池中并行执行 this.executorService.invokeAll(callables); } }}
OrderServiceFetcherV3 只需使用 ConcurrentFetcherService 替代 原来的 FetcherService 并拥有了并发能力。
纵观整个 Fetcher 封装,虽然结构清晰,但细节过于繁琐,特别是:
这些不便将成为落地最大的阻碍,那有没有办法进行进一步简化?
这需要思考下这些设计背后的深层需求:
提供绑定信息
设置绑定结果
被 FetcherExecutor 识别并进行处理
所有这些需求是否可用 ==注解== 的方式实现?
根据上述分析,注解可完成全部任务,新建注解如下:
@Target({ElementType.FIELD, ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public @interface JoinInMemory { /** * 从 sourceData 中提取 key * @return */ String keyFromSourceData(); /** * 从 joinData 中提取 key * @return */ String keyFromJoinData(); /** * 批量数据抓取 * @return */ String loader(); /** * 结果转换器 * @return */ String joinDataConverter() default ""; /** * 运行级别,同一级别的 join 可 并行执行 * @return */ int runLevel() default 10;}
乍一看,需要配置的信息真多,其实大多数配置全部与 FetcherExecutor 实现相关。
abstract class AbstractJoinItemExecutor<SOURCE_DATA, JOIN_KEY, JOIN_DATA, JOIN_RESULT> implements JoinItemExecutor<SOURCE_DATA> { /** * 从原始数据中生成 JoinKey * @param data * @return */ protected abstract JOIN_KEY createJoinKeyFromSourceData(SOURCE_DATA data); /** * 根据 JoinKey 批量获取 JoinData * @param joinKeys * @return */ protected abstract List<JOIN_DATA> getJoinDataByJoinKeys(List<JOIN_KEY> joinKeys); /** * 从 JoinData 中获取 JoinKey * @param joinData * @return */ protected abstract JOIN_KEY createJoinKeyFromJoinData(JOIN_DATA joinData); /** * 将 JoinData 转换为 JoinResult * @param joinData * @return */ protected abstract JOIN_RESULT convertToResult(JOIN_DATA joinData); /** * 将 JoinResult 写回至 SourceData * @param data * @param JoinResults */ protected abstract void onFound(SOURCE_DATA data, List<JOIN_RESULT> JoinResults); /** * 未找到对应的 JoinData * @param data * @param joinKey */ protected abstract void onNotFound(SOURCE_DATA data, JOIN_KEY joinKey); @Override public void execute(List<SOURCE_DATA> sourceDatas) { // 从源数据中提取 JoinKey List<JOIN_KEY> joinKeys = sourceDatas.stream() .filter(Objects::nonNull) .map(this::createJoinKeyFromSourceData) .filter(Objects::nonNull) .distinct() .collect(toList()); log.debug("get join key {} from source data {}", joinKeys, sourceDatas); // 根据 JoinKey 获取 JoinData List<JOIN_DATA> allJoinDatas = getJoinDataByJoinKeys(joinKeys); log.debug("get join data {} by join key {}", allJoinDatas, joinKeys); // 将 JoinData 以 Map 形式进行组织 Map<JOIN_KEY, List<JOIN_DATA>> joinDataMap = allJoinDatas.stream() .filter(Objects::nonNull) .collect(groupingBy(this::createJoinKeyFromJoinData)); log.debug("group by join key, result is {}", joinDataMap); // 处理每一条 SourceData for (SOURCE_DATA data : sourceDatas){ // 从 SourceData 中 获取 JoinKey JOIN_KEY joinKey = createJoinKeyFromSourceData(data); if (joinKey == null){ log.warn("join key from join data {} is null", data); continue; } // 根据 JoinKey 获取 JoinData List<JOIN_DATA> joinDatasByKey = joinDataMap.get(joinKey); if (CollectionUtils.isNotEmpty(joinDatasByKey)){ // 获取到 JoinData, 转换为 JoinResult,进行数据写回 List<JOIN_RESULT> joinResults = joinDatasByKey.stream() .filter(Objects::nonNull) .map(joinData -> convertToResult(joinData)) .collect(toList()); log.debug("success to convert join data {} to join result {}", joinDatasByKey, joinResults); onFound(data, joinResults); log.debug("success to write join result {} to source data {}", joinResults, data); }else { log.warn("join data lost by join key {} for source data {}", joinKey, data); // 为获取到 JoinData,进行 notFound 回调 onNotFound(data, joinKey); } } }}
JoinInMemory 注解属性和AbstractJoinItemExecutor基本一致,在此就不做赘述,我们先看下具体的使用方式:
@Datapublic class OrderDetailVOAnnV1 extends OrderDetailVO { private final OrderVO order; @JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" ) private UserVO user; // 其他暂时忽略}@Datapublic class OrderListVOAnnV1 extends OrderListVO { private final OrderVO order; @JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" ) private UserVO user; // 其他暂时忽略}
我们以 UserVO user 属性为例
属性 | 含义 |
keyFromSourceData = "#{order.userId}" | 以属性 order 中的 userId 作为 JoinKey |
keyFromJoinData = "#{id}" | 以 user 的 id 作为 JoinKey |
loader = "#{@userRepository.getByIds(#root)}" | 将 userRepository bean 的 getByIds 方法作为加载器,其中 #root 为 joinKey 集合(user id 集合) |
joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" | 将 com.geekhalo.lego.joininmemory.order.UserVO 静态方法 apply 作为转换器,#root 指的是 User 对象 |
@JoinInMemory 注解中大量使用 SpEL,不熟悉的伙伴可以自行网上进行检索。
其他部分不变,定义 OrderService 如下:
@Servicepublic class OrderServiceAnnV1 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private JoinService joinService; @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOAnnV1> orderDetailVOS = orders.stream() .map(order -> new OrderListVOAnnV1(OrderVO.apply(order))) .collect(toList()); this.joinService.joinInMemory(OrderListVOAnnV1.class, orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOAnnV1 orderDetail = new OrderDetailVOAnnV1(OrderVO.apply(order)); this.joinService.joinInMemory(OrderDetailVOAnnV1.class, Arrays.asList(orderDetail)); return orderDetail; }}
相对于 Fetcher 抽象,我们将 Fetcher、FetcherExecutor 全部配置化,并通过 注解的方式进行呈现,相对于 Coding 方案,注解方案更加灵活,工作量也更小。
相对于 Fetcher 封装,一个 @JoinInMemory 成功干掉了两个组件,但观其自身配置起来还是非常繁琐。比如,在订单查询这个场景,在 OrderListVO 和 OrderDetailVO 中都需要对 UserVO 进行数据绑定,观察两个注解,我们会发现很多重复配置:
//OrderListVO@JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" )private UserVO user;// OrderDetailVO@JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" )private UserVO user;
两个配置完全一样,细品之后会发现:
OrderListVO 指的是 OrderListVO 属性 order 的id值
OrderDetailVO 指的是 OrderDetailVO 属性 order 的值
对于不变部分如何进行统一管理?
自定义注解 结合 Spring @AliasFor 便可以解决这个问题,以 UserVO 为例:
@Target(ElementType.FIELD)@Retention(RetentionPolicy.RUNTIME)// 管理通用属性@JoinInMemory(keyFromSourceData = "", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}")public @interface JoinUserVOOnId { // 使用别名将 keyFromSourceData 的配置暴露出来 @AliasFor( annotation = JoinInMemory.class ) String keyFromSourceData();}
新注解有如下几个特点:
有了自定义注解,使用变的非常方便:
@Datapublic class OrderListVOAnnV2 extends OrderListVO { private final OrderVO order; // 只需配置参数即可,其他配置由 JoinUserVOOnId 进行管理 @JoinUserVOOnId(keyFromSourceData = "#{order.userId}") private UserVO user;}@Datapublic class OrderDetailVOAnnV2 extends OrderDetailVO { private final OrderVO order; // 只需配置参数即可,其他配置由 JoinUserVOOnId 进行管理 @JoinUserVOOnId(keyFromSourceData = "#{order.userId}") private UserVO user;}
其他使用方式不变,但实现了逻辑简化:
如果担心性能,可以一键开启并发绑定,示例如下:
@Data@JoinInMemoryConfig(executorType = JoinInMemeoryExecutorType.PARALLEL)public class OrderListVOAnnV3 extends OrderListVO { private final OrderVO order; @JoinUserVOOnId(keyFromSourceData = "#{order.userId}") private UserVO user; @JoinAddressVOOnId(keyFromSourceData = "#{order.addressId}") private AddressVO address; @JoinProductVOOnId(keyFromSourceData = "#{order.productId}") private ProductVO product;}
JoinInMemoryConfig 配置如下:
属性 | 含义 |
executorType | PARALLEL 并行执行;SERIAL 串行执行 |
executorName | 执行器名称,并行执行所使用的线程池名称,默认为 defaultExecutor |
@JoinInMemory 注解上配置的信息太多,如果直接在业务代码中使用,非常难以维护,当每个配置发生变化后,很难一次性修改到位。所以,建议只将他作为“原注解”使用。
整体思路详见:
图片
对于不同的数据绑定需求,建议使用不同的线程池,从资源层面对不同功能进行隔离,从而将由于依赖接口发生阻塞导致线程耗尽所造成的影响控制在最小范围。
@JoinInMemoryConfig 的 executorName 属性配置的便是执行器名称,不配置直接使用 “defaultExecutor”,具体代码如下:
@Beanpublic ExecutorService defaultExecutor(){ BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder() .namingPattern("JoinInMemory-Thread-%d") .daemon(true) .build(); int maxSize = Runtime.getRuntime().availableProcessors() * 3; return new ThreadPoolExecutor(0, maxSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), basicThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());}
如需使用自定义线程池需:
推导逻辑有点长不知道你get到多少,先简单回顾一下:
不知道你有什么感触,接下来,有什么计划?
本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-34640-0.htmlDDD死党:内存Join--将复用和扩展用到极致
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
下一篇: Python VTK 初探数据源