当前位置:首页 > 科技  > 软件

【技术革命】JDK21虚拟线程来袭,让系统的吞吐量翻倍!

来源: 责编: 时间:2023-11-04 23:04:46 230观看
导读1. 虚拟线程简介虚拟线程是一种轻量级线程,可大大减少编写、维护和观察高吞吐量并发应用程序的工作量。从JDK19开始发布了虚拟线程的预览功能,直到JDK21最终确定虚拟线程。虚拟线程既廉价(相比平台线程)又可以创建非常的

1. 虚拟线程简介

虚拟线程是一种轻量级线程,可大大减少编写、维护和观察高吞吐量并发应用程序的工作量。从JDK19开始发布了虚拟线程的预览功能,直到JDK21最终确定虚拟线程。U2i28资讯网——每日最新资讯28at.com

虚拟线程既廉价(相比平台线程)又可以创建非常的多,因此绝不应池化:每个应用任务都应创建一个新的虚拟线程。因此,大多数虚拟线程的寿命都很短,调用堆栈也很浅,只需执行一次 HTTP 客户端调用或一次 JDBC 查询。相比之下,平台线程重量级、成本高,因此通常必须池化。这些线程的寿命往往较长,具有较深的调用堆栈,可在多个任务之间共享。

总之,虚拟线程保留了可靠的每请求线程风格,这种风格与 Java 平台的设计相协调,同时还能优化利用可用硬件。使用虚拟线程不需要学习新的概念,但可能需要放弃为应对当前线程的高成本而养成的习惯。虚拟线程不仅能帮助应用程序开发人员,还能帮助框架设计人员提供易于使用的 API,这些 API 与平台设计兼容,同时又不影响可扩展性。U2i28资讯网——每日最新资讯28at.com

虚拟线程是 java.lang.Thread 的一个实例,它在底层操作系统线程上运行 Java 代码,但在代码的整个生命周期中不会捕获操作系统线程。这意味着许多虚拟线程可以在同一个操作系统线程上运行 Java 代码,从而有效地共享操作系统线程。平台线程会垄断宝贵的操作系统线程,而虚拟线程不会。虚拟线程的数量可能远远大于操作系统线程的数量。U2i28资讯网——每日最新资讯28at.com

虚拟线程是线程的一种轻量级实现,由 JDK 而不是操作系统提供。它们是用户模式线程的一种形式,在其他多线程语言(如 Go 中的 goroutines(协程(轻量级线程)) 和 Erlang 中的进程)中取得了成功。用户模式线程在 Java 早期版本中甚至被称为 "绿色线程",当时操作系统线程尚未成熟和普及。然而,Java 的绿色线程都共享一个操作系统线程(M:1 调度),最终被作为操作系统线程包装器(1:1 调度)实现的平台线程所超越。虚拟线程采用 M:N 调度,即大量(M)虚拟线程被安排运行在较少数量(N)的操作系统线程上。U2i28资讯网——每日最新资讯28at.com

虚拟线程是 java.lang.Thread 的一个实例,与特定操作系统线程无关。相比之下,平台线程是以传统方式实现的 java.lang.Thread 实例,是操作系统线程的薄包装。U2i28资讯网——每日最新资讯28at.com

2. 传统请求线程模型

通常服务器应用程序处理相互独立的并发请求时,在请求的整个持续声明周期内为该请求指定一个线程来处理该请求。这种按请求线程的风格易于理解、易于编程、易于调试和配置。U2i28资讯网——每日最新资讯28at.com

对于一个请求处理的处理时间,应用程序同时处理的请求数(即并发数)必须与吞吐量成比例增长。例如,假设一个平均延迟为 50 毫秒的请求并发处理 10 个请求,实现了每秒 200 个请求的吞吐量。若要将该应用的吞吐量提高到到每秒 2000 个请求,则需要并发处理 100 个请求。如果每个请求在请求持续时间内都由一个线程处理,那么要使应用程序跟上进度,线程数必须随着吞吐量的增加而增加。U2i28资讯网——每日最新资讯28at.com

