您的当前位置:首页正文

【Linux】线程池详解及其基本架构与单例模式实现

2024-10-28 来源:个人技术集锦

1.关于线程池的基本理论         

1.1.线程池是什么?

1.2.线程池的应用场景:

2.线程池的基本架构

2.1.线程容器

2.2.任务队列

2.3.线程函数(HandlerTask)

2.4.线程唤醒机制

3.添加单例模式

3.1.单例模式是什么?

3.2.饿汉实现方式和懒汉实现方式

饿汉式单例模式:

懒汉式单例模式:

3.3.改写懒汉式的单例模式

双判断的方式为什么能减少单例的加锁成本呢?

单判断为什么会出错?

单例模式的注意点:

4.代码和执行效果

1.关于线程池的基本理论         

1.1.线程池是什么?

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

1.2.线程池的应用场景:

2.线程池的基本架构

  1. 线程容器:用来管理创建的线程,方便统一初始化。
  2. 任务队列:用来储存任务消息,需要支持压入与取出的操作。
  3. 线程函数(HandlerTask):线程都需要执行这个函数模块,在这个函数模块中进行任务的等待和执行。
  4. 线程唤醒机制:需要一个线程唤醒机制,通过条件变量和互斥锁完成对线程的保护与唤醒。
  5. 单例模式:线程池不需要创建多个,一个程序只需要一个线程池,通过单例模式进行优化。

2.1.线程容器

我们使用vector容器来存储线程,并且使用自己封装的线程来实现线程使用的各个接口

2.2.任务队列

我们使用队列这个容器来存储任务,并且利用队列FIFO的特性进行存储任务和取出任务

 std::queue<T> _task_queue;

2.3.线程函数(HandlerTask)

我们首先要明确线程需要死循环去执行任务,所以需要while一直循环,直到线程池已经退出了&&任务队列是空的。执行任务的同时还需要保证线程的安全,所以需要加锁来保证。

    void HandlerTask(std::string name)
    {
        LOG(INFO, "%s is running...", name.c_str());
        //线程需要死循环去处理任务
        while(true)
        {
            //1、保证队列安全
            LockQueue();
            //2、队列中不一定有数据
            while(_task_queue.empty() && _isrunning)
            {
                _waitnum++;
                ThreadSleep();
                _waitnum--;
            }
            //2.1 如果线程池已经退出了&&任务队列是空的
            if(_task_queue.empty() && !_isrunning)
            {
                UnlockQueue();
                break;
            }
            // 2.2 如果线程池不退出 && 任务队列不是空的
            // 2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出
            // 3. 一定有任务, 处理任务
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();
            LOG(DEBUG, "%s get a task", name.c_str());
            //4.处理任务,这个任务属于线程独占的任务
            //t();
            LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());
        }
    }

2.4.线程唤醒机制

需要一个线程唤醒机制,通过条件变量加互斥锁完成对线程的保护与唤醒。

3.添加单例模式

3.1.单例模式是什么?

某些类, 只应该具有一个对象(实例), 就称之为单例。在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.

3.2.饿汉实现方式和懒汉实现方式

饿汉式单例模式:

饿汉式单例模式在类加载时就完成了实例的创建。这种方式的特点是线程安全,因为 JVM 在加载类时会对静态变量进行初始化,并且这个过程是线程互斥的。

template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};

缺点:程序启动的时候,可能会很慢!所以我们一般不用饿汉

懒汉式单例模式:

template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};

缺点:存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
但是后续再次调用, 就没有问题了.

其实在日常使用中,我们一般不会使用饿汉式单例模式,因为它启动的时候过慢,所以我们来改写基于懒汉式的单例模式,主要解决线程安全的问题!

3.3.改写懒汉式的单例模式

添加双判断来解决线程安全问题。

    static ThreadPool<T> *GetInstance()
    {
        // 如果是多线程获取线程池对象下面的代码就有问题了!!
        // 只有第一次会创建对象,后续都是获取
        // 双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全
        if (nullptr == _instance) // 保证第二次之后,所有线程,不用在加锁,直接返回_instance单例对象
        {
            LockGuard lockguard(&_lock);
            if (nullptr == _instance)
            {
                _instance = new ThreadPool<T>();
                _instance->InitThreadPool();
                _instance->Start();
                LOG(DEBUG, "创建线程池单例");
                return _instance;
            }
        }
        LOG(DEBUG, "获取线程池单例");
        return _instance;
    }

双判断的方式为什么能减少单例的加锁成本呢?

我们主要解决的是害怕多线程创建不止一个单例,我们的目的是让该单例模式只生产一个单例!围绕这一个核心去解决问题!

