Java多线程及Future用法
同步和异步 – 比要发射10枚导弹,同步的方式就是上一枚导弹炸毁后才发射下一枚,而异步就是全部挨个发射出去,而不在乎它们是否击中目标,这种异步方式也被称为Fire and Forget。Kafka为了提高吞吐性能默认是异步发送消息的。为了更好的了解Kafka的Producer发送,我们先补充一些关于Java多线程的知识。
Java使用Thread类代表线程,所有的线程对象都必须是Thread类或其子类的实例。
- 继承Thread创建线程(不推荐)
- 实现Runnable接口创建线程
- 实现Callable接口实现线程
- 使用线程池Executor创建线程(推荐)
1.继承Thread实现线程
我们先来看一下Thread的源码,它是一个类,同样也实现了Runnable接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
public Thread implements Runnable { /* Make sure registerNatives is the first thing <clinit> does. */ private static native void registerNatives(); static { registerNatives(); } private volatile String name; private int priority; private Thread threadQ; private long eetop; /* Whether or not to single_step this thread. */ private boolean single_step; /* Whether or not the thread is a daemon thread. */ private boolean daemon = false; /* JVM state */ private boolean stillborn = false; /* What will be run. */ private Runnable target; /* The group of this thread */ private ThreadGroup group; /* The context ClassLoader for this thread */ private ClassLoader contextClassLoader; /* The inherited AccessControlContext of this thread */ private AccessControlContext inheritedAccessControlContext; /* For autonumbering anonymous threads. */ private static int threadInitNumber; private static synchronized int nextThreadNum() { return threadInitNumber++; } /* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null; /* * InheritableThreadLocal values pertaining to this thread. This map is * maintained by the InheritableThreadLocal class. */ ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; /* * The requested stack size for this thread, or 0 if the creator did * not specify a stack size. It is up to the VM to do whatever it * likes with this number; some VMs will ignore it. */ private long stackSize; /* * JVM-private state that persists after native thread termination. */ private long nativeParkEventPointer; /* * Thread ID */ private long tid; /* For generating thread ID */ private static long threadSeqNumber; /* Java thread status for tools, * initialized to indicate thread 'not yet started' */ private volatile int threadStatus = 0; //...... } |
通过继承Thread类来创建并启动多线程的一般步骤如下
- 定义Thread类的子类,并重写该类的run()方法,该方法的方法体就是线程需要完成的任务,run()方法也称为线程执行体。
- 创建Thread子类的实例,也就是创建了线程对象
- 启动线程,即调用线程的start()方法
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public class ThreadTest { public static void main(String[] args) { new MyThread().start(); } static class MyThread extends Thread {//继承Thread public void run() { System.out.println("我是继承Thread类!! "); } } } |
2.实现Runnable接口创建线程
我们来看一下Runnable的源码,它是一个接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@FunctionalInterface public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); } |
由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。
通过实现Runnable接口创建并启动线程一般步骤如下:
- 定义Runnable接口的实现类,一样要重写run()方法,这个run()方法和Thread中的run()方法一样是线程的执行体
- 创建Runnable实现类的实例,并用这个实例作为Thread的target来创建Thread对象,这个Thread对象才是真正的线程对象
- 第三部依然是通过调用线程对象的start()方法来启动线程
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public class RunnableTest { public static void main(String[] args) { MyThread2 myThread=new MyThread2(); Thread thread = new Thread(myThread); thread.start(); } static class MyThread2 implements Runnable { @Override public void run() { System.out.println("我是实现Runnable接口!! "); } } } |
3.实现callable接口实现线程
我们来看一下callable源码,它是一个接口:
1 2 3 4 5 6 7 8 9 10 |
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } |
它和Runnable接口不一样的是,call()方法提供了2个额外功能:
- call()方法可以有返回值
- call()方法可以声明抛出异常
java5提供了Future接口来代表Callable接口里call()方法的返回值,并且为Future接口提供了一个实现类FutureTask,这个实现类既实现了Future接口,还实现了Runnable接口,因此可以作为Thread类的target。在Future接口里定义了几个公共方法来控制它关联的Callable任务。
那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:
1 2 3 |
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); |
第一个submit方法里面的参数类型就是Callable。
暂时只需要知道Callable一般是和ExecutorService配合来使用的,具体的使用方法讲在后面讲述。
一般情况下我们使用第一个submit方法和第三个submit方法,第二个submit方法很少使用。
3.1 Future
我们来看一下Future的源码,它是一个接口,用来返回子线程的计算结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
public interface Future<V> { /** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed, has already been cancelled, * or could not be cancelled for some other reason. If successful, * and this task has not started when {@code cancel} is called, * this task should never run. If the task has already started, * then the {@code mayInterruptIfRunning} parameter determines * whether the thread executing this task should be interrupted in * an attempt to stop the task. * * <p>After this method returns, subsequent calls to {@link #isDone} will * always return {@code true}. Subsequent calls to {@link #isCancelled} * will always return {@code true} if this method returned {@code true}. * * @param mayInterruptIfRunning {@code true} if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete * @return {@code false} if the task could not be cancelled, * typically because it has already completed normally; * {@code true} otherwise */ boolean cancel(boolean mayInterruptIfRunning); /** * Returns {@code true} if this task was cancelled before it completed * normally. * * @return {@code true} if this task was cancelled before it completed */ boolean isCancelled(); /** * Returns {@code true} if this task completed. * * Completion may be due to normal termination, an exception, or * cancellation -- in all of these cases, this method will return * {@code true}. * * @return {@code true} if this task completed */ boolean isDone(); /** * Waits if necessary for the computation to complete, and then * retrieves its result. * * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting */ V get() throws InterruptedException, ExecutionException; /** * Waits if necessary for at most the given time for the computation * to complete, and then retrieves its result, if available. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the wait timed out */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } |
我们来看一下它的各个方法:
boolean cancel(boolean mayInterruptIfRunning)
:用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。boolean isCancelled()
:如果在Callable任务正常完成前被取消,返回Trueboolean isDone()
:若Callable任务完成,返回TrueV get() throws InterruptedException, ExecutionException
:返回Callable里call()方法的返回值,调用这个方法会导致程序阻塞,必须等到子线程结束后才会得到返回值V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
:用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask
3.2 FutureTask
我们先来看一下FutureTask的实现:
1 |
public class FutureTask<V> implements RunnableFuture<V> {} |
FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:
1 2 3 4 5 6 7 |
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); } |
可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
FutureTask提供了2个构造器:
1 2 3 4 5 6 7 8 9 10 |
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } |
事实上,FutureTask是Future接口的一个唯一实现类。
3.3 使用FutureTask对象作为Thread对象的target创建并启动线程
接下来我们看如何创建并启动有返回值的线程:
- 创建Callable接口的实现类,并实现call()方法,然后创建该实现类的实例(从java8开始可以直接使用Lambda表达式创建Callable对象)。
- 使用FutureTask类来包装Callable对象,该FutureTask对象封装了Callable对象的call()方法的返回值
- 使用FutureTask对象作为Thread对象的target创建并启动线程(因为FutureTask实现了Runnable接口)
- 调用FutureTask对象的get()方法来获得子线程执行结束后的返回值
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public class CallableAndFuture { public static void main(String[] args) { Callable<Integer> call = new Callable<Integer>() { public Integer call() throws Exception { System.out.println("计算线程正在计算结果..."); Thread.sleep(3000); return 1; } }; FutureTask<Integer> future = new FutureTask<>(call); new Thread(future,"有返回值的线程").start();//实质上还是以Callable对象来创建并启动线程 try { System.out.println("子线程的返回值:" + future.get());//get()方法会阻塞,直到子线程执行结束才返回 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } |
3.4 使用executor创建线程
3.4.1.使用Callable+Future获取执行结果
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
public class CallableAndFuture { public static void main(String[] args) { /** Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。 */ ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); Future<Integer> result = executor.submit(task); executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主线程在执行任务"); try { System.out.println("task运行结果"+result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); } static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } } } |
3.4.2.使用Callable+FutureTask获取执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
public class CallableAndFuture2 { public static void main(String[] args) { //第一种方式 ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask); executor.shutdown(); //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread /*Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); Thread thread = new Thread(futureTask); thread.start();*/ try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主线程在执行任务"); try { System.out.println("task运行结果"+futureTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); } static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } } } |
4.使用线程池Executor创建线程
4.1 Executor执行Runnable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public class ExecutorRunnable { /** * 从结果中可以看出,pool-1-thread-1和pool-1-thread-2均被调用了两次,这是随机的,execute会首先在线程池中选择 * 一个已有空闲线程来执行任务,如果线程池中没有空闲线程,它便会创建一个新的线程来执行任务。 */ public static void main(String[] args){ ExecutorService executorService = Executors.newCachedThreadPool(); // ExecutorService executorService = Executors.newFixedThreadPool(5); // ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++){ executorService.execute(new TestRunnable()); System.out.println("************* a" + i + " *************"); } executorService.shutdown(); } } class TestRunnable implements Runnable { public void run() { System.out.println(Thread.currentThread().getName() + "线程被调用了。"); } } |
执行结果:
1 2 3 4 5 6 7 8 |
************* a1 ************* ************* a2 ************* pool-1-thread-2线程被调用了。 ************* a3 ************* pool-1-thread-1线程被调用了。 pool-1-thread-2线程被调用了。 ************* a4 ************* pool-1-thread-3线程被调用了。 |
4.2Executor执行Callable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
public class Executor执行Callable { /** * 从结果中可以同样可以看出,submit也是首先选择空闲线程来执行任务,如果没有,才会创建新的线程来执行任务。 * 另外,需要注意:如果Future的返回尚未完成,则get()方法会阻塞等待,直到Future完成返回,可以通过 * 调用isDone()方法判断Future是否完成了返回。 */ public static void main(String[] args){ ExecutorService executorService = Executors.newCachedThreadPool(); List<Future<String>> resultList = new ArrayList<Future<String>>(); //创建10个任务并执行 for (int i = 0; i < 10; i++){ //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中 Future<String> future = executorService.submit(new TaskWithResult(i)); //将任务执行结果存储到List中 resultList.add(future); } //遍历任务的结果 for (Future<String> fs : resultList){ try{ while(!fs.isDone());//Future返回如果没有完成,则一直循环等待,直到Future返回完成 System.out.println(fs.get()); //打印各个线程(任务)执行的结果 }catch(InterruptedException e){ e.printStackTrace(); }catch(ExecutionException e){ e.printStackTrace(); }finally{ //启动一次顺序关闭,执行以前提交的任务,但不接受新任务 executorService.shutdown(); } } } } class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id){ this.id = id; } /** * 任务的具体过程,一旦任务传给ExecutorService的submit方法, * 则该方法自动在一个线程上执行 */ public String call() throws Exception { System.out.println("call()方法被自动调用!!! " + Thread.currentThread().getName()); //该返回结果将被Future的get方法得到 return "call()方法被自动调用,任务返回的结果是:" + id + " " + Thread.currentThread().getName(); } } |
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
call()方法被自动调用!!! pool-1-thread-1 call()方法被自动调用,任务返回的结果是:0 pool-1-thread-1 call()方法被自动调用!!! pool-1-thread-2 call()方法被自动调用,任务返回的结果是:1 pool-1-thread-2 call()方法被自动调用!!! pool-1-thread-3 call()方法被自动调用,任务返回的结果是:2 pool-1-thread-3 call()方法被自动调用!!! pool-1-thread-5 call()方法被自动调用!!! pool-1-thread-6 call()方法被自动调用!!! pool-1-thread-7 call()方法被自动调用!!! pool-1-thread-9 call()方法被自动调用!!! pool-1-thread-4 call()方法被自动调用,任务返回的结果是:3 pool-1-thread-4 call()方法被自动调用,任务返回的结果是:4 pool-1-thread-5 call()方法被自动调用,任务返回的结果是:5 pool-1-thread-6 call()方法被自动调用,任务返回的结果是:6 pool-1-thread-7 call()方法被自动调用!!! pool-1-thread-8 call()方法被自动调用,任务返回的结果是:7 pool-1-thread-8 call()方法被自动调用,任务返回的结果是:8 pool-1-thread-9 call()方法被自动调用!!! pool-1-thread-10 call()方法被自动调用,任务返回的结果是:9 pool-1-thread-10 |
Views: 28