侧边栏壁纸
博主头像
快乐江湖的博客博主等级

更多内容请点击CSDN关注“快乐江湖”

  • 累计撰写 127 篇文章
  • 累计创建 33 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

第一章Java多线程基础-第六节:多线程案例

快乐江湖
2023-06-30 / 0 评论 / 0 点赞 / 6 阅读 / 57479 字

一:单例模式

(1)设计模式概述

设计模式(Design pattern):设计模式代表了最佳的实践,通常被有经验的面向对象的软件开发人员所采用。设计模式是软件开发人员在软件开发过程中面临的一般问题的解决方案。这些解决方案是众多软件开发人员经过相当长的一段时间的试验和错误总结出来的,可以把设计模式比作软件开发中的“棋谱

有关设计模式详见:设计模式|菜鸟教程

(2)单例模式概述

单例模式(Singleton Pattern): 是Java 中最简单的设计模式之一。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。这种模式涉及到一个单一的类,该类负责创建自己的对象,同时确保只有单个对象被创建。这个类提供了一种访问其唯一的对象的方式,可以直接访问,不需要实例化该类的对象

  • 单例类只能有一个实例
  • 单例类必须给自己创建自己的唯一实例
  • 单例类必须给所有其他对象提供这一实例

优缺点

  • 优点:节省内存开销,避免对资源的多重占用
  • 缺点:没有接口,不能继承,与单一职责原则冲突,一个类应该只关心内部逻辑,而不关心外面如何实例化

引用实例

  • 一个班只能有一个班主任
  • Windows 是多进程多线程的,在操作一个文件的时候,就不可避免地出现多个进程或线程同时操作一个文件的现象,所以所有文件的处理必须通过唯一的实例来进行
  • 一些设备管理器常常设计为单例模式,比如一个电脑有两台打印机,在输出的时候就要处理不能两台打印机打印同一个文件

(3)单例模式实现

单例模式实现:实现单例模式时注意以下几点

  • 需要将构造方法私有化
  • 需要使用一个方法返回实例,该方法是获取该类的唯一实例的唯一入口

A:饿汉模式

饿汉模式:类加载的同时创建实例

class Singleton{
    private static Singleton instance = new Singleton();
    public static Singleton getInstance(){
        return instance;
    }
    //构造方法私有化
    private Singleton(){}
}

public class TestDemo {
    public static void main(String[] args) {
        //单例
        Singleton instance = Singleton.getInstance();
        //下面写法会被禁止
        //Singleton instance = new Singleton();
    }
}

B:懒汉模式

懒汉模式:类加载的同时不创建实例,第一次使用时才创建。懒汉模式,可以提高效率

  • 如果代码没有调用getInstance(),那么实例化过程就无需进行
  • 即使后续代码调用getInstance(),但由于调用调用时机可能较晚,所以创建实例的时机也就迟了,这样就把它和其他耗时操作分开

懒汉模式分为单线程多线程两个版本

①:单线程版

class SingletonLazy{
    private static SingletonLazy instance = null;

    public static SingletonLazy getInstance(){
        if(instance == null){
            instance = new SingletonLazy();
        }
        return instance;
    }
    //构造方法私有化
    private SingletonLazy(){}
}

②:多线程版

饿汉模式和懒汉模式的单线程版本实际上时线程不安全,这主要会发生在首次创建实例时,如果在多个线程中同时调用getInstance方法,就有可能创建出多个实例

例如

class SingletonLazy{
    private static SingletonLazy instance = null;

    public synchronized static SingletonLazy getInstance(){
        if(instance == null){
            instance = new SingletonLazy();
        }
        return instance;
    }
    //构造方法私有化
    private SingletonLazy(){}
}

public class TestDemo2 {
    public static void main(String[] args) {
        SingletonLazy instance = SingletonLazy.getInstance();
    }
}

或者

class SingletonLazy{
    private static SingletonLazy instance = null;

    public static SingletonLazy getInstance(){
        synchronized (SingletonLazy.class){
            if(instance == null){
                instance = new SingletonLazy();
            }
            return instance;
        }
    }
    //构造方法私有化
    private SingletonLazy(){}
}

public class TestDemo2 {
    public static void main(String[] args) {
        SingletonLazy instance = SingletonLazy.getInstance();
    }
}

③:多线程版(改进)

