Spring Boot 高效数据聚合之道

原文地址: Spring Boot 高效数据聚合之道

项目地址和示例代码: https://github.com/lvyahui8/spring-boot-data-aggregator

1. 背景

接口开发是后端开发中最常见的场景,可能是 RESTFul 接口,也可能是 RPC 接口. 接口开发往往是从各处捞出数据,然后组装成结果,特别是那些偏业务的接口.

如何方便快速的开发高性能的接口 , 是一个必须思考的问题.

例如,我现在需要实现一个接口,拉取用户基础信息 + 用户的博客列表 + 用户的粉丝数据的整合数据,假设已经有如下三个接口可以使用,分别用来获取 用户基础信息 , 用户博客列表 , 用户的粉丝数据.

用户基础信息

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class UserServiceImpl implements UserService {
@Override
public User get(Long id) {
try {Thread.sleep(1000L);} catch (InterruptedException e) {}
/* mock a user*/
User user = new User();
user.setId(id);
user.setEmail("lvyahui8@gmail.com");
user.setUsername("lvyahui8");
return user;
}
}

用户博客列表

1
2
3
4
5
6
7
8
9
10
11
@Service
public class PostServiceImpl implements PostService {
@Override
public List<Post> getPosts(Long userId) {
try { Thread.sleep(1000L); } catch (InterruptedException e) {}
Post post = new Post();
post.setTitle("spring data aggregate example");
post.setContent("No active profile set, falling back to default profiles");
return Collections.singletonList(post);
}
}

用户的粉丝数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
public class FollowServiceImpl implements FollowService {
@Override
public List<User> getFollowers(Long userId) {
try { Thread.sleep(1000L); } catch (InterruptedException e) {}
int size = 10;
List<User> users = new ArrayList<>(size);
for(int i = 0 ; i < size; i++) {
User user = new User();
user.setUsername("name"+i);
user.setEmail("email"+i+"@fox.com");
user.setId((long) i);
users.add(user);
};
return users;
}
}

注意,每一个方法都 sleep 了 1s 以模拟业务耗时.

我们需要再封装一个接口,来拼装以上三个接口的数据.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class UserQueryFacade {
@Autowired
private FollowService followService;
@Autowired
private PostService postService;
@Autowired
private UserService userService;

public User getUserData(Long userId) {
User user = userService.get(userId);
user.setPosts(postService.getPosts(userId));
user.setFollowers(followService.getFollowers(userId));
return user;
}
}

很明显,上面的代码,效率低下,起码要 3s 才能拿到结果 , 且一旦用到某个接口的数据,便需要注入相应的 service, 复用麻烦.

2. 并行实现

有追求的程序员可能立马会考虑到,这几项数据之间并无强依赖性,完全可以并行获取嘛,通过异步线程 + CountDownLatch+Future 实现,就像下面这样.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Component
public class UserQueryFacade {
@Autowired
private FollowService followService;
@Autowired
private PostService postService;
@Autowired
private UserService userService;

public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
// 创建固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
CountDownLatch countDownLatch = new CountDownLatch(3); // 参数count为计数值
Future<User> userFuture = executorService.submit(() -> {
try{
return userService.get(userId);
}finally {
countDownLatch.countDown();
}
});
Future<List<Post>> postsFuture = executorService.submit(() -> {
try{
return postService.getPosts(userId);
}finally {
countDownLatch.countDown();
}
});
Future<List<User>> followersFuture = executorService.submit(() -> {
try{
return followService.getFollowers(userId);
}finally {
countDownLatch.countDown();
}
});
// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
countDownLatch.await();
User user = userFuture.get();
user.setFollowers(followersFuture.get());
user.setPosts(postsFuture.get());
return user;
}
}

上面的代码,将串行调用改为并行调用,在有限并发级别下,能极大提高性能. 但很明显,它过于复杂,如果每个接口都为了并行执行都写这样一段代码,简直是噩梦.

3. 优雅的注解实现

熟悉 java 的都知道,java 有一种非常便利的特性~~注解。简直是黑魔法。往往只需要给类或者方法上添加一些注解,便可以实现非常复杂的功能.

有了注解,再结合 Spring 依赖自动注入的思想,那么我们可不可以通过注解的方式,自动注入依赖,自动并行调用接口呢?答案是肯定的.

首先,我们先定义一个聚合接口 (当然也可以不定义聚合类,所有代码写在原 Service 类中同样可以)

