虚拟线程
虚拟线程是轻量级线程,可减少编写、维护和调试高吞吐量并发应用程序的工作量。有关虚拟线程的背景信息,请参阅JEP 444。线程是可以调度的最小处理单元。它与其他此类单元同时运行,并且在很大程度上独立于其他单元运行。它是java.lang.Thread. 线程有两种,平台线程和虚拟线程
什么是平台线程?
平台线程是作为操作系统 (OS) 线程的薄包装器实现的。平台线程在其底层 OS 线程上运行 Java 代码,并且平台线程在其整个生命周期内捕获其 OS 线程。因此,可用的平台线程数量受限于 OS 线程的数量。
平台线程通常具有较大的线程堆栈和由操作系统维护的其他资源。它们适合运行所有类型的任务,但资源可能有限。
什么是虚拟线程?
与平台线程类似,虚拟线程也是 java.lang.Thread。但是,虚拟线程并不与特定的操作系统线程绑定。虚拟线程仍在操作系统线程上运行代码。但是,当虚拟线程中运行的代码调用阻塞 I/O 操作时,Java 运行时会暂停虚拟线程,直到可以恢复为止。与暂停的虚拟线程关联的操作系统线程现在可以自由地执行其他虚拟线程的操作。
虚拟线程的实现方式与虚拟内存类似。为了模拟大量内存,操作系统会将较大的虚拟地址空间映射到有限的 RAM 中。同样,为了模拟大量线程,Java 运行时会将大量虚拟线程映射到少量 OS 线程中。
与平台线程不同,虚拟线程通常具有浅层调用堆栈,仅执行一次 HTTP 客户端调用或一次 JDBC 查询。尽管虚拟线程支持线程局部变量和可继承的线程局部变量,但您应谨慎考虑使用它们,因为单个 JVM 可能支持数百万个虚拟线程。
虚拟线程适合运行大部分时间处于阻塞状态(通常等待 I/O 操作完成)的任务。但它们不适合长时间运行的 CPU 密集型操作。
为什么要使用虚拟线程?
在高吞吐量并发应用程序中使用虚拟线程,尤其是那些包含大量并发任务且大部分时间都在等待的应用程序。服务器应用程序是高吞吐量应用程序的示例,因为它们通常处理许多执行阻塞 I/O 操作(例如获取资源)的客户端请求。
虚拟线程并不是更快的线程;它们运行代码的速度并不比平台线程快。它们存在的目的是为了提供规模(更高的吞吐量),而不是速度(更低的延迟)。
创建并运行虚拟线程
这Thread和Thread.BuilderAPI 提供了创建平台线程和虚拟线程的方法。 java.util.concurrent.Executors该类还定义了创建ExecutorService为每个任务启动一个新的虚拟线程。
使用 Thread 类和 Thread.Builder 接口创建虚拟线程
Thread.ofVirtual()方法创建一个实例Thread.Builder用于创建虚拟线程。
以下示例创建并启动一个打印消息的虚拟线程。它调用join方法等待虚拟线程终止。(这使您可以在主线程终止之前看到打印的消息。)
Thread thread = Thread.ofVirtual().start(() -> System.out.println("Hello"));
thread.join();
这Thread.Builder接口允许创建具有通用功能的线程 Thread属性,例如线程的名称。 Thread.Builder.OfPlatform子接口创建平台线程,同时Thread.Builder.OfVirtual创建虚拟线程。
以下示例创建一个 MyThread名为Thread.Builder:
Thread.Builder builder = Thread.ofVirtual().name("MyThread");
Runnable task = () -> {
System.out.println("Running thread");
};
Thread t = builder.start(task);
System.out.println("Thread t name: " + t.getName());
t.join();
使用 Executors.newVirtualThreadPerTaskExecutor() 方法创建并运行虚拟线程
执行器让你可以将线程管理和创建与应用程序的其余部分分开。
以下示例创建一个ExecutorService与Executors.newVirtualThreadPerTaskExecutor()方法。每当 ExecutorService.submit(Runnable)调用时,会创建一个新的虚拟线程并启动它来运行任务。此方法返回Future。请注意方法Future.get()等待线程的任务完成。因此,此示例会在虚拟线程的任务完成后打印一条消息。
try (ExecutorService myExecutor = Executors.newVirtualThreadPerTaskExecutor()) {
Future<?> future = myExecutor.submit(() -> System.out.println("Running thread"));
future.get();
System.out.println("Task completed");
// ...
多线程客户端服务器示例
下面的示例由两个类组成。EchoServer是一个服务器程序,它监听一个端口并为每个连接启动一个新的虚拟线程。 EchoClient是一个客户端程序,它连接到服务器并发送在命令行上输入的消息。
EchoClient创建套接字,从而获得与 的连接 EchoServer。它从标准输入流读取用户的输入,然后通过将EchoServer文本写入套接字将该文本转发给 。EchoServer通过套接字将输入回显到EchoClient。EchoClient读取并显示从服务器传回的数据。EchoServer可以通过虚拟线程同时为多个客户端提供服务,每个客户端连接一个线程。
public class EchoServer {
public static void main(String[] args) throws IOException {
if (args.length != 1) {
System.err.println("Usage: java EchoServer <port>");
System.exit(1);
}
int portNumber = Integer.parseInt(args[0]);
try (
ServerSocket serverSocket =
new ServerSocket(Integer.parseInt(args[0]));
) {
while (true) {
Socket clientSocket = serverSocket.accept();
// Accept incoming connections
// Start a service thread
Thread.ofVirtual().start(() -> {
try (
PrintWriter out =
new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println(inputLine);
out.println(inputLine);
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
} catch (IOException e) {
System.out.println("Exception caught when trying to listen on port "
+ portNumber + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}
public class EchoClient {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println(
"Usage: java EchoClient <hostname> <port>");
System.exit(1);
}
String hostName = args[0];
int portNumber = Integer.parseInt(args[1]);
try (
Socket echoSocket = new Socket(hostName, portNumber);
PrintWriter out =
new PrintWriter(echoSocket.getOutputStream(), true);
BufferedReader in =
new BufferedReader(
new InputStreamReader(echoSocket.getInputStream()));
) {
BufferedReader stdIn =
new BufferedReader(
new InputStreamReader(System.in));
String userInput;
while ((userInput = stdIn.readLine()) != null) {
out.println(userInput);
System.out.println("echo: " + in.readLine());
if (userInput.equals("bye")) break;
}
} catch (UnknownHostException e) {
System.err.println("Don't know about host " + hostName);
System.exit(1);
} catch (IOException e) {
System.err.println("Couldn't get I/O for the connection to " +
hostName);
System.exit(1);
}
}
}
调度虚拟线程和固定虚拟线程
操作系统会调度平台线程的运行时间。但是,Java 运行时会调度虚拟线程的运行时间。当 Java 运行时调度虚拟线程时,它会将虚拟线程分配或挂载到平台线程上,然后操作系统会照常调度该平台线程。此平台线程称为载体。运行某些代码后,虚拟线程可以从其载体上卸载。这通常发生在虚拟线程执行阻塞 I/O 操作时。虚拟线程从其载体上卸载后,载体处于空闲状态,这意味着 Java 运行时调度程序可以在其上挂载不同的虚拟线程。
当虚拟线程固定到其载体时,它无法在阻塞操作期间卸载 。虚拟线程在以下情况下被固定:
- 虚拟线程运行块synchronized 或方法 内的代码
- 虚拟线程运行一个native方法或一个外部函数(参见外部函数和内存 API)
固定不会使应用程序不正确,但可能会影响其可扩展性。尝试通过修改synchronized 频繁运行的块或方法以及使用以下方法保护可能较长的 I/O 操作来 避免频繁和长时间的固定java.util.concurrent.locks.ReentrantLock。
虚拟线程:采用指南
虚拟线程是由 Java 运行时而不是操作系统实现的 Java 线程。虚拟线程与传统线程(我们称之为平台线程)的主要区别在于,我们可以在同一个 Java 进程中轻松运行大量甚至数百万个活动虚拟线程。虚拟线程数量众多,因此非常强大:它们可以更高效地运行以“每个请求一个线程”方式编写的服务器应用程序,允许服务器同时处理更多请求,从而提高吞吐量并减少硬件浪费。
因为虚拟线程是java.lang.Thread并遵守指定的相同规则java.lang.Thread自 Java SE 1.0 以来,开发人员无需学习新概念即可使用它们。然而,由于无法生成大量平台线程(这是 Java 多年来唯一可用的线程实现),因此催生出旨在应对高成本的做法。这些做法在应用于虚拟线程时会适得其反,必须摒弃。此外,巨大的成本差异也促使人们开始思考线程,而这些线程最初可能很陌生。
本指南并非旨在全面涵盖虚拟线程的所有重要细节。它只是提供一套入门指南,帮助那些希望开始使用虚拟线程的人充分利用它们。
以“每个请求一个线程”的方式使用阻塞 I/O API 编写简单的同步代码
对于以线程/请求方式编写的服务器,虚拟线程可以显著提高吞吐量(而不是延迟)。在这种方式中,服务器会在整个持续时间内专门使用一个线程来处理每个传入请求。它 至少会专用一个线程,因为在处理单个请求时,您可能希望使用更多线程来同时执行某些任务。
阻塞平台线程的成本很高,因为它会占用线程(一种相对稀缺的资源),而线程并没有执行太多有意义的工作。由于虚拟线程可能很多,因此阻塞它们的成本很低,并且值得鼓励。因此,您应该以简单的同步风格编写代码并使用阻塞 I/O API。
例如,以非阻塞、异步风格编写的以下代码不会从虚拟线程中受益太多。
CompletableFuture.supplyAsync(info::getUrl, pool)
.thenCompose(url -> getBodyAsync(url, HttpResponse.BodyHandlers.ofString()))
.thenApply(info::findImage)
.thenCompose(url -> getBodyAsync(url, HttpResponse.BodyHandlers.ofByteArray()))
.thenApply(info::setImageData)
.thenAccept(this::process)
.exceptionally(t -> { t.printStackTrace(); return null; });
另一方面,以同步风格编写并使用简单阻塞 IO 的以下代码将受益匪浅:
try {
String page = getBody(info.getUrl(), HttpResponse.BodyHandlers.ofString());
String imageUrl = info.findImage(page);
byte[] data = getBody(imageUrl, HttpResponse.BodyHandlers.ofByteArray());
info.setImageData(data);
process(info);
} catch (Exception ex) {
t.printStackTrace();
}
将每个并发任务表示为虚拟线程;永远不要池化虚拟线程
关于虚拟线程,最难内化的是,虽然它们具有与平台线程相同的行为,但它们不应该代表相同的程序概念。
平台线程稀缺,因此是一种宝贵的资源。宝贵的资源需要管理,而管理平台线程的最常见方式是使用线程池。然后你需要回答的一个问题是,池中应该有多少个线程?
但是虚拟线程非常多,因此每个线程不应该代表一些共享的池化资源,而应该代表一项任务。从托管资源线程转变为应用程序域对象。我们应该有多少个虚拟线程的问题变得显而易见,就像我们应该使用多少个字符串在内存中存储一组用户名的问题一样显而易见:虚拟线程的数量始终等于应用程序中的并发任务数量。
将n 个平台线程转换为n 个虚拟线程几乎没有什么好处;相反,需要转换的 是任务。
要将每个应用程序任务表示为一个线程,请不要使用共享线程池执行器,如下例所示:
Future<ResultA> f1 = sharedThreadPoolExecutor.submit(task1);
Future<ResultB> f2 = sharedThreadPoolExecutor.submit(task2);
// ... use futures
相反,使用虚拟线程执行器,如以下示例所示:
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
Future<ResultA> f1 = executor.submit(task1);
Future<ResultB> f2 = executor.submit(task2);
// ... use futures
}
代码仍然使用ExecutorService,但从 Executors.newVirtualThreadPerTaskExecutor()不使用线程池。相反,它为每个提交的任务创建一个新的虚拟线程。
此外,ExecutorService它本身是轻量级的,我们可以像创建任何简单对象一样创建一个新的。这允许我们依赖新添加的ExecutorService.close()方法和 try-with-resources 构造。close方法,即在 try 块末尾隐式调用的方法,将自动等待提交给 ExecutorService也就是说,所有由 ExecutorService-终止。
对于扇出场景,这是一种特别有用的模式,您希望同时对不同的服务执行多个传出调用,如以下示例所示:
void handle(Request request, Response response) {
var url1 = ...
var url2 = ...
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var future1 = executor.submit(() -> fetchURL(url1));
var future2 = executor.submit(() -> fetchURL(url2));
response.send(future1.get() + future2.get());
} catch (ExecutionException | InterruptedException e) {
response.fail(e);
}
}
String fetchURL(URL url) throws IOException {
try (var in = url.openStream()) {
return new String(in.readAllBytes(), StandardCharsets.UTF_8);
}
}
您应该为甚至小型、短暂的并发任务创建一个新的虚拟线程,如上所示。
为了在编写扇出模式和其他常见并发模式时获得更多帮助并具有更好的可观察性,请使用结构化并发。
根据经验,如果您的应用程序从未拥有 10,000 个或更多虚拟线程,则不太可能从虚拟线程中获益。要么是它的负载太轻而不需要更好的吞吐量,要么是您没有将足够多的任务表示给虚拟线程。
使用信号量来限制并发
有时需要限制某个操作的并发性。例如,某些外部服务可能无法处理超过 10 个并发请求。由于平台线程是一种宝贵的资源,通常在池中进行管理,因此线程池已经变得无处不在,以至于它们被用于限制并发性,如下例所示:
ExecutorService es = Executors.newFixedThreadPool(10);
...
Result foo() {
try {
var fut = es.submit(() -> callLimitedService());
return f.get();
} catch (...) { ... }
}
此示例确保对有限服务的并发请求最多为 10 个。
但限制并发性只是线程池操作的一个副作用。线程池旨在共享稀缺资源,而虚拟线程并不稀缺,因此永远不应该被池化!
使用虚拟线程时,如果要限制访问某些服务的并发性,则应使用专门为此目的设计的结构:Semaphore类。以下示例演示了此类:
Semaphore sem = new Semaphore(10);
...
Result foo() {
sem.acquire();
try {
return callLimitedService();
} finally {
sem.release();
}
}
碰巧调用的线程foo将被限制,即被阻止,以便一次只有十个线程可以取得进展,而其他线程将不受阻碍地继续其业务。
简单地使用信号量阻止某些虚拟线程似乎与将任务提交到固定线程池有很大不同,但事实并非如此。将任务提交到线程池会将它们排队以供稍后执行,但信号量在内部(或就此而言的任何其他阻止同步构造)会创建一个被阻止的线程队列,该队列镜像等待池线程执行它们的任务队列。因为虚拟线程是任务,所以生成的结构是等效的:
尽管您可以将平台线程池视为处理从队列中拉出的任务的工作者,将虚拟线程视为任务本身(在继续执行之前处于阻塞状态),但计算机中的底层表示实际上是相同的。认识到排队任务和阻塞线程之间的等价性将有助于您充分利用虚拟线程。
数据库连接池本身充当信号量。限制为 10 个连接的连接池将阻止尝试获取连接的第十一个线程。无需在连接池上添加额外的信号量。
不要在线程局部变量中缓存昂贵的可重用对象
虚拟线程与平台线程一样支持线程局部变量。有关详细信息,请参阅线程局部变量。通常,线程局部变量用于将某些上下文特定信息与当前正在运行的代码关联起来,例如当前事务和用户 ID。对于虚拟线程,这种线程局部变量的使用非常合理。但是,请考虑使用更安全、更高效的作用域值。 有关详细信息, 请参阅作用域值。
线程局部变量还有另一种用法,它与虚拟线程从根本上是矛盾的:缓存可重用对象。这些对象通常创建起来很昂贵(并且会消耗大量内存)、可变且不是线程安全的。它们被缓存在线程局部变量中,以减少实例化的次数和内存中的实例数,但它们会被在不同时间在线程上运行的多个任务重用。
例如,SimpleDateFormat创建成本高昂,并且不是线程安全的。一种出现的模式是将此类实例缓存在ThreadLocal 如下例所示:
static final ThreadLocal<SimpleDateFormat> cachedFormatter =
ThreadLocal.withInitial(SimpleDateFormat::new);
void foo() {
...
cachedFormatter.get().format(...);
...
}
这种缓存仅在线程(因此在线程本地缓存的昂贵对象)由多个任务共享和重用时才有用,平台线程池化时就是这种情况。许多任务foo在线程池中运行时可能会调用,但由于池仅包含几个线程,因此对象只会被实例化几次(每个池线程一次)并缓存和重用。
但是虚拟线程不会被池化,也不会被无关的任务重用。因为每个任务都有自己的虚拟线程,所以来自foo不同任务的每次调用都会触发新线程的实例化 SimpleDateFormat。此外,由于可能有大量虚拟线程同时运行,因此昂贵的对象可能会消耗大量内存。这些结果与线程本地缓存的目的完全相反。
没有单一的通用替代方案,但对于 SimpleDateFormat,你应该用DateTimeFormatter。 DateTimeFormatter是不可变的,因此所有线程可以共享一个实例:
static final DateTimeFormatter formatter = DateTimeFormatter….;
void foo() {
...
formatter.format(...);
...
}
请注意,使用线程局部变量来缓存共享的昂贵对象有时是由异步框架在幕后完成的,因为它们隐含地假设这些对象由极少数池线程使用。这就是为什么混合虚拟线程和异步框架不是一个好主意的原因之一:对方法的调用可能会导致在线程局部变量中实例化原本要缓存和共享的昂贵对象。
避免冗长且频繁的固定
目前虚拟线程实现的一个限制是,在synchronized块或方法内部执行阻塞操作会导致 JDK 的虚拟线程调度程序阻塞宝贵的 OS 线程,而如果阻塞操作是在synchronized块或方法之外执行的,则不会发生这种情况。我们将这种情况称为“固定”。如果阻塞操作既长期存在又频繁发生,则固定可能会对服务器的吞吐量产生不利影响。保护短期操作(例如内存操作)或不频繁的块 synchronized或方法操作应该不会产生不利影响。
为了检测可能有害的固定实例,JDK Flight Recorder(JFR)jdk.VirtualThreadPinned在阻塞操作被固定时发出线程;默认情况下,当操作时间超过 20 毫秒时启用此事件。
或者,您可以使用系统属性 jdk.tracePinnedThreads在线程被固定时阻塞时发出堆栈跟踪。使用该选项运行时, -Djdk.tracePinnedThreads=full 线程被固定时阻塞时会打印完整的堆栈跟踪,突出显示本机帧和占用监视器的帧。使用该选项运行时 -Djdk.tracePinnedThreads=short 会将输出限制为仅显示有问题的帧。
如果这些机制检测到固定时间长且频繁的地方,则synchronized用ReentrantLock在那些特定的地方(同样,没有必要更换synchronized 它保护短暂或不频繁的操作)。以下是长期和频繁使用的示例syncrhonized堵塞。
synchronized(lockObj) {
frequentIO();
}
您可以用以下内容替换它:
lock.lock();
try {
frequentIO();
} finally {
lock.unlock();
}