上述加锁的方式其实是一种“无脑式”的加法,因为线程并不是在任何时候都是不安全的。一旦实例创建完毕,后面即便有线程调用getInstance也不会涉及线程安全问题。所以对于加锁我们只在需要的时候加即可,做出如下改进

  • 使用双重if 进行判定,降低锁竞争频率
  • 使用volatile修饰instance
class SingletonLazy{
    private static volatile SingletonLazy instance = null;

    public static SingletonLazy getInstance(){
        if(instance == null) {
            synchronized (SingletonLazy.class) {
                if (instance == null) {
                    instance = new SingletonLazy();
                }
            }
     	return instance;
        }
    }
    //构造方法私有化
    private SingletonLazy(){}
}

public class TestDemo2 {
    public static void main(String[] args) {
        SingletonLazy instance = SingletonLazy.getInstance();
    }
}

关于双重if判定,需要说明

  • 外层if用于判定是否要加锁:一旦instance已经存在,通过该层判定就可得知,所以就不会再尝试获取锁了,降低了开销
  • 里层if用于判定是否需要创建实例

二:阻塞队列

阻塞队列:这是一种特殊的队列,也遵守先进先出的原则

  • 当队列满的时候,继续入队列就会阻塞,直到其他线程从队列中取走元素
  • 当队列空的时候,继续出队列就会阻塞,直到其他线程从队列中插入元素

阻塞队列的一个典型应用场景就是生产者与消费者模型

(1)生产者与消费者模型

在现实生活中,当我们缺少某些生活用品时,就回到超市去购买。当你到超市时,你的身份就是消费者,那么这些商品又是哪里来的呢,自然是供应商,那么它们就是生产者,而超市在生产者与消费者之间,就充当了一个交易场所。正是这样的方式才使得人类的交易变得高效,生产者只需要向超市供应商品,消费者只需要去超市购买商品

计算机是现实世界的抽象,因此像这种人类世界的模型,自然也被引入到了计算机当中。在实际软件开发中,进程或线程就是生产者和消费者,他们分别产生大量数据或消耗大量数据,但是他们之间一般不直接进行交流,而是生产者生产好数据之后把数据交到一个缓冲区中,消费者需要数据时直接从缓冲区中取就可以了

我们将其总结为321原则——3种关系,2个角色,1个场所

  • 3种关系:生产者与生产者之间是互斥关系,消费者与消费者之间是互斥关系,生产者与消费者之间是同步关系
  • 2个角色:生产者和消费者
  • 1个场所:它们之间进行数据交互是在一缓冲区当中,这个缓冲区可以有多种表现形式

(2)使用标准库中的阻塞队列完成

BlockingQueue:这一个接口,真正实现的类是LinkedBlockingQueue

  • put方法用于阻塞式的入队列
  • take用于阻塞式的出队列
