您的当前位置:首页正文

CompletableFuture异步编排

2024-11-10 来源:个人技术集锦

一、线程基本了解

1、创建线程的四种方式

1.1、继承Thread类

public static void main(String[] args) {
	Thread1 thread1 = new Thread1();
	thread1.start();
}

// 继承Thread类
public static class Thread1 extends Thread{
	@Override
	public void run() {
		System.out.println("当前线程 = " + Thread.currentThread().getName());
	}
}

1.2、实现Runnable接口

public static void main(String[] args) {
	Runnable1 runnable1 = new Runnable1();
	new Thread(runnable1).start();
	
}

public static class Runnable1 implements Runnable{
	@Override
	public void run() {
		System.out.println("当前线程 = " + Thread.currentThread().getName());
	}
}

1.3、实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)

public static void main(String[] args) throws ExecutionException, InterruptedException {
	FutureTask<String> futureTask = new FutureTask<>(new Callable1());
	// 线程启动
	new Thread(futureTask).start();
	
	// 堵塞等待整个线程执行完成,获取返回结果
	String name = futureTask.get();
	System.out.println("name = " + name);

}

public static class Callable1 implements Callable<String>{

	@Override
	public String call() throws Exception {
		String name = Thread.currentThread().getName();
		System.out.println("当前线程 = " + name);
		return name;
	}
}

1.4、使用线程池

// 整个系统最好只要一两个池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);

// 直接把任务提交给线程池,让它执行。
public static void main(String[] args) {
//	executorService.submit() // 有返回值
	executorService.execute(new Runnable() { // 无返回值
		@Override
		public void run() {
			System.out.println("当前线程 = " + Thread.currentThread());
		}
	});
}

2、线程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(10000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

2.1、线程池七大参数

  • corePoolSize:[5] 核心线程数(一直存在,除非allowCoreThreadTimeOut);线程池创建好就准备了5个new Thread。

  • maximumPoolSize:[200]最大线程数量;控制资源用,不管多高的并发,也只有200个正在运行。

  • keepAliveTime:存活时间。只要线程空闲大于指定的keepAliveTime,释放空闲的线程(除核心线程外)。

  • unit:时间单位,给上面存活时间用。

  • workQueue:堵塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要线程空闲,就会去队列里取出新的任务继续执行。

  • threadFactory:线程创建工厂。

  • RejectedExecutionHandler handler: 如果队列满了,按照我们指定的拒绝策略拒绝任务。

    四种拒绝策略:


2.2、运行流程

1、线程池创建,准备好core数量的核心线程,准备接收任务。

2、core核心线程满了,就将再进来的任务放入堵塞队列中。空闲的core就会自己去堵塞队列获取任务执行。

3、堵塞队列满了,就直接开启新线程执行,最大只能开到max指定的数量。

4、max满了,就有RejectedExecutionHandler拒绝策略拒接任务。

5、max都执行完成,有很多空闲,在指定的时间keepAliveTime以后,释放除核心线程外的线程。

2.3、常见的4种线程池特点

该部分参考:

Executors.newFixedThreadPool:
1、线程数量固定

2、只有核心线程切并且不会被回收

3、当所有线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来

Executors.newCachedThreadPool:
1、线程数量不定的线程池

2、只有非核心线程,最大线程数量为Integer.MAX_VALUE,可视为任意大

3、有超时机制,时长为60s,即超过60s的空闲线程就会被回收

4、当线程池中的线程都处于活动状态时,线程池会创建新的线程来处理新任务,否则就会利用空闲的线程来处理新任务。因此任何任务都会被立即执行

5、该线程池比较适合执行大量耗时较少的任务

Executors.newScheduledThreadPool
1、核心线程数量是固定的,而非核心线程数不固定的,并且非核心线程有超时机制,只要处于闲置状态就会被立即回收

2、该线程池主要用于执行定时任务和具有固定周期的重复任务

Executors.newSingleThreadPool

只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。因此在这些任务之间不需要处理线程同步的问题

二、CompletableFuture异步编排

CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。

1、创建CompletableFuture

1.1、runAsync(无返回值)

CompletableFuture.runAsync(()->{
	System.out.println("当前线程 = " + Thread.currentThread().getName());
});

指定线程池

public static ExecutorService executorService = Executors.newFixedThreadPool(10);

public static void main(String[] args) {
	CompletableFuture.runAsync(()->{
		System.out.println("当前线程 = " + Thread.currentThread().getName());
	},executorService);
}

1.2、supplyAsync(有返回值)

CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
	String name = Thread.currentThread().getName();
	System.out.println("当前线程 = " + name);
	return name;
});
// 获取返回值
future.get();

