面试必备考点 Java 并发编程(3)

JUC工具类

1、CountDownLatch:减法计数器

public class CountDownLatchTest {
    public static void main(String[] args) {
        //优先执行,执行完毕之后,才能执行 main
        //1、实例化计数器,100
        CountDownLatch countDownLatch = new CountDownLatch(100);
        new Thread(()->{
            for (int i = 0; i < 90; i++) {
                System.out.println("++++++++++Thread");
                countDownLatch.countDown();
            }
        }).start();

        for (int i = 0; i < 10; i++) {
            countDownLatch.countDown();
        }

        //2、调用 await 方法
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < 100; i++) {
            System.out.println("main--------------");
        }
    }
}

countDown() 计数器减一

await() 计数器停止,唤醒其他线程

new CountDownLatch(100)、coutDown()、await() 必须配合起来使用,创建对象时赋的值是多少,coutDown() 就要执行多少次,否则计数器不会清零,计数器就不会停止后,其他线程无法唤醒。

2、CyclicBarrier:加法计数器

package com.janeroad.demo;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    public static void main(String[] args) {
        //定义一个计数器,当计数器的值累加到10,输出"放行"
        CyclicBarrier cyclicBarrier = new CyclicBarrier(30,()->{
            System.out.println("放行");
        });

        for (int i = 1; i <= 90; i++) {
            final int temp = i;

            new Thread(()->{
                System.out.println("-->"+temp);

                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();

        }
    }
}

await() 执行 CyclicBarrier 的业务,CyclicBarrier 中的 Runnable 执行一次之后,计数器清零,等待下一次执行。

3、Semaphore:计数信号量

实际开发中主要用来做限流操作,即限制可以访问某些资源的线程数量。

  • 初始化
  • 获取许可
  • 释放许可
package com.janeroad.demo;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    public static void main(String[] args) {
        //同时只能进5个人
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < 15; i++) {
            new Thread(()->{
                try {
                    //获得许可
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"进店购物");
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println(Thread.currentThread().getName()+"出店");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //释放许可
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }
}

acquire():等待获取许可,如果没有名额,则等待。

release():释放许可,并且唤醒等待的线程。

读写锁

接口 ReadWriteLock,实现类 ReentrantReadWriteLock,可以多线程同时读,但是同一时间内只能有一个线程写。

读写锁也是线程同步,只不过粒度更细,分别给读操作和写操作上不同的锁。

package com.janeroad.demo2;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
    public static void main(String[] args) {
        Cache cache = new Cache();

        //5个线程写
        for (int i = 0; i < 5; i++) {
            final int temp = i;
            new Thread(()->{
                cache.write(temp,String.valueOf(temp));
            }).start();
        }

        //5个线程读
        for (int i = 0; i < 5; i++) {
            final int temp = i;
            new Thread(()->{
                cache.read(temp);
            }).start();
        }
    }
}

class Cache{
    private Map<Integer,String> map = new HashMap<>();
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * 写
     */
    public void write(Integer key,String value){
        //加写入锁
        readWriteLock.writeLock().lock();
        System.out.println(key+"开始写入...");
        map.put(key,value);
        System.out.println(key+"写入完毕...");
        //释放写入锁
        readWriteLock.writeLock().unlock();
    }

    /**
     * 读
     */
    public void read(Integer key){
        //加读取锁
        readWriteLock.readLock().lock();
        System.out.println(key+"开始读取...");
        map.get(key);
        System.out.println(key+"读取完毕...");
        //释放读取锁
        readWriteLock.readLock().unlock();
    }
}

写入锁也叫独占锁,只能被一个线程占用,读取锁也叫共享锁,多个线程可以同时占用。

线程池

常规考点:

1、7 大参数

2、3 种方法

3、4 种拒绝策略

预先创建好一定数量的线程对象,存入缓冲池中,需要用的时候直接从缓冲池中取出,用完之后不要销毁,还回到缓冲池中。

1、提高线程的利用率

2、提高响应速度

3、便于统一管理线程对象

4、可控制最大并发数

package com.janeroad.demo3;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
    public static void main(String[] args) {
        //单例线程池,缓冲池中只有一个线程读写
//        ExecutorService executorService = Executors.newSingleThreadExecutor();
        //指定线程个数的线程池
//        ExecutorService executorService = Executors.newFixedThreadPool(5);
        //缓冲线程池,线程个数由电脑硬件配置决定
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            executorService.execute(()->{
                System.out.println(Thread.currentThread().getName()+":"+temp);
            });
        }
        executorService.shutdown();
    }
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = (System.getSecurityManager() == null)
        ? null
        : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize:核心池大小

maximumPoolSize:线程池最大线程数

核心池大小就是线程池初始化的大小,最大线程数是一种应急方案,任务量突增的时候,额外去创建一定数量的线程对象,但是线程池中总的线程个数不能超过最大值。

keepAliveTime:线程对象在没有任务的情况,最多保存多长时间,时间数值。

unit:时间单位

