Java多线程
我们先看一个简单的例子:
public class ThreadDemo1
{
public static void main(String args[]) throws Exception
{
new TestThread1().start();
while(true)
{
System.out.println("main thread is running");
Thread.sleep(1000);
}
}
}
class TestThread1 extends Thread
{
public void run()
{
while(true)
{
System.out.println(" TestThread1 is running");
try {
Thread.sleep(1000); //1000毫秒
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
这个例子里面,有一个主线程、一个子线程TestThread1。
TestThread1
定义了一个子线程,它继承了 Thread
类,并重新实现了 run
方法,因此可以作为一个子线程。
在 main
函数中,调用了子线程的 start
方法,就能启动这个子线程了。
Thread.sleep(time)
可以让线程休眠一段时间。
它们会无规律地交替输出。
Java多线程的实现方式
Java中创建多线程有两种比较经典的方式:
- 继承Thread类
- 实现Runnable接口
它们都必须实现run方法,举例如下:
public class Thread1 extends Thread{
public void run()
{
System.out.println("hello");
}
}
public class Thread2 implements Runnable{
public void run()
{
System.out.println("hello");
}
}
- 想要启动线程,需要调用
start
方法!就会自动新建线程然后调用run
方法。这个底层是JNI来实现的。 - 如果直接调用run方法,那就直接串行执行,并不会新建线程。
- 同一个线程,如果多次调用
start
,那么会直接报错,只能启动一次! - 多个线程启动后,执行的顺序是随机的。
- 线程无需关闭,run方法结束后会自动关闭。
main
主线程可能早于子线程结束,但是程序并不会因为主线程的结束而终止。- 整个程序结束的标志:所有线程都终止!
下面通过一些程序实例来看看这几条规则。
首先是如何创建新线程,必须通过 start
方法才能启动。
public class Thread1 extends Thread{
public void run()
{
System.out.println("hello");
}
public static void main(String[] a)
{
new Thread1().start();
}
}
public class Thread2 implements Runnable{
public void run()
{
System.out.println("hello");
}
public void main(String[] a){
new Thread( new Thread2() ).start();
}
}
这里值得注意的是,如果继承了Thread类,那么直接可以调用start
,如果实现的是接口,那么还需要新建一个 Thread
类,把这个接口作为参数传递给Thread类才可以,也要调用start方法。
看完了启动,下面去看看那些细节规则,体会一下。
public class ThreadDemo0
{
public static void main(String args[]) throws Exception
{
new TestThread0().run(); //通过run去执行,会发生什么?
while(true)
{
System.out.println("main thread is running");
Thread.sleep(10);
}
}
}
class TestThread0
{
public void run()
{
while(true)
{
System.out.println(" TestThread1 is running");
try {
Thread.sleep(1000); //休眠1000毫秒
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
这个程序通过 run
去启动子线程,所以串行执行。又因为子线程是个死循环,不可能出来,所以只会一直输出 TestThread1 is running。
如果把 run
改成 start
,那么就会出现主线程和子线程交替执行的情况。
下面我们验证一下主线程结束不会导致整个程序结束。
public class ThreadDemo2
{
public static void main(String args[]) throws InterruptedException
{
new TestThread2().start(); //主线程执行完这个就结束了
}
}
class TestThread2 extends Thread
{
public void run()
{
while(true)
{
System.out.println("TestThread2" +
" is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
程序会一直输出TestThread2 is running,虽然主线程早就结束了,但是因为还有没结束的子线程,所以整个程序还是会继续的。
public class ThreadDemo3
{
public static void main(String args[])
{
//new TestThread3().start();
//Runnable对象必须放在一个Thread类中才能运行
TestThread3 tt= new TestThread3();//创建TestThread类的一个实例
Thread t= new Thread(tt);//创建一个Thread类的实例
Thread t1 = new Thread(tt);
t.start();//使线程进入Runnable状态
t1.start();
while(true)
{
System.out.println( Thread.currentThread().getName() + " is running");
try {
Thread.sleep(1000); //1000毫秒
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
class TestThread3 implements Runnable //extends Thread
{
//线程的代码段,当执行start()时,线程从此出开始执行
public void run()
{
while(true)
{
System.out.println(Thread.currentThread().getName() +
" is running");
try {
Thread.sleep(1000); //1000毫秒
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
这个程序的输出结果将会是:
类似于这样的随机输出。一共有三个线程,main,0,1,它们就会随机输出。所以说接口可以用于创建线程,但是必须还是得回归到Thread类。
如果试图多次start同一个线程对象,会抛出 IllegalThreadStateException
异常。
最后,我们对比一下这两种方法:
- 继承Thread类,占据了父类的名额,就不能再继承别的了,不如Runnable方便。
- Thread类本身就实现了Runnable接口
- Runnable启动的时候必须放进Thread类里边才能启动
- Runnable更容易实现变量资源共享。Thread类中需要通过static才能实现多线程的变量共享,但Runnable接口中普通变量即可。
结论:Runnable更好一些
Java多线程变量共享
上述讲的方法虽然成功创建了变量和线程,但是没有进行过任何线程之间的通信。在Java中,一般通过共享变量来达到信息共享。
达到多线程中的共享变量,有两种方法:
static
静态变量- 实现了Runnable接口的类中的成员变量,类似于下图的模式:
下面通过例子分别展示:
public class ThreadDemo0
{
public static void main(String [] args)
{
new TestThread0().start();
new TestThread0().start();
new TestThread0().start();
new TestThread0().start();
}
}
class TestThread0 extends Thread
{
// private int tickets=100;
private static int tickets=100;
public void run()
{
while(true)
{
if(tickets>0)
{
System.out.println(Thread.currentThread().getName() +
" is selling ticket " + tickets);
tickets = tickets - 1;
}
else
{
break;
}
}
}
}
卖票。每个线程都是先判断是否还有余票,有就卖,没有就结束线程。首先使用static,发现变量确实共享了,但是实际卖出去的比100张多一些。
随后不使用static,发现每个线程都各自卖100张票,总共卖了400张。
我们改成Runnable接口再试试:
class Ticket implements Runnable{
private int tickets = 100;
@Override
public void run() {
while(true){
if (tickets > 0){
System.out.println(Thread.currentThread().getName() + " is selling ticket. Tickets left: " + tickets);
tickets--;
}
else{
break;
}
}
}
}
public class ThreadTicket {
public static void main(String[] args) {
Ticket t = new Ticket();
new Thread(t).start();
new Thread(t).start();
new Thread(t).start();
new Thread(t).start();
}
}
我们发现,直接作为成员变量就可以实现共享。我们只是new了一个Ticket对象,因此内存里只有一份。而Thread的话,每个成员变量都得重新实现一次,所以必须用static。
但是我们执行发现,卖的票数不是正好100张,可能会多出一些,出现了数据不一致的情况!
数据不一致的原因是什么?因为每个线程都有自己独立的缓存!执行的时候是先把数据读入缓存,再修改,改完放回缓存,最后缓存再写入主存,好几个步骤。如果在这个过程中,两个线程同时取了缓存、同时更新了数据、同时写回主存,那么两次修改只有最后一次提交的那个是可见的。这样很可能就遗漏掉了其中一个线程的操作。
因此,需要采取措施解决不一致问题。
不一致问题可以分为两类,第一是工作缓存副本问题;第二是关键步骤加锁问题。
工作副本的可见性问题
volatile
关键字能够解决工作缓存副本问题,保证不同线程对共享变量操作时的可见性。下面是一个例子:
public class ThreadDemo2
{
public static void main(String args[]) throws Exception
{
TestThread2 t = new TestThread2();
t.start();
Thread.sleep(2000);
t.flag = false;
System.out.println("main thread is exiting");
}
}
class TestThread2 extends Thread
{
//boolean flag = true;
volatile boolean flag = true;
public void run()
{
int i=0;
while(flag)
{
i++;
}
System.out.println("test thread3 is exiting");
}
}
这个程序开启了线程2,并且当flag为true的时候一直做自增操作。然后试图把flag设置为false,让它不要再去做了。因此应该输出线程3结束的信息。
如果flag不使用volatile,那么会观察到输出main thread is exiting,但是程序一直不退出。说明即便改了那个flag,但是子线程用的还是缓存里的flag,没看到主存的变化。
如果使用了volatile来修饰flag,那么会观察到很快两个都结束了,成功退出!说明子线程立刻看到了flag的变化,立刻退出了。
总结: volatile
关键字能让所有线程立刻看到值的变化!
关键步骤加锁限制
- 互斥:某个线程运行某一代码段(关键区),其他线程不能同时访问这个代码段!
- 同步:多个线程的运行,必须按照某种规定的先后顺序来进行
- 互斥是同步的一种特例
通过 synchronized
可以实现互斥,达到关键步骤加锁限制。它可以修饰代码块或者函数,只能允许一个线程执行这个代码块。
那么,看起来卖票问题有救了!我们稍作修改,如下:
class Ticket implements Runnable{
private volatile int tickets = 100;
String lock = "";
@Override
public void run() {
while(true){
synchronized (lock) {
if (tickets > 0) {
System.out.println(Thread.currentThread().getName() + " is selling ticket. Tickets left: " + tickets);
tickets--;
} else {
break;
}
}
}
}
}
public class ThreadTicket {
public static void main(String[] args) {
Ticket t = new Ticket();
new Thread(t).start();
new Thread(t).start();
new Thread(t).start();
new Thread(t).start();
}
}
为什么要弄个新的空字符串?因为加锁是加在某个对象上的,如果加锁了,别人就要去抢这个对象对应的锁,然后才能继续执行,否则就得等待这把锁被释放掉。
另外, synchronized
还可以去修饰函数,如下:
class Ticket implements Runnable {
private volatile int tickets = 100;
private synchronized void sale(){
if(tickets>0) {
System.out.println(Thread.currentThread().getName() + " is selling ticket. Tickets left: " + tickets);
tickets--;
}
}
@Override
public void run() {
while (true) {
sale();
if (tickets <=0) {
break;
}
}
}
}
public class ThreadTicket {
public static void main(String[] args) {
Ticket t = new Ticket();
new Thread(t).start();
new Thread(t).start();
new Thread(t).start();
new Thread(t).start();
}
}
如果我们修饰了函数,那么可以不在代码块里加锁了。
注意,判断tickets是否大于0这件事本身也需要加锁!
虽然 synchronized
很好用,但是注意这个会造成严重的性能负担,所以慎重使用!
Java线程管理(1)
之前进行了线程的启动、信息共享,但是现在线程之间还不能直接交流。线程与线程之间存在等待、通知/唤醒、终止等类型的操作。
为了明确这个问题,首先要明确一下Java语言中线程的状态:
- NEW 新建
- RUNNABLE 就绪
- RUNNING 运行中
- BLOCKED 阻塞
- TERMINATED 终止
线程阻塞和唤醒的相关方法:
- sleep 休眠:进入BLOCKED状态,时间一到,自己醒来
- wait/notify/notifyAll:等待,直到别人notify才唤醒
- join:等待另一个线程结束
- interrupt:向另外一个线程发送中断信号,让它抛出InterruptedException并进行后续处理
下面通过一个非常经典的生产者-消费者例子来讲解这几个方法。
在这个问题中,有一个仓库,生产者源源不断地往里面放生产好的产品;消费者源源不断地取走生产出来的产品。仓库满的时候生产者不能再放;仓库空的时候消费者不能再取。这里就涉及到了线程阻塞。
代码就放在下面了,能够完整自己熟练写出来,就OK了!
class Product {
int id;
String name;
Product(int i, String n) {
this.id = i;
this.name = n;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return "(ID: " + id + " Name: " + name + ")";
}
}
class Storage {
static int size = 10;
Product[] products = new Product[size];
volatile int top = 0;
public synchronized void push(Product p) throws InterruptedException {
// 先检查是否还有空间
while (top == size) {
// 满了,得阻塞一下
System.out.println("Producer Wait");
wait();
}
// 如果没满,加进来
products[top] = p;
top++;
System.out.println(Thread.currentThread().getName() + " Add product to storage ID: " + p.getId() + " Name: " + p.getName());
System.out.println("Producer NotifyAll");
notifyAll();
}
public synchronized Product pop() throws InterruptedException {
// 先判断是否为空
while (top == 0) {
System.out.println("Consumer Wait");
wait();
}
// 没空
top--;
Product p = products[top];
System.out.println(Thread.currentThread().getName() + " Pop from storage ID: " + p.getId() + " Name: " + p.getName());
System.out.println("Consumer NotifyAll");
notifyAll();
return p;
}
}
class Producer implements Runnable {
Storage storage;
Producer(Storage s) {
this.storage = s;
}
@Override
public void run() {
int i = 0;
for (i = 0; i < 10; i++) {
Product p = new Product(i, "ygnn" + i);
try {
storage.push(p);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
Storage storage;
Consumer(Storage s){
this.storage = s;
}
@Override
public void run() {
for(int i=0;i<10;i++){
Product p = null;
try {
p = storage.pop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class FactoryNew {
public static void main(String[] args) throws InterruptedException {
Storage storage = new Storage();
Thread p1 = new Thread(new Producer(storage));
Thread p2 = new Thread(new Producer(storage));
Thread c1 = new Thread(new Consumer(storage));
Thread c2 = new Thread(new Consumer(storage));
p1.setName("Producer 1");
p2.setName("Producer 2");
c1.setName("Consumer 1");
c2.setName("Consumer 2");
p1.start();
p2.start();
Thread.sleep(1000);
c1.start();
c2.start();
}
}
通过这个例子,能够很好地学习到notifyAll以及wait的用法,同时对于关键操作需要使用synchronized。
Java线程管理(2)
之前的wait和notify把线程的“命运”交给了别人手上,有没有自己决定“命运”的方法呢?
可以主动去监测共享变量的变化,如果需要作出相应行动,可以先把锁释放掉,再进行后续操作。
如果需要暂停, Thread.sleep()
,如果需要终止,结束run方法即可。
下面举一个例子,看看让自己主动结束是否会更“优雅”:
public class InterruptThread {
public static void main(String[] args) throws InterruptedException {
Thread1 t1 = new Thread1();
Thread2 t2 = new Thread2();
Thread.sleep(1000);
t1.start();
t2.start();
t1.interrupt();
t2.flag = true;
}
}
class Thread1 extends Thread{
@Override
public void run(){
while(!interrupted()){
System.out.println( Thread.currentThread().getName()+ " is running");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println( Thread.currentThread().getName()+ " is exiting");
}
}
class Thread2 extends Thread{
public volatile boolean flag = false;
@Override
public void run(){
while(!flag){
System.out.println( Thread.currentThread().getName()+ " is running");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println( Thread.currentThread().getName()+ " is exiting");
}
}
可以看到,如果利用变量的形式,就不会抛异常,能够主动地退出。如果抛异常,不一定能够完全释放所有被上锁的资源,因此监视一个变量可能会是更优雅的方式。
下面介绍线程的死锁。每个线程互相持有别人需要的锁,就出现了线程死锁。想要预防死锁,就需要对资源进行等级排序。
public class Deadlock {
public static Object s1 = new Object();
public static Object s2 = new Object();
public static void main(String[] args) throws InterruptedException {
Thread1 thread1 = new Thread1();
thread1.start();
Thread2 thread2 = new Thread2();
thread2.start();
}
}
class Thread1 extends Thread{
@Override
public void run(){
synchronized (Deadlock.s1){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (Deadlock.s2){
System.out.println(Thread.currentThread().getName() + " running!");
}
}
}
}
class Thread2 extends Thread{
@Override
public void run(){
synchronized (Deadlock.s2){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (Deadlock.s1){
System.out.println(Thread.currentThread().getName() + " running!");
}
}
}
}
上面就是一个特别典型的死锁的例子。注意那个资源一定不要用字符串,涉及到常量池,或许上锁会不成功!使用VisualVM工具查看,发现检测到了死锁:
可以查看报告看如何出现的死锁:
如何解决死锁呢?只要让Thread2也是先占用s1再占用s2就可以了,也就是规定资源的获取顺序。只要破坏了线程死锁的四个条件:互斥条件、请求与保持条件、不可抢占条件和循环等待,就可以解除死锁。
本节的最后一个知识点就是守护线程。普通线程的结束条件是run方法执行完毕,但守护线程要么run执行完毕结束,要么main方法执行完毕它也结束。不会出现main结束了守护线程还在的情况。
下面是一个守护线程的案例:
public class DaemonTest {
public static void main(String[] args) throws InterruptedException {
Daemon daemon = new Daemon();
daemon.setDaemon(true);
daemon.start();
Thread.sleep(3000);
System.out.println("Main thread exit!");
}
}
class Daemon extends Thread{
@Override
public void run(){
while(true){
System.out.println(Thread.currentThread().getName() + " is running.");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用 setDaemon
后可以把线程设置为守护线程。如果main的所有方法都执行完了,那么守护线程将会自动停止。如图:
如果我们删掉那一行setDaemon,那么主线程退出后还会一直执行子线程。
不设置为守护线程的情况下:
守护线程永远不要去访问资源或者加锁,例如文件或者数据库等。因为它会随着main的结束而结束,可能释放不掉资源!
Executor
并行比较困难,因为任务分配和执行高度耦合。可以使用一些Java提供的多线程框架进行更好的管理。
并行计算一般有两种模式:主从模式(Master-Slave)、Worker模式(Worker-Worker, P2P)
Java的并行一般有三种模式:
- Thread/Runnable/Thread组管理
- Executor
- Fork-Join框架
线程组管理
ThreadGroup,是一个线程的集合,树形结构,大线程组可以包括小线程组,可以通过enumerate来遍历,可以有效管理多个线程,但是效率比较低,任务分配和执行过程依然高度耦合,无法重用线程,只能new出来,不许多次start,代价比较高。
这个提供了可以遍历(enumerate)和管理多个线程的方式,但是缺点也很明显,不常用,了解即可。
Executor
在 java.util.concurrent
包中,能够分离任务和执行者的创建,且能够实现线程复用(直接new线程代价很大)。
共享线程池:预设好的多个Thread,能够弹性增加;可以多次执行很小的任务;任务创建和执行过程解耦;程序员无需关心线程池执行过程。
Executor的几个主要类:ExecutorService、ThreadPoolExecutor、Future
- Executors.newCachedThreadPool/newFixedThreadPool:创建线程池
- ExecutorService:线程池服务
- Callable接口:返回具体的逻辑对象(线程类),和Runnable等价,但是call方法可以有返回值
- Future:返回结果
下面是一个基础的展示如何使用线程池的例子。首先去定义一个任务,这个任务很简单,就是生成一个随机数,打印一句话,然后休眠。
class Task implements Runnable {
private String name;
public Task(String name){
this.name=name;
}
public void run() {
try {
Long duration=(long)(Math.random()*1000);
System.out.printf("%s: Task %s: Doing a task during %d seconds\n",Thread.currentThread().getName(),name,duration);
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s: Task %s: Finished on: %s\n",Thread.currentThread().getName(),name,new Date());
}
}
这个任务只需要写任务相关的代码,无需考虑如何notify或者wait,省去了手动管理线程协作。
然后还需要一个线程池服务,如下:
public class Server {
//线程池
private ThreadPoolExecutor executor;
public Server(){
executor=(ThreadPoolExecutor)Executors.newCachedThreadPool(); // 可变大小的线程池
//executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5); // 固定大小的线程池
}
//向线程池提交任务
public void submitTask(Task task){
System.out.printf("Server: A new task has arrived\n");
executor.execute(task); //执行 无返回值
System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize());
System.out.printf("Server: Active Count: %d\n",executor.getActiveCount());
System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount());
}
public void endServer() {
executor.shutdown();
}
}
可以看出,我们只需要提交任务,不需要启动新的线程,线程池中的线程就会一直等待任务的到来然后执行。最后看看main函数:
public class Main {
public static void main(String[] args) throws InterruptedException {
// 创建一个执行服务器
Server server=new Server();
// 创建100个任务,并发给执行器,等待完成
for (int i=0; i<100; i++){
Task task=new Task("Task "+i);
Thread.sleep(10);
server.submitTask(task);
}
server.endServer();
}
}
执行结果中就能看出,这100个任务被分配了很多线程去执行。在我的测试结果里,分配了44个线程完成这100个任务。当然,线程数太多对于计算机的性能也是有挑战的。
下面来手撕一道并行计算的题目!我想并行计算1-1000的总和,怎么做?
首先分析一下,我们可以把它分配到几个线程里,这个数量不宜过大,我们可以手动设置,比如4个线程。线程数是计算机CPU核心数的2倍或者4倍比较合适。同时这个是需要返回值的,所以得使用Callable接口了。我们把任务分解,给出每个任务的起始值和结束值即可。下面试图实现一下:
public class SumTask implements Callable<Integer> {
private int start;
private int end;
public SumTask(int s, int e){
this.start = s;
this.end = e;
}
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = start; i <= end; i++){
result += i;
}
Thread.sleep(new Random().nextInt(1000));
System.out.println(Thread.currentThread().getName() + " finishes computing, result is: " + result);
return result;
}
}
public class SumMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 定义线程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
// 结果列表
ArrayList<Future<Integer>> resultList = new ArrayList<Future<Integer>>();
// 拆分并提交任务
for (int i = 0; i < 10; i++){
SumTask task = new SumTask(i*100+1, i*100+100);
Future<Integer> taskSum = executor.submit(task);
resultList.add(taskSum);
}
// 判断任务是否全都执行完毕
while(executor.getCompletedTaskCount()<resultList.size()){
// 查询各个线程的状态
System.out.println("Finished task:" + executor.getCompletedTaskCount());
// 逐个输出结果
for (int i =0; i< resultList.size(); i++){
System.out.println("Task " + i + " now finished? " + resultList.get(i).isDone());
}
Thread.sleep(500);
}
// 跳出循环,完成计算
// 逐个输出结果并计算总和
int sum = 0;
for (int i =0; i< resultList.size(); i++){
System.out.println("Task " + i + " computes " + resultList.get(i).get());
sum += resultList.get(i).get();
}
System.out.println("The final result is: " + sum);
executor.shutdown();
}
}
得到的结果如下(节选):
-
得到的结果
Finished task:0
Task 0 now finished? false
Task 1 now finished? false
Task 2 now finished? false
Task 3 now finished? false
Task 4 now finished? false
Task 5 now finished? false
Task 6 now finished? false
Task 7 now finished? false
Task 8 now finished? false
Task 9 now finished? false
pool-1-thread-1 finishes computing, result is: 5050
pool-1-thread-5 finishes computing, result is: 45050
pool-1-thread-1 finishes computing, result is: 55050
Finished task:3
Task 0 now finished? true
Task 1 now finished? false
Task 2 now finished? false
Task 3 now finished? false
Task 4 now finished? true
Task 5 now finished? true
Task 6 now finished? false
Task 7 now finished? false
Task 8 now finished? false
Task 9 now finished? false
pool-1-thread-5 finishes computing, result is: 65050
pool-1-thread-3 finishes computing, result is: 25050
pool-1-thread-2 finishes computing, result is: 15050
pool-1-thread-4 finishes computing, result is: 35050
Finished task:7
Task 0 now finished? true
Task 1 now finished? true
Task 2 now finished? true
Task 3 now finished? true
Task 4 now finished? true
Task 5 now finished? true
Task 6 now finished? true
Task 7 now finished? false
Task 8 now finished? false
Task 9 now finished? false
pool-1-thread-5 finishes computing, result is: 85050
pool-1-thread-1 finishes computing, result is: 75050
Finished task:9
Task 0 now finished? true
Task 1 now finished? true
Task 2 now finished? true
Task 3 now finished? true
Task 4 now finished? true
Task 5 now finished? true
Task 6 now finished? true
Task 7 now finished? true
Task 8 now finished? true
Task 9 now finished? false
pool-1-thread-3 finishes computing, result is: 95050
Task 0 computes 5050
Task 1 computes 15050
Task 2 computes 25050
Task 3 computes 35050
Task 4 computes 45050
Task 5 computes 55050
Task 6 computes 65050
Task 7 computes 75050
Task 8 computes 85050
Task 9 computes 95050
The final result is: 500500Process finished with exit code 0
可以看到,每个线程都去做了自己该做的事。另外,程序结束的时候必须shutdown掉线程池,否则线程一直是开着的!
Fork-Join框架
Fork-Join: 分解-治理-合并
比如一个长度特别长的数组,我们可以进行拆分,拆成一个一个一个的小数组,长度较小的时候就可以了,最后让这些小数组进行并行计算,把结果加起来即可。
关键类:
- ForkJoinPool 任务池
- RecursiveAction
- RecursiveTask
下面是一个具体例子:
public class SumForkJoin {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
SumRecursive sumRecursive = new SumRecursive(1L,1000000000L);
ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(sumRecursive);
while(!forkJoinTask.isDone()){
System.out.printf("Main: Thread Count: %d\n",forkJoinPool.getActiveThreadCount());
System.out.printf("Main: Paralelism: %d\n",forkJoinPool.getParallelism());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(forkJoinTask.get().toString());
}
}
class SumRecursive extends RecursiveTask<Long> {
private Long start;
private Long end;
private static int threshold = 5;
public SumRecursive(Long s, Long e){
this.start = s;
this.end = e;
}
@Override
protected Long compute() {
Long sum = 0L;
if ( end - start > 5 ){
// 递归计算
Long middle = (start+end)/2L;
SumRecursive left = new SumRecursive(start, middle);
SumRecursive right = new SumRecursive(middle+1, end);
invokeAll(left, right);
Long l = left.join();
Long r = right.join();
sum = l + r;
}
else{
// 直接加和
for(Long i = start; i<=end; i++){
sum+=i;
}
}
return sum;
}
}
这就是一个非常典型的二分法实现ForkJoinPool的例子!
-
运行结果
Main: Thread Count: 1
Main: Paralelism: 10
Main: Thread Count: 10
Main: Paralelism: 10
Main: Thread Count: 10
Main: Paralelism: 10
Main: Thread Count: 10
Main: Paralelism: 10
Main: Thread Count: 10
Main: Paralelism: 10
Main: Thread Count: 9
Main: Paralelism: 10
5000000050000000
几个点:第一, RecursiveTask
需要继承而不是实现,这个与Callable不同。第二,递归的时候,在里面递归成子任务类,然后需要进行 invokeAll
,然后分别调用 join
即可。join的作用是等待线程完成后才能继续,否则阻塞。第三,在Main中还是调用 isDone
,看看是否完成了。
理解这个框架即可。
并发数据结构
常用的ArrayList,HashSet,HashMap是不安全的,即多线程访问会造成错误或者抛异常。
传统的Vector、Hashtable的性能过差。
后来提出了并发数据结构,用于数据添加和删除。
- 阻塞式集合:当集合为空或者满时,等待。
- 非阻塞式集合:当集合为空或者满时,不等待,返回null或者抛异常。
大概分为四类去介绍:List、Set、HashMap、Queue/Deque
List
通过如下案例进行对比。首先定义了个普通的ArrayList,这个性能不错但是线程不安全。之后分别定义了synchronizedList(一个基于synchronized的列表)和CopyOnWriteArrayList(一个基于复制机制的非阻塞列表)。
定义了一个线程类,就是这个线程每次把当前线程名加入列表10次。然后分别建立了30个线程,分别使用三种数据结构各10次。看看结果。
public class ListTest {
public static void main(String[] args) throws InterruptedException{
//线程不安全
List<String> unsafeList = new ArrayList<String>();
//线程安全
List<String> safeList1 = Collections.synchronizedList(new ArrayList<String>());
//线程安全
CopyOnWriteArrayList<String> safeList2 = new CopyOnWriteArrayList<String>();
ListThread t1 = new ListThread(unsafeList);
ListThread t2 = new ListThread(safeList1);
ListThread t3 = new ListThread(safeList2);
for(int i = 0; i < 10; i++){
Thread t = new Thread(t1, String.valueOf(i));
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t2, String.valueOf(i));
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t3, String.valueOf(i));
t.start();
}
//等待子线程执行完
Thread.sleep(2000);
System.out.println("listThread1.list.size() = " + t1.list.size());
System.out.println("listThread2.list.size() = " + t2.list.size());
System.out.println("listThread3.list.size() = " + t3.list.size());
}
}
-
执行结果
listThread1.list.size() = 88
listThread2.list.size() = 100
listThread3.list.size() = 100
可以看出,第一个发生了同步问题,不安全,但后面的都是安全的。
Set
相似地,我们实现一个类似的程序:
分别使用了线程不安全的HashSet、synchronizedSet(一个基于synchronized的集合)和CopyOnWriteArraySet(一个基于复制机制的非阻塞集合)。
定义了一个线程类,就是这个线程每次把当前线程名加入列表10次。然后分别建立了30个线程,分别使用三种数据结构各10次。看看结果。
public class SetTest{
public static void main(String[] args) throws InterruptedException{
//线程不安全
Set<String> unsafeSet = new HashSet<String>();
//线程安全
Set<String> safeSet1 = Collections.synchronizedSet(new HashSet<String>());
//线程安全
CopyOnWriteArraySet<String> safeSet2 = new CopyOnWriteArraySet<String>();
SetThread t1 = new SetThread(unsafeSet);
SetThread t2 = new SetThread(safeSet1);
SetThread t3 = new SetThread(safeSet2);
//unsafeSet的运行测试
for(int i = 0; i < 10; i++){
Thread t = new Thread(t1, String.valueOf(i));
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t2, String.valueOf(i));
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t3, String.valueOf(i));
t.start();
}
//等待子线程执行完
Thread.sleep(2000);
System.out.println("setThread1.set.size() = " + t1.set.size());
System.out.println("setThread2.set.size() = " + t2.set.size());
System.out.println("setThread3.set.size() = " + t3.set.size());
}
}
-
执行结果
setThread1.set.size() = 98
setThread2.set.size() = 100
setThread3.set.size() = 100
可以发现,第一个确实线程不安全,后面两个都是线程安全的。
Map
Map也是类似的。HashMap是不安全的。
synchronizedMap基于synchronized关键字,效率较差。ConcurrentHashMap是一个适用于读多写少场景的非阻塞Map。
Queue和Deque
ConcurrentLinkedQueue是非阻塞队列。
ArrayBlockingQueue和LinkedBlockingQueue都是阻塞队列。
如上都是线程安全的。
Java并发协作控制
Lock
- 能够实现更细粒度的临界区结构。
- tryLock方法能够判断临界区是否空闲。
- 允许分离读写的方法,比如多个读(共享)、一个写(排他)。
- 性能更好。
主要的类: ReentrantLock
:可重入的互斥锁
ReentrantReadWriteLock
:可重入的读写锁
主要有 lock
和 unlock
函数
一个简单的例子:奶茶店,学生排队买奶茶,如果前面还在排队,就等会儿再重新看看还排不排,不排队了就去要一杯奶茶。老板用传统订单本写入订单,几个员工可以同时读取这个订单。暂时不考虑学生创建订单和老板添加订单间的一致性问题,分别考虑学生排队和老板员工这里分别需要上什么锁。
学生排队应该上一个ReentrantLock
,因为如果没人排队,我就去排队,同时这个队伍就被我占领了,别人不许去。
而老板员工需要ReentrantReadWriteLock
,且老板需要写锁,我写的时候别人不许看;员工需要读锁,可以大家一起读这个订单本。
代码放在下面了:
public class LockExample {
private static final ReentrantLock queueLock = new ReentrantLock(); //可重入锁
private static final ReentrantReadWriteLock orderLock = new ReentrantReadWriteLock(); //可重入读写锁
public static void main(String[] args) throws InterruptedException {
buyMilkTea();
handleOrder(); //需手动关闭
}
public void tryToBuyMilkTea() throws InterruptedException {
boolean flag = true;
while(flag)
{
if (queueLock.tryLock()) {
//tryLock这句话可以尝试。如果能获取到,就顺便加锁;如果获取不到,就返回false
long thinkingTime = (long) (Math.random() * 500);
Thread.sleep(thinkingTime);
System.out.println(Thread.currentThread().getName() + ": 来一杯奶茶");
flag = false;
queueLock.unlock(); //得主动释放锁
} else {
System.out.println(Thread.currentThread().getName() + ": 再等等");
}
if(flag)
{
Thread.sleep(1000);
}
}
}
public void addOrder() throws InterruptedException {
orderLock.writeLock().lock(); //老板上写锁
long writingTime = (long) (Math.random() * 1000);
Thread.sleep(writingTime);
System.out.println("老板新加一笔订单");
orderLock.writeLock().unlock(); //老板释放写锁
}
public void viewOrder() throws InterruptedException {
orderLock.readLock().lock(); //员工上读锁
long readingTime = (long) (Math.random() * 500);
Thread.sleep(readingTime);
System.out.println(Thread.currentThread().getName() + ": 查看订单本");
orderLock.readLock().unlock(); //员工释放读锁
}
public static void buyMilkTea() throws InterruptedException {
LockExample lockExample = new LockExample();
int STUDENTS_CNT = 10;
Thread[] students = new Thread[STUDENTS_CNT];
for (int i = 0; i < STUDENTS_CNT; i++) {
students[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
long walkingTime = (long) (Math.random() * 1000);
Thread.sleep(walkingTime);
lockExample.tryToBuyMilkTea();
} catch(InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
);
students[i].start();
}
for (int i = 0; i < STUDENTS_CNT; i++)
students[i].join();
}
public static void handleOrder() throws InterruptedException {
LockExample lockExample = new LockExample();
Thread boss = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
lockExample.addOrder();
long waitingTime = (long) (Math.random() * 1000);
Thread.sleep(waitingTime);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
});
boss.start();
int workerCnt = 3;
Thread[] workers = new Thread[workerCnt];
for (int i = 0; i < workerCnt; i++)
{
workers[i] = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
lockExample.viewOrder();
long workingTime = (long) (Math.random() * 5000);
Thread.sleep(workingTime);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
});
workers[i].start();
}
}
}
Semaphore
信号量:本质上是一个计数器,大于0的时候可以使用;否则不能。
可以设置并发量。例如限制10个同时访问。
主要两个方法: acquire
获取信号量; release
释放信号量。
它比Lock更进一步,能同时控制多个关键区。
下面通过一个地下车库停车的例子看一下。车库里有5个车位,一共有10辆车想要进来。每次车进入车库前需要申请一个信号量。过一会儿,车会走。
public class SemaphoreExample {
private final Semaphore placeSemaphore = new Semaphore(5);
public boolean parking() throws InterruptedException {
if (placeSemaphore.tryAcquire()) { //尝试申请信号量
System.out.println(Thread.currentThread().getName() + ": Parking!");
return true;
} else { //如果申请失败
System.out.println(Thread.currentThread().getName() + ": No place!");
return false;
}
}
public void leaving() throws InterruptedException {
placeSemaphore.release(); //需要主动释放信号量
System.out.println(Thread.currentThread().getName() + ": Went away!");
}
public static void main(String[] args) throws InterruptedException {
int tryToParkCnt = 10;
SemaphoreExample semaphoreExample = new SemaphoreExample();
Thread[] parkers = new Thread[tryToParkCnt];
for (int i = 0; i < tryToParkCnt; i++) {
parkers[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
long randomTime = (long) (Math.random() * 1000);
Thread.sleep(randomTime);
if (semaphoreExample.parking()) {
long parkingTime = (long) (Math.random() * 1200);
Thread.sleep(parkingTime);
semaphoreExample.leaving();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
parkers[i].start();
}
for (int i = 0; i < tryToParkCnt; i++) {
parkers[i].join();
}
}
}
-
执行结果
Thread-1: Parking!
Thread-8: Parking!
Thread-7: Parking!
Thread-6: Parking!
Thread-0: Parking!
Thread-1: Went away!
Thread-8: Went away!
Thread-5: Parking!
Thread-2: Parking!
Thread-0: Went away!
Thread-4: Parking!
Thread-6: Went away!
Thread-3: Parking!
Thread-9: No place!
Thread-7: Went away!
Thread-4: Went away!
Thread-2: Went away!
Thread-3: Went away!
Thread-5: Went away!
Latch
Latch等待锁,是一个同步辅助类。用来同步执行任务的一个或多个线程。不是用来保护临界区或者共享资源。比如,等这些线程都执行完,再继续往下,就可以用Latch。
主要类是 CountDownLatch
,包括 countDown
和 await
两个方法。其中countDown可以让计数-1,await会等到计数变成0,再往下执行。
举一个例子,比赛中,所有选手都到达终点才能结束比赛:
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int runnerCnt = 10;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(runnerCnt);
for (int i = 0; i < runnerCnt; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
System.out.println("Preparing...");
System.out.println("Prepared");
startSignal.countDown(); // 发令枪启动!
System.out.println("Game starts!");
doneSignal.await(); // 等待所有选手完成比赛,才可以。
System.out.println("Game ends!");
}
static class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await(); // 等待发令枪信号。如果latch变成0了,会立刻通知所有await!
doWork(); // 发令枪后才能跑
doneSignal.countDown(); // 跑完标记为到达终点
} catch (InterruptedException ex) {
} // return;
}
void doWork() {
System.out.println(Thread.currentThread().getName() + ": reached the goal!");
}
}
}
-
执行结果
Preparing...
Prepared
Game starts!
Thread-2: reached the goal!
Thread-3: reached the goal!
Thread-9: reached the goal!
Thread-4: reached the goal!
Thread-0: reached the goal!
Thread-1: reached the goal!
Thread-7: reached the goal!
Thread-6: reached the goal!
Thread-5: reached the goal!
Thread-8: reached the goal!
Game ends!
Barrier
Barrier集合点,也是一个同步辅助类。允许多个线程在某一个点上进行同步。
常用类是 CyclicBarrier
,构造函数是需要同步的线程数量, await
等待其他线程,达到数量后放行。
具体来说,当在barrier上await的线程数量达到要求之后,就全部解锁,并且执行预定的回调动作(比如某个线程的run方法)。
Barrier适合做这种归并的程序。
举例,一个二维数组(三行),每个线程分别计算一行,等每个线程都算完了,再去算总和。
public class CyclicBarrierExample {
public static void main(String[] args) {
final int[][] numbers = new int[3][5];
final int[] results = new int[3];
int[] row1 = new int[]{1, 2, 3, 4, 5};
int[] row2 = new int[]{6, 7, 8, 9, 10};
int[] row3 = new int[]{11, 12, 13, 14, 15};
numbers[0] = row1;
numbers[1] = row2;
numbers[2] = row3;
CalculateFinalResult finalResultCalculator = new CalculateFinalResult(results);
CyclicBarrier barrier = new CyclicBarrier(3, finalResultCalculator);
//当有3个线程在barrier上await,就执行finalResultCalculator
for(int i = 0; i < 3; i++) {
CalculateEachRow rowCalculator = new CalculateEachRow(barrier, numbers, i, results);
new Thread(rowCalculator).start();
}
}
}
class CalculateEachRow implements Runnable {
final int[][] numbers;
final int rowNumber;
final int[] res;
final CyclicBarrier barrier;
CalculateEachRow(CyclicBarrier barrier, int[][] numbers, int rowNumber, int[] res) {
this.barrier = barrier;
this.numbers = numbers;
this.rowNumber = rowNumber;
this.res = res;
}
@Override
public void run() {
int[] row = numbers[rowNumber];
int sum = 0;
for (int data : row) {
sum += data;
res[rowNumber] = sum;
}
try {
System.out.println(Thread.currentThread().getName() + ": calculates line" + (rowNumber + 1) + ", with result " + sum);
barrier.await(); //等待!只要超过3个(Barrier的构造参数),就放行。
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class CalculateFinalResult implements Runnable {
final int[] eachRowRes;
int finalRes;
public int getFinalResult() {
return finalRes;
}
CalculateFinalResult(int[] eachRowRes) {
this.eachRowRes = eachRowRes;
}
@Override
public void run() {
int sum = 0;
for(int data : eachRowRes) {
sum += data;
}
finalRes = sum;
System.out.println("Total sum: " + finalRes);
}
}
-
执行结果
Thread-0: calculates line1, with result 15
Thread-2: calculates line3, with result 65
Thread-1: calculates line2, with result 40
Total sum: 120
Phaser
允许执行并发多阶段任务。在每个阶段结束的位置对线程进行同步,当所有线程都到达这一步,再执行下一步。
有 arrive()
和 arriveAndAwaitAdvance()
。
举例,一个考试一共三道题,只有所有学生都做完一道题,才会下发下一道题。
public class PhaserExample {
public static void main(String[] args) {
int studentsCnt = 5;
Phaser phaser = new Phaser(studentsCnt);
for (int i = 0; i < studentsCnt; i++) {
new Thread(new Student(phaser)).start();
}
}
}
class Student implements Runnable {
private final Phaser phaser;
public Student(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
doTesting(1);
phaser.arriveAndAwaitAdvance(); //等到5个线程都到了,才放行
doTesting(2);
phaser.arriveAndAwaitAdvance();
doTesting(3);
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doTesting(int i) throws InterruptedException {
String name = Thread.currentThread().getName();
System.out.println(name + "开始答第" + i + "题");
long thinkingTime = (long) (Math.random() * 1000);
Thread.sleep(thinkingTime);
System.out.println(name + "第" + i + "道题答题结束");
}
}
Exchanger
允许在并发线程中交换消息。允许在2个线程中定义同步点,当两个线程都到达同步点,它们交换数据结构。
数据的交换是双向的,通过 exchange()
方法。
当两个线程都执行到了同一个Exchanger的exchange方法的时候,就会交换数据。
public class ExchangerExample {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<String>();
BackgroundWorker worker = new BackgroundWorker(exchanger);
new Thread(worker).start();
Scanner scanner = new Scanner(System.in);
while(true) {
System.out.println("输入要查询的属性学生姓名:");
String input = scanner.nextLine().trim();
exchanger.exchange(input); //把用户输入传递给线程
String value = exchanger.exchange(null); //拿到线程反馈结果
if ("exit".equals(value)) {
break;
}
System.out.println("查询结果:" + value);
}
scanner.close();
}
}
class BackgroundWorker implements Runnable {
final Exchanger<String> exchanger;
BackgroundWorker(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
while (true) {
try {
String item = exchanger.exchange(null);
switch (item) {
case "zhangsan":
exchanger.exchange("90");
break;
case "lisi":
exchanger.exchange("80");
break;
case "wangwu":
exchanger.exchange("70");
break;
case "exit":
exchanger.exchange("exit");
return;
default:
exchanger.exchange("查无此人");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}