同时有很多进程过来的时候,都会去尝试加锁,但是只有一个线程可以加锁成功,然后会执行new操作,这时候_instance == nullptr就不成立了,再后来的线程不会等待在锁上了,直接判断外层的if就会退出了,不然所有的线程都要等待锁了。

单判断为什么会出错?

同时可能多个线程通过if判断,等待锁,第一个线程加锁完成之后,执行创建,退出之后其他线程可以继续抢锁,抢到以后继续创建,就保证不了线程安全!

单例模式的注意点:

  • 单例模式下的构造函数必须要有,但必须是私有的。
  • 赋值和拷贝函数禁用,因为只创建1个单例
  • 在类里面创建的静态变量在类内定义,需要在类外初始化

4.代码和执行效果

代码:

#pragma once

#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"

using namespace ThreadModule;

const static int gdefaultthreadnum = 5;

template <typename T>
class ThreadPool
{
public:
    static ThreadPool<T> *GetInstance()
    {
        // 如果是多线程获取线程池对象下面的代码就有问题了!!
        // 只有第一次会创建对象,后续都是获取
        // 双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全
        if (nullptr == _instance) // 保证第二次之后,所有线程,不用在加锁,直接返回_instance单例对象
        {
            LockGuard lockguard(&_lock);
            if (nullptr == _instance)
            {
                _instance = new ThreadPool<T>();
                _instance->InitThreadPool();
                _instance->Start();
                LOG(DEBUG, "创建线程池单例");
                return _instance;
            }
        }
        LOG(DEBUG, "获取线程池单例");
        return _instance;
    }

    void Stop()
    {
        LockQueue();
        _isrunning = false;
        ThreadWakeup();
        UnlockQueue();
    }

    void Wait()
    {
        for(auto &thread : _threads)
        {
            thread.Join();
            LOG(INFO, "%s is quit...", thread.name().c_str());
        }
    }

    bool Enqueue(const T &t)
    {
        bool ret = false;
        LockQueue();
        if(_isrunning)
        {
            _task_queue.push(t);
            if(_waitnum > 0)
            {
                ThreadWakeup();
            }
            LOG(DEBUG, "enqueue task success");
            ret = true;
        }
        UnlockQueue();
        return ret;
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void ThreadSleep()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void ThreadWakeup()
    {
        // 唤醒一个等待特定条件变量的线程
        pthread_cond_signal(&_cond);
    }
    void ThreadWakeupAll()
    {
        // 唤醒所有等待特定条件变量的线程
        pthread_cond_broadcast(&_cond);
    }

    // 单例模式下的构造函数必须要有,但必须是私有的
    ThreadPool(int threadnum = gdefaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        LOG(INFO, "ThreadPool Construct()");
    }

    // 赋值和拷贝函数禁用,因为只创建1个单例
    ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
    ThreadPool(const ThreadPool<T> &) = delete;

    void Start()
    {
        for (auto &thread : _threads)
        {
            thread.Start();
        }
    }

    void InitThreadPool()
    {
        //构建出所有的线程,并不启动
        for(int num = 0; num < _threadnum; num++)
        {
            std::string name = "thread" + std::to_string(num+1);
            //bind函数到底有什么作用???
            _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
            LOG(INFO, "init thread %s done", name.c_str());
        }
    }

    void HandlerTask(std::string name)
    {
        LOG(INFO, "%s is running...", name.c_str());
        //线程需要死循环去处理任务
        while(true)
        {
            //1、保证队列安全
            LockQueue();
            //2、队列中不一定有数据
            while(_task_queue.empty() && _isrunning)
            {
                _waitnum++;
                ThreadSleep();
                _waitnum--;
            }
            //2.1 如果线程池已经退出了&&任务队列是空的
            if(_task_queue.empty() && !_isrunning)
            {
                UnlockQueue();
                break;
            }
            // 2.2 如果线程池不退出 && 任务队列不是空的
            // 2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出
            // 3. 一定有任务, 处理任务
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();
            LOG(DEBUG, "%s get a task", name.c_str());
            //4.处理任务,这个任务属于线程独占的任务
            //t();
            LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());
        }
    }

    int _threadnum;
    std::vector<Thread> _threads;
    std::queue<T> _task_queue;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    int _waitnum;
    bool _isrunning;

    // 添加单例模式
    static ThreadPool<T> *_instance;
    static pthread_mutex_t _lock;
};

// 在类里面创建的静态变量在类内定义,需要在类外初始化
template <typename T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;

template <typename T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;

// 为什么双重判断就可以解决线程安全的问题?
// 为什么static就可以不用创建对象直接调用函数呢?

执行结果:

Top