由于 JDK 将线程作为操作系统(OS)线程的包装器来实现。操作系统线程的成本很高,所以我们不能拥有太多的线程,这就使得线程的实现不适合按请求执行的方式。如果每个请求在其生命周期内都要使用一个线程,也就是一个操作系统线程,那么在 CPU 或网络连接等其他资源耗尽之前,线程数量往往就已经成为限制因素了。JDK 当前的线程实现将应用程序的吞吐量限制在远低于硬件支持的水平。即使对线程进行了池化,也会出现这种情况,因为池化有助于避免启动新线程的高昂成本,但不会增加线程总数。U2i28资讯网——每日最新资讯28at.com

3. 虚拟线程使用

使用方式1:

// 创建一个执行器,为每个任务启动一个新的虚拟线程try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {    IntStream.range(0, 10_000).forEach(i -> {        executor.submit(() -> {            Thread.sleep(Duration.ofSeconds(1));            return i;        });    });}

本例中的任务是简单的代码--休眠1秒--现代硬件可以轻松支持 10,000 个虚拟线程同时运行此类代码。而实际上,JDK 只在少量操作系统线程(可能只有一个)上运行此代码代码。U2i28资讯网——每日最新资讯28at.com

如果该程序使用 ExecutorService(例如 Executors.newCachedThreadPool())为每个任务创建一个新的平台线程,情况就会截然不同。ExecutorService 会尝试创建 10,000 个平台线程,从而创建 10,000 个操作系统线程,根据机器和操作系统的不同,程序可能会崩溃。U2i28资讯网——每日最新资讯28at.com

即便使用Executors.newFixedThreadPool(200)创建固定数量的线程,情况也不会好到哪里去。ExecutorService 将创建 200 个平台线程,供所有 10,000 个任务共享,因此许多任务将顺序运行而非并发运行,程序将需要很长时间才能完成。对于该程序而言,拥有 200 个平台线程的池每秒只能完成 200 个任务,而虚拟线程每秒可完成约 10,000 个任务(经过充分预热后)。此外,如果将示例程序中的 10_000 改为 1_000_000,那么程序将提交 1,000,000 个任务,创建 1,000,000 个虚拟线程并发运行,(充分预热后)吞吐量将达到每秒约 1,000,000 个任务。U2i28资讯网——每日最新资讯28at.com

注意:如果程序中的任务在一秒钟内执行计算(例如对一个巨大的数组进行排序),而不仅仅是休眠,那么增加线程数超过处理器内核数将无济于事,无论它们是虚拟线程还是平台线程。虚拟线程不是更快的线程--它们运行代码的速度并不比平台线程快。它们的存在是为了提供规模(更高的吞吐量),而不是速度(更低的延迟)。虚拟线程的数量可能比平台线程多得多,因此根据利特尔定律,虚拟线程可以提供更高吞吐量所需的更高并发性。U2i28资讯网——每日最新资讯28at.com

使用方式2:

手动创建虚拟线程U2i28资讯网——每日最新资讯28at.com

// 创建虚拟线程OfVirtual virtual = Thread.ofVirtual().name("pack") ;virtual.start(() -> {  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}) ;// 创建不自动启动的线程Thread thread = virtual.unstarted(() -> {  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}) ;// 手动启动虚拟线程thread.start() ; // 打印线程对象:VirtualThread[#21,pack]/runnableSystem.out.println(thread) ;// 创建普通线程OfPlatform platform = Thread.ofPlatform().name("pack") ;Thread thread = platform.start(() -> {  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}) ;// 这里输出:Thread[#21,pack,5,main]System.out.println(thread) ;

在上面的代码中,打印thread输出的不是对应的平台线程,而是虚拟线程U2i28资讯网——每日最新资讯28at.com