TimeUnit.DAYS;		天
TimeUnit.HOURS;		小时
TimeUnit.MINUTES;	分钟
TimeUnit.SECONDS;	秒
TimeUnit.MILLISECONDS; 毫秒
TimeUnit.MICROSECONDS; 微秒
TimeUnit.NANOSECONDS;	 纳秒

workQueue:工作队列,阻塞队列

ArrayBlockingQueue:基于数组的先进先出。
LinkedBlockingQueue:基于链表的先进先出。
SynchronousQueue:不会保存任务,而是直接新建一个线程来执行任务。
PriorityBlockQueue:具有优先级的队列。

threadFactory:线程工厂,用来创建线程对象

RejectedExecutionHandler:拒绝执行处理器

AbortPolicy:抛出异常
DiscardPolicy:放弃任务,不抛出异常
DisccardOldestPolicy:尝试与队列最前面的任务去竞争,不抛出异常
CallerRunsPlicy:谁调用谁处理7 大参数

image.png

package com.janeroad.demo4;

import java.util.concurrent.*;

public class Test {
    public static void main(String[] args) {
        //创建线程池
        ExecutorService executorService = null;
        executorService = new ThreadPoolExecutor(
                2,
                3,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
                );
        //开始营业
      	//1-6
        for (int i = 0; i < 6; i++) {
            executorService.execute(()->{
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"==>办理业务");
            });
        }
        executorService.shutdown();
    }
}

AbortPolicy()
image.png

DiscardPolicy()

image.png

DiscardOldestPolicy()

image.png

CallerRunsPolicy()
image.png

ForkJoin 框架

ForkJoin 是 JDK 1.7 提供多线程并发处理框架,本质上是对线程池的一种补充,它的核心思想就将一个大型任务拆分成很多个小任务,分别执行,最终再将小任务的结果进行汇总。
image.png

本质就是把一个线程任务拆分成多个线程并发执行

工作窃取

A、B 两个线程同时执行,A 的任务比较多,B 先执行完了,此时 B 将 A 的一部分任务拿过来替 A 执行,从而提升效率。
image.png

ForkJoin 框架的使用需要用到两个类

  • ForkJoinTask:任务
  • ForkJoinPool:线程池

ForkJoinTask 拆分任务,ForkJoinPool 提供线程对象来执行任务,之后合并。

重点是搞定 ForkJoinTask 如何拆分任务?

ForkJoinTask 使用的是递归的思想。

递归

一个函数,直接或间接调用自身,就是递归。

image.png

1、找出递推公式

f(n) = f(n-1) + 1
f(1) = 1

2、由递推公式得出函数

public int f(int n){
	if(n == 1) return 1;
	return f(n-1) + 1;
}

递归需要满足 3 要素:

1、一个父问题可以拆分成若干个子问题,并且若干个子问题的结果汇总起来就是父问题的答案。

2、父问题和子问题,解题思路完全一致,只是数据的规模不同。

3、存在终止条件。

假设有 n 个台阶,每次可以跨 1 个台阶或者 2 个台阶,请问走完这 n 个台阶,一共有多少种走法?

可以根据第一步的走法分成两类

1、第一步走了 1 步,n-1 个台阶的走法

2、第一步走了 2 步,n-2 个台阶的走法

两类的总是相加就是最终的结果

1、找出递推公式

f(n) = f(n-1)+f(n-2)
f(1) = 1
f(2) = 2

2、由递推公式得出函数

public int f(int n){
	if(n == 1) return 1;
	if(n == 2) return 2;
	return f(n-1)+f(n-2);
}
package com.janeroad.demo5;

public class Test {
    public static void main(String[] args) {
        for (int i = 1; i <= 30; i++) {
            System.out.println(i+"个台阶共有走法:"+f(i));
        }
    }

    public static int f(int n){
        if(n == 1) return 1;
        if(n == 2) return 2;
        return f(n-1)+f(n-2);
    }
}

计算 0-20 亿数字之和。
image.png

package com.janeroad.demo6;

import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo extends RecursiveTask<Long> {

    private Long start;
    private Long end;
    private Long temp = 100_0000L;

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        //到达临界值,不再拆分
        if((end-start)<temp){
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }else{
            Long avg = (start+end)/2;
            ForkJoinDemo task1 = new ForkJoinDemo(start,avg);
            task1.fork();
            ForkJoinDemo task2 = new ForkJoinDemo(avg,end);
            task2.fork();
            return task1.join()+task2.join();
        }
    }

}
package com.janeroad.demo6;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class Test {
    public static void main(String[] args) {
//        Long sum = 0L;
//        Long startTime = System.currentTimeMillis();
//        for (Long i = 0L; i <= 20_0000_0000L ; i++) {
//            sum += i;
//        }
//        Long endTime = System.currentTimeMillis();
//        System.out.println(sum+",耗时"+(endTime-startTime));
        Long startTime = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(0L,20_0000_0000L);
        forkJoinPool.execute(task);
        Long sum = 0L;
        try {
            sum = task.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        Long endTime = System.currentTimeMillis();
        System.out.println(sum+",耗时"+(endTime-startTime));
    }
}
# Java  多线程 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×