1
2
3
4
5
6
7
8
9
10
11
@Component
public class UserAggregate {
@DataProvider("userFullData")
public User userFullData(@DataConsumer("user") User user,
@DataConsumer("posts") List<Post> posts,
@DataConsumer("followers") List<User> followers) {
user.setFollowers(followers);
user.setPosts(posts);
return user;
}
}

其中

  • @DataProvider 表示这个方法是一个数据提供者,数据 Id 为 userFullData
  • @DataConsumer 表示这个方法的参数,需要消费数据,数据 Id 分别为 user ,posts, followers.

当然,原来的 3 个原子服务 用户基础信息 , 用户博客列表 , 用户的粉丝数据 , 也分别需要添加一些注解

1
2
3
4
5
@Service
public class UserServiceImpl implements UserService {
@DataProvider("user")
@Override
public User get(@InvokeParameter("userId") Long id) {
1
2
3
4
5
@Service
public class PostServiceImpl implements PostService {
@DataProvider("posts")
@Override
public List<Post> getPosts(@InvokeParameter("userId") Long userId) {
1
2
3
4
5
@Service
public class FollowServiceImpl implements FollowService {
@DataProvider("followers")
@Override
public List<User> getFollowers(@InvokeParameter("userId") Long userId) {

其中

  • @DataProvider 与前面的含义相同,表示这个方法是一个数据提供者
  • @InvokeParameter 表示方法执行时,需要手动传入的参数

这里注意 @InvokeParameter@DataConsumer 的区别,前者需要用户在最上层调用时手动传参;而后者,是由框架自动分析依赖,并异步调用取得结果之后注入的.

最后,仅仅只需要调用一个统一的门面 (Facade) 接口,传递数据 Id, Invoke Parameters, 以及返回值类型. 剩下的并行处理,依赖分析和注入,完全由框架自动处理.

1
2
3
4
5
6
7
8
9
10
11
@Component
public class UserQueryFacade {
@Autowired
private DataBeanAggregateQueryFacade dataBeanAggregateQueryFacade;

public User getUserFinal(Long userId) throws InterruptedException,
IllegalAccessException, InvocationTargetException {
return dataBeanAggregateQueryFacade.get("userFullData",
Collections.singletonMap("userId", userId), User.class);
}
}

4. 如何用在你的项目中

上面的功能,笔者已经封装为一个 spring boot starter, 并发布到 maven 中央仓库.

只需在你的项目引入依赖.

1
2
3
4
5
<dependency>
<groupId>io.github.lvyahui8</groupId>
<artifactId>spring-boot-data-aggregator-starter</artifactId>
<version>1.0.2</version>
</dependency>

并在 application.properties 文件中声明注解的扫描路径.

1
2
# 替换成你需要扫描注解的包
io.github.lvyahui8.spring.base-packages=io.github.lvyahui8.spring.example

之后,就可以使用如下注解和 Spring Bean 实现聚合查询

  • @DataProvider
  • @DataConsumer
  • @InvokeParameter
  • Spring Bean DataBeanAggregateQueryFacade

注意,@DataConsumer@InvokeParameter 可以混合使用,可以用在同一个方法的不同参数上。且方法的所有参数必须有其中一个注解,不能有没有注解的参数.

项目地址和上述示例代码: https://github.com/lvyahui8/spring-boot-data-aggregator ,感谢给予 star, 欢迎一起参与完善

5. 特性

  • 异步获取依赖

    所有 @DataConsumer 定义的依赖将异步获取。当 provider 方法参数中的所有依赖获取完成,才执行 provider 方法

  • 不限级嵌套

    依赖关系支持深层嵌套。上面的示例只有一层

  • 异常处理

    目前支持两种处理方式:忽略 or 终止

    忽略是指 provider 方法在执行时,忽略抛出的异常并 return null 值;终止是指一旦有一个 provider 方法抛出了异常,将逐级向上抛出,终止后续处理.

    配置支持 consumer 级或者全局,优先级 : consumer 级 > 全局

  • 查询缓存

    在调用 Facade 的 query 方法的一次查询生命周期内,方法调用结果可能复用,只要方法签名以及传参一致,则默认方法是幂等的,将直接使用缓存的查询结果. 但这个不是绝对的,考虑到多线程的特性,可能有时候不会使用缓存

  • 超时控制 (待实现)

查询的核心代码在io.github.lvyahui8.spring.aggregate.service.DataBeanAggregateQueryService 文件, 感兴趣的可以去Github上下载查看.

6. 后期计划

后续笔者除新增更多功能外, 还会进一步提高插件的易用性, 高可用性, 扩展性

文章作者的公众号

打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 高行行
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信