VirtualThread[#21,pack]/runnable

在执行的任务中通过Thread.currentThread().getName()方法是没有任何信息,我们可以通过上面的name()方法来设置线程的名称及相关的前缀。如下:U2i28资讯网——每日最新资讯28at.com

Thread.ofPlatform().name("pack") ;Thread.ofVirtual().name("pack", 0) ;

使用方式3:

通过ThreadFactory工厂创建U2i28资讯网——每日最新资讯28at.com

ThreadFactory threadFactory = Thread.ofVirtual().factory() ;threadFactory.newThread(() -> {  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}).start() ;

使用方式4:

直接通过Thread静态方法U2i28资讯网——每日最新资讯28at.com

Thread.startVirtualThread(() -> {  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}) ;

4. 虚拟线程与传统线程池对比

使用虚拟线程U2i28资讯网——每日最新资讯28at.com

public class Demo06 {  static class Task implements Runnable {    @Override    public void run() {      System.err.printf("start - %d%n", System.currentTimeMillis()) ;      try {        Thread.sleep(Duration.ofSeconds(1));      } catch (InterruptedException e) {}      System.err.printf("  end - %d%n", System.currentTimeMillis()) ;    }  }  public static void main(String[] args) throws Exception {    ExecutorService es= Executors.newVirtualThreadPerTaskExecutor() ;    es.submit(new Task()) ;    es.submit(new Task()) ;    es.submit(new Task()) ;    System.in.read() ;  }}

输出结果:U2i28资讯网——每日最新资讯28at.com

start - 1698827467289start - 1698827467289start - 1698827467291  end - 1698827468317  end - 1698827468317  end - 1698827468317

从结果看出,基本是同时开始,结束也是基本一起结束,总耗时1s。U2i28资讯网——每日最新资讯28at.com

使用传统线程
U2i28资讯网——每日最新资讯28at.com

任务都一样,只是创建线程池的类型修改U2i28资讯网——每日最新资讯28at.com

public static void main(String[] args) throws Exception {  ExecutorService es= Executors.newFixedThreadPool(1) ;  es.submit(new Task()) ;  es.submit(new Task()) ;  es.submit(new Task()) ;}

输出结果:U2i28资讯网——每日最新资讯28at.com

start - 1698827686133  end - 1698827687165start - 1698827687165  end - 1698827688177start - 1698827688177  end - 1698827689178

从结果知道这里是一个任务一个任务的执行串行化,但是你注意观察,其实每个任务的的开始start 的输出都是要等前一个线程执行完了后才能执行。结合上面的虚拟线程对比,start是同时输出的,这也是虚拟线程的有点了。U2i28资讯网——每日最新资讯28at.com

5. 使用案例

这是一个远程接口调用的示例:U2i28资讯网——每日最新资讯28at.com

远程3个接口,如下:U2i28资讯网——每日最新资讯28at.com

@GetMapping("/userinfo")public Object queryUserInfo() {  try {    TimeUnit.SECONDS.sleep(2) ;  } catch (InterruptedException e) {e.printStackTrace();}  return "查询用户信息" ;}@GetMapping("/stock")public Object queryStock() {  try {    TimeUnit.SECONDS.sleep(3) ;  } catch (InterruptedException e) {e.printStackTrace();}  return "查询库存信息" ;}@GetMapping("/order")public Object queryOrder() {  try {    TimeUnit.SECONDS.sleep(4) ;  } catch (InterruptedException e) {e.printStackTrace();}  return "查询订单信息" ;}

接口调用服务,如下:U2i28资讯网——每日最新资讯28at.com

@Resourceprivate RestTemplate restTemplate ;public Map<String, Object> rpc() {  try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {    var start = System.currentTimeMillis() ;    // 1.查询用户信息    var userinfo = executor.submit(() -> query("http://localhost:8080/demos/userinfo"));    // 2.查询库存信息    var stock = executor.submit(() -> query("http://localhost:8080/demos/stock"));    // 3.查询订单信息    var order = executor.submit(() -> query("http://localhost:8080/demos/order"));    Map<String, Object> res = Map.of("userinfo", userinfo.get(), "stock", stock.get(), "order", order.get()) ;    System.out.printf("总计耗时:%d毫秒%n", (System.currentTimeMillis() - start)) ;    return res ;  } catch (Exception e) {    return Map.of() ;  }}private Object query(String url) {  return this.restTemplate.getForObject(url, String.class) ;}

在这个案例中,如果使用传统的线程池,如果并发量大,那么很可能很多的任务都要排队等待,或者你需要创建更多的平台线程来满足吞吐量问题。但是现在有了虚拟线程你可以不用再考虑线程不够用的情况了,每个任务的执行都会被一个虚拟的线程执行(不是平台线程,可能这些虚拟线程只会对应到一个平台线程)。U2i28资讯网——每日最新资讯28at.com

虚拟线程可在以下情况显著提高应用吞吐量:U2i28资讯网——每日最新资讯28at.com

  • 并发任务的数量很高(超过几千)
  • 工作负载不受cpu限制,因为在这种情况下,线程比处理器内核多并不能提高吞吐量

6. 结构化并发(预览功能)

结构化并发目前还是预览功能,并没有在JDK21中正式发布,不过我们可以先来看看什么是结构化并发。U2i28资讯网——每日最新资讯28at.com

结构化并发 API 是来简化并发编程。结构化并发将在不同线程中运行的一组相关任务视为一个工作单元,从而简化了错误处理和取消,提高了可靠性,并增强了可观察性。U2i28资讯网——每日最新资讯28at.com

结构化并发的目标是:U2i28资讯网——每日最新资讯28at.com

  • 推广一种并发编程风格,消除因取消和关闭而产生的常见风险,如线程泄漏和取消延迟。
  • 提高并发代码的可观察性。

通过示例来理解结构化并发。U2i28资讯网——每日最新资讯28at.com

如下示例是通过传统线程池的方式并发的从远程获取信息,代码如下:U2i28资讯网——每日最新资讯28at.com

static RestTemplate restTemplate = new RestTemplate() ;public static void main(String[] args) throws Exception {  ExecutorService es = Executors.newFixedThreadPool(2) ;  Future<Object> userInfo = es.submit(UnstructuredConcurrentDemo::queryUserInfo) ;  Future<Object> stock = es.submit(UnstructuredConcurrentDemo::queryStock) ;  Object userInfoRet = userInfo.get() ;  System.out.printf("执行结果:用户信息:%s%n", userInfoRet.toString()) ;  Object stockRet = stock.get() ;  System.out.printf("执行结果:库存信息:%s%n", stockRet.toString()) ;}public static Object queryUserInfo() {  return restTemplate.getForObject("http://localhost:8080/demos/userinfo", String.class) ;}public static Object queryStock() {  return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;}

上面的代码中没有什么问题,程序都能够运行的正常,结果如下:U2i28资讯网——每日最新资讯28at.com

08:49:53.502 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK08:49:53.504 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"执行结果:用户信息:查询用户信息08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"执行结果:库存信息:查询库存信息

但是如果其中一个任务执行失败了后会如何呢?将其中一个任务抛出异常,如下代码:U2i28资讯网——每日最新资讯28at.com

public static Object queryStock() {  System.out.println(1 / 0) ;  return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;}

再次执行代码,结果如下:U2i28资讯网——每日最新资讯28at.com

发生异常:java.lang.ArithmeticException: / by zero09:06:05.938 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock09:06:05.948 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]09:06:08.972 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK09:06:08.974 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"执行结果:库存信息:查询库存信息

从结果看出,获取用户信息子任务发生异常后,并不会影响到获取库存子任务的执行。U2i28资讯网——每日最新资讯28at.com

通过结构化并发方式U2i28资讯网——每日最新资讯28at.com

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  Supplier<Object>  userInfo  = scope.fork(UnstructuredConcurrentDemo::queryUserInfo) ;  Supplier<Object> stock = scope.fork(UnstructuredConcurrentDemo::queryStock) ;  // 等待在此任务范围内启动的所有子任务完成或某个子任务失败。  scope.join() ;  Object userInfoRet = userInfo.get() ;  System.out.printf("执行结果:用户信息:%s%n", userInfoRet.toString()) ;  Object stockRet = stock.get() ;  System.out.printf("执行结果:库存信息:%s%n", stockRet.toString()) ;}

当一个子任务发生错误时,其它的子任务会在未完成的情况下取消,执行结果如下:U2i28资讯网——每日最新资讯28at.com

08:59:51.951 [] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock08:59:51.961 [] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]Exception in thread "main" java.lang.IllegalStateException: Subtask not completed or did not complete successfully  at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.get(StructuredTaskScope.java:936)  at com.pack.rpc.UnstructuredConcurrentDemo.structured(UnstructuredConcurrentDemo.java:26)  at com.pack.rpc.UnstructuredConcurrentDemo.main(UnstructuredConcurrentDemo.java:17)

从控制台的输出看出,获取库存的调用被取消了。U2i28资讯网——每日最新资讯28at.com

完毕!!!U2i28资讯网——每日最新资讯28at.com

本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-16937-0.html【技术革命】JDK21虚拟线程来袭,让系统的吞吐量翻倍!

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 超越基础:Flutter 中 onTap 事件的 5 条规则让你脱颖而出

下一篇: Sed 原地替换文件时遇到的趣事

标签:
  • 热门焦点
Top
Baidu
map