import java.util.Random;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class ProductorAndConsumer {
    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<Integer>();
        Thread productor = new Thread(){
            Random random = new Random();
            @Override
            public void run(){
                while(true) {
                    try {
                        if(blockingDeque.size() <= 5){
                            int product = random.nextInt(1000);
                            System.out.println("生产者生产产品:" + product);
                            //入队列,每1s产生一个产品
                            blockingDeque.put(product);
                        }else{
                            System.out.println("队列已满请消耗完再放入");
                        }

                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };

        Thread consumer = new Thread(){
            @Override
            public void run(){
                while(true){
                    try {
                        //出队列,每2s消耗一个产品
                        int value = blockingDeque.take();
                        System.out.println("消费者消费产品:" + value);
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }

                }
            }
        };

        productor.start();
        consumer.start();

        productor.join();
        consumer.join();
    }
}

(3)使用循环队列实现阻塞队列

A:关于循环队列

使用循环队列实现时,生产者将产品放入tail所指位置,消费者消费head所指位置处的产品

如下是循环队列的基本框架,但并未实现阻塞

class MyBlockingQueue{
    private int[] items = new int[1000];
    private int head = 0;
    private int tail = 0;
    private int size = 0;

    //入队列
    public void put(int elem){
        if(size >= items.length){
            return;
        }
        items[tail] = elem;
        tail++;
        if(tail >= items.length){
            tail = 0;
        }
        size++;
    }

    //出队列
    public Integer take(){
        if(size == 0){
            return null;//返回无效值
        }
        int res = items[head];
        head++;
        if(head >= items.length){
            head = 0;
        }
        size--;

        return res;
    }
}

B:实现

在上面代码的基础上,加入阻塞逻辑

  • 生产者在生产产品时,如果队列已满,需要调用wait()阻塞,直到队列不满;放入产品后需要调用notifyAll()通知消费者消费
  • 消费者在消费产品时,如果队列为空,需要调用wait()阻塞,直到队列不空;消费产品后需要调用notifyAll()通知生产者生产

注意

  • 使用volatile修饰headtailsize
  • 不要忘记synchronized
  • 判断队空或队满时,应该使用while,使其被唤醒时确确实实是对已经不空或者不满了,否则被唤醒时可能由于条件仍然不满足,而被迫还得继续等待

如下代码中,消费者每1s消费一个产品,生产者每0.5s生产一个产品

import java.util.Random;

class MyBlockingQueue{
    private int[] items = new int[1000];
    private volatile int head = 0;
    private volatile int tail = 0;
    private volatile int size = 0;

    //入队列
    public void put(int elem) throws InterruptedException {
        synchronized (this) {
            while (size == items.length) {
                // 队列满,阻塞等待
                this.wait();
            }
            items[tail] = elem;
            tail++;
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            // 通知消费者消费
            this.notifyAll();
        }
    }

    //出队列
    public Integer take() throws InterruptedException {
        synchronized (this) {
            while (size == 0) {
                // 队列空,阻塞等待
                this.wait();
            }
            int res = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            // 通知生产者生产
            this.notifyAll();
            return res;
        }
    }
}

public class TestDemo3 {
    public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
        Thread consumer = new Thread(){
            @Override
            public void run(){
                while(true) {
                    try {
                        int value = myBlockingQueue.take();
                        System.out.println("消费者获取产品:" + value);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        consumer.start();

        Thread producotr = new Thread(){
            Random random = new Random();
            @Override
            public void run(){
                while(true) {
                    try {
                        int value = random.nextInt(1000);
                        myBlockingQueue.put(value);
                        System.out.println("生产者生产产品:" + value);
                        Thread.sleep(500);

                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        producotr.start();

        consumer.join();
        producotr.join();
        
    }
}

三:定时器

(1)定时器概述

定时器:定时器是软件开发中的一个重要组件,类似于“闹钟”,达到一个设定的时间后便会执行指定代码

  • 例如服务器开发中,客户端发送请求后就需要等待服务器响应,如果客户端在规定的时间内没有收到响应,便会触发相应操作

(2)使用标准库中的定时器完成

Timer:标准库中提供了Timer类来完成定时器功能,其核心方法为schedule,它有两个参数

  • 参数1: 即将要执行的任务代码(使用TimerTask创建)
  • 参数2:指定多长时间后执行(单位为毫秒)

如下

import java.util.Timer;
import java.util.TimerTask;

public class TestDemo4 {
    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask(){
            @Override
            public void run(){
                System.out.println("3s时间到");
            }
        }, 3000);

        System.out.println("开始计时");
    }
}

(3)定时器实现

①:主体框架

主体框架

  • 创建MyTask类,用于描述某个任务,内有两个成员分别为Runnabletime,表示待执行的任务细节该任务具体的执行时间(执行时间=当前时间戳+传入参数delay
  • 创建MyTimer类,其内部有很多任务在排队,定时器会在条件满足时选出一个时间最小的任务执行
  • 主函数内采用类似于标准库中的写法来安排任务

如下

// 描述待执行任务
class MyTask{
    // 要执行的任务
    private Runnable runnable;

    // 什么时间来执行任务(时间戳)
    private long time;

    public MyTask(Runnable runnable, long delay){
        this.runnable = runnable;
        this.time = System.currentTimeMillis() + delay;
    }

    public Runnable getRunnable() {
        return runnable;
    }

    public long getTime() {
        return time;
    }
}

// 描述定时器
class MyTimer{
    public void schedule(Runnable runnable, long after){
        MyTask myTask = new MyTask(runnable, after);
    }
}

public class TestDemo5 {
    public static void main(String[] args) {
        MyTimer myTimer = new MyTimer();
        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("时间到");
            }
        }, 3000);
    }
}

②:细节完善

MyTimer如何管理任务:定时器内会有很多任务,所以如何管理是一个问题。但不管用哪种数据结构,这种数据结构必须要带有优先级性质,因为我们每次需要把时间最小的那个任务给拿出来。首先会想到优先级队列(堆),但优先级队列是线程不安全的,所以不可取

private PriorityQueue<MyTask> tasks_queue = new PriorityQueue<>();

幸好标准库中的阻塞队列也有一个线程安全版本可供我们使用,也即

class MyTimer{
    private BlockingQueue<MyTask> tasks_queue = new PriorityBlockingQueue<>();

    public void schedule(Runnable runnable, long after) throws InterruptedException {
        MyTask myTask = new MyTask(runnable, after);
        tasks_queue.put(myTask);
    }
}

②如何取出队列中的任务:我们可以安排一个扫描线程,让此线程不断地检查队首任务,如果该任务时间已到则执行

class MyTimer{
    private BlockingQueue<MyTask> tasks_queue = new PriorityBlockingQueue<>();

    public MyTimer(){
        Thread scan_thread = new Thread(){
            @Override
            public void run(){
                while(true){
                    try {
                        MyTask task = tasks_queue.take();
                        if(task.getTime() <= System.currentTimeMillis()){
                            task.getRunnable().run();
                        }else{
                            tasks_queue.put(task);
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };
    }

    public void schedule(Runnable runnable, long after) throws InterruptedException {
        MyTask myTask = new MyTask(runnable, after);
        tasks_queue.put(myTask);
    }
}

③关于对象比较:这是一个很容易忽略的问题,上述代码如果直接运行,会抛出如下异常,其含义为未实现比较

这是因为MyTaskPriorityBlockingQueue中必须实现比较功能,所以这里可以让MyTask实现Comparable接口

class MyTask implements Comparable<MyTask>{
    // 要执行的任务
    private Runnable runnable;

    // 什么时间来执行任务(时间戳)
    private long time;

    public MyTask(Runnable runnable, long delay){
        this.runnable = runnable;
        this.time = System.currentTimeMillis() + delay;
    }

    public Runnable getRunnable() {
        return runnable;
    }

    public long getTime() {
        return time;
    }

    @Override
    public int compareTo(MyTask o) {
        return (int)(this.time - o.time);
    }
}

④一个严重问题:如下,在判断队首任务时我们使用了一个while(true)循环,但是这个循环转得实在是太快了。例如,队首任务还有1分钟到点,但仅仅在这1min钟内这个循环有可能会循环上百万次,这对CPU资源是一种巨大的浪费。属于忙等

while(true){
    try {
        MyTask task = tasks_queue.take();
        if(task.getTime() <= System.currentTimeMillis()){
            task.getRunnable().run();
        }else{
            tasks_queue.put(task);
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

所以我们可以建立一个locker使线程等待固定时间

class MyTimer{
    //任务队列
    private BlockingQueue<MyTask> tasks_queue = new PriorityBlockingQueue<>();
    //使线程等待固定时间
    private Object locker = new Object();

    public MyTimer(){
        //扫描线程,扫描队首任务
        Thread scan_thread = new Thread(){
            @Override
            public void run() {
                while (true) {
                    synchronized (locker) {
                        try {
                            MyTask task = tasks_queue.take();
                            long currentIime = System.currentTimeMillis();
                            //如果当前队首任务已经到店,则执行
                            if (task.getTime() <= currentIime) {
                                task.getRunnable().run();
                            } else {
                                //否则放回原队列等待
                                tasks_queue.put(task);
                                locker.wait(task.getTime() - currentIime);
                            }
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
            }
        };

        scan_thread.start();
    }

    public void schedule(Runnable runnable, long after) throws InterruptedException {
        MyTask myTask = new MyTask(runnable, after);
        tasks_queue.put(myTask);
        synchronized (locker){
            locker.notify();
        }
    }
}

③:完整代码

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// 描述待执行任务
class MyTask implements Comparable<MyTask>{
    // 要执行的任务
    private Runnable runnable;
    // 什么时间来执行任务(时间戳)
    private long time;

    public MyTask(Runnable runnable, long delay){
        this.runnable = runnable;
        this.time = System.currentTimeMillis() + delay;
    }

    public Runnable getRunnable() {
        return runnable;
    }

    public long getTime() {
        return time;
    }

    //注意比较
    @Override
    public int compareTo(MyTask o) {
        return (int)(this.time - o.time);
    }
}

// 描述定时器
class MyTimer{
    //任务队列
    private BlockingQueue<MyTask> tasks_queue = new PriorityBlockingQueue<>();
    //使线程等待固定时间
    private Object locker = new Object();

    public MyTimer(){
        //扫描线程,扫描队首任务
        Thread scan_thread = new Thread(){
            @Override
            public void run() {
                while (true) {
                    synchronized (locker) {
                        try {
                            MyTask task = tasks_queue.take();
                            long currentIime = System.currentTimeMillis();
                            //如果当前队首任务已经到店,则执行
                            if (task.getTime() <= currentIime) {
                                task.getRunnable().run();
                            } else {
                                //否则放回原队列等待
                                tasks_queue.put(task);
                                locker.wait(task.getTime() - currentIime);
                            }
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
            }
        };

        scan_thread.start();
    }

    public void schedule(Runnable runnable, long after) throws InterruptedException {
        MyTask myTask = new MyTask(runnable, after);
        tasks_queue.put(myTask);
        synchronized (locker){
            locker.notify();
        }
    }
}

public class TestDemo5 {
    public static void main(String[] args) throws InterruptedException {
        MyTimer myTimer = new MyTimer();
        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("时间到3s");
            }
        }, 3000);

        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("时间到5s");
            }
        }, 5000);

        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("时间到7s");
            }
        }, 7000);
    }
}

四:线程池

(1)线程池概述

线程池:线程池是线程的一种使用模式。在前面的情况中,我们都是遇到任务然后创建线程进行再执行。但是线程的频繁创建就类似于内存的频繁申请,会给操作系统带来更大的压力,进而影响整体的性能。所以我们一次申请好一定数量而定线程,然后将线程的管理操作交给线程池,就避免了在短时间内不断创建与销毁线程的代价,线程池不但能够保证内核的充分利用,还能防止过分调度,并根据实际业务情况进行修改

(2)使用标准库中的线程池完成

Executors:使用标准库中的Executors可以创建线程池,Executors本质是ThreadPoolExecutor类的封装。Executors创建线程池有如下几种方式

  • newFixedThreadPool:创建固定线程数的线程池
  • newCachedThreadPool:创建线程数目动态增长的线程池
  • newSingleThreadExecutor:创建只包含单个线程的线程池
  • newScheduledThreadPool:设定一定延迟时间后执行命令(进阶版Timer

其返回值类型为ExecutorService,通过ExecutorService.submit可注册一个任务到线程池中,让线程执行

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestDemo6 {
    public static void main(String[] args) {
        //没有显式new,是通过Executors类的静态方法完成
        ExecutorService thread_pool = Executors.newFixedThreadPool(10);
        for(int i = 0; i < 1000; i++){
            int taskId = i;
            thread_pool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程为" + Thread.currentThread().getName() + "正在执行任务" + taskId);
                }
            });
        }
    }
}

(3)线程池实现

思路

  • 在线程池中使用mission_queue这个阻塞队列来组织待执行的任务
  • submit方法用于将任务添加到mission_queue
  • 创建线程池时就创建相应数量的线程,然后让线程从阻塞队列中获取任务并执行
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

class MyThreadPool{
    //使用阻塞队列组织任务
    private BlockingQueue<Runnable> mission_queue = new LinkedBlockingDeque<>();

    //submit用于将任务提交至mission_queue
    public void submit(Runnable runnable) throws InterruptedException {
        mission_queue.put(runnable);
    }

    //线程池创造时,创造出n个线程
    public MyThreadPool(int n){
        for(int i = 0; i < n; i++){
            Thread thread = new Thread(){
                @Override
                public void run(){
                    while(true) {
                        try {
                            //线程创建出来就去任务队列获取任务
                            Runnable runnable = mission_queue.take();
                            runnable.run();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }

                }
            };
            thread.start();
        }
    }
}

public class TestDemo7 {
    public static void main(String[] args) throws InterruptedException {
        MyThreadPool threadPool = new MyThreadPool(10);
        for(int i = 0; i < 1000; i++){
            int taskId = i;
            threadPool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程为" + Thread.currentThread().getName() + "正在执行任务" + taskId);
                }
            });
        }
    }
}
0

评论区