指定线程池

public static ExecutorService executorService = Executors.newFixedThreadPool(10);

public static void main(String[] args) {
	CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
		String name = Thread.currentThread().getName();
		System.out.println("当前线程 = " + name);
		return name;
	},executorService);
// 获取返回值
future.get();
}

2、各个方法使用

1、whenComplete

翻译为:当任务完成时
whenComplete主要用于注入任务完成时的回调通知逻辑

// 创建不写,就是上方创建的代码

/**
 * res 异步返回结果
 * excption 异常信息
 */
// 当任务完成时的逻辑 
future.whenComplete((res,excption)->{
	System.out.println("异步任务成功完成了,结果是 = " + res +" 异常是:"+excption);
});

exceptionally处理异常时默认返回

// 当任务完成时的逻辑
future.whenComplete((res,excption)->{
	System.out.println("异步任务成功完成了,结果是 = " + res +" 异常是:"+excption);
}).exceptionally((throwable -> { // 异常处理
	return "当出现异常时,默认返回这个";
})); 

2、handle

handle与whenComplete的作用有些类似,但是handle可以处理返回结果。

/**
 * res 异步返回结果
 * excption 异常信息
 */
future.handle((res,excption)->{
	if (res!=null){ // 正常,直接返结果。
		return res;
	}
	if (excption!=null){
		return "如果异常,返这个";
	}
	return "两个都不走返这个";
});

3、线程串行化 thenApply / thenAccept / thenRun

如现在有A、B两个方法。

thenApply:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。(拿A的返回值,处理后,返回处理后的值)

future.thenApply((res)->{
	return "这个任务完成后,执行这个方法,可以接收任务返回值,还可以把这个方法执行后的返回值返回去";
});
// 用get方法获取返回值
future.get();

thenAccept:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。(拿A的返回值,B还要再处理,用这个方法)

future.thenAccept((res)->{
	System.out.println("这个任务完成后,执行这个方法,可以接收任务返回值,但是这个方法执行完没有返回值");
});

thenRun:只要上面的任务执行完成,就开始执行thenRun,只是处理完成任务后,执行thenRun的后续结果。(不接收A返回值,直接执行B,用这个)

future.thenRun(()->{
	System.out.println("这个任务完成后,执行这个方法,不能拿到这个任务的返回值");
});

4、任务组合 thenCombine / thenAcceptBoth / runAfterBoth

thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值。

/**
 * future1 任务1
 * future2 任务2
 */
CompletableFuture<String> future = future1.thenCombine(future2, (f1, f2) -> {
	// 获取两个的结果后处理逻辑,无返回值
	System.out.println("任务1、2都结束后,获取两个的结果,任务1结果:" + f1 + "任务2结果" + f2);
	return "返回这个结果";
});
future.get(); // 用get获取结果

thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。

future1.thenAcceptBoth(future2,(f1,f2)->{
	// 获取两个的结果后处理逻辑,无返回值
	System.out.println("任务1、2都结束后,获取两个的结果,任务1结果:"+ f1 +"任务2结果" + f2); 
});

runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完成任务后,处理该任务。

/**
 * future1 任务1
 * future2 任务2
 */
future1.runAfterBoth(future2,()->{
	System.out.println("任务1、2都结束后,来处理这个任务");
},executorService);//指定线程池。

5、组合任务(一个完成)applyToEither / acceptEither / runAfterEither

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

CompletableFuture<String> future3 = future1.applyToEither(future2, (res) -> {
	return "任务1、2有一个做完,就来执行这个,可以获取到返回结果,返回新的结果";
});
future3.get(); // 用get方法获取返回值

acceptEither:两个任务有一个执行完成,获取他的返回值,处理任务,没有新的返回值。

future1.acceptEither(future2,(res)->{
	System.out.println("任务1、2有一个做完,就来执行这个,可以获取到返回结果,无新的结果返回");
});

runAfterEither:两个任务有一个执行完成,不需要获取futrue结果,处理任务,没有返回值。

future1.runAfterEither(future2,()->{
	System.out.println("任务1、2有一个做完,就来执行这个,无返回值");
});

6、多任务组合 allOf / anyOf

allOf:等待所有任务完成

CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
allOf.get(); // 等待所有结果完成
System.out.println("所有结果都完成,才会打印我");

anyOf:只要有一个任务完成

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
anyOf.get(); // 拿的是完成的那个的返回值。
System.out.println("任意一个完成,都会打印我");
Top