您的当前位置:首页正文

【Linux】进程池实现指南:掌控并发编程的核心

2024-11-13 来源:个人技术集锦

往期Linux文章:Linux专栏

1.为什么要有进程池

如果你了解过STL的底层设计,你会发现在其中会有一个叫做内存池的设计。其作用就是先申请出一片空间,如果后续你需要对你的容器进行扩容,所扩展的空间就从内存池里取的。这样可以提高扩容的效率。
比如你要扩容100次,如果每次扩容都向系统申请空间的话,效率就很低,因为向系统申请空间也是需要时间的,所以内存池的作用就是一次性申请一篇空间,当需要扩容时就向内存池要,可以提高扩容的效率。
既然这样,我们也可以如此理解进程池,一次性创建一批进程,如果有任务要执行就交给进程池中空闲的进程来做,而不是一有任务就创建一个新的进程,进程池的目的也是为了提供效率,节省创建进程的时间消耗。
通过预先创建和复用进程,进程池能够提高任务执行效率,避免频繁创建和销毁进程带来的系统开销。

2.进程池的工作原理

进程池的核心思想是创建固定数量的进程,然后将需要执行的任务分配给这些进程来处理。当某个任务完成后,该进程可以继续处理下一个任务,而不是销毁。这样可以减少频繁创建和销毁进程带来的资源浪费。

2.1 进程池的工作流程

3. 进程池的实现(重点)

本文将着重讲解进程池的实现步骤。

初始化
通过运行可执行文件时传入的参数来创建一定数量的子进程。
如:创建5个子进程

./a.out 5

代码实现:

enum
{
    ArgcError = 1,
    ArgvError,
    PipeError
};

void Usage(const char* tip)
{
    cout<<"Usage:"<<tip<<" sub_process_num"<<endl;
}

int main(int argc,char* argv[])
{
    if(argc != 2)
    {//格式不对
        Usage(argv[0]);
        return ArgcError;
    }
    int sub_process_number = stoi(argv[1]);
    if(sub_process_number<=0)
    {//子进程数量不能小于1
        return ArgvError;
    }
    //创建子进程...
    return 0;
}

现在我们来分析下,我们要实现进程池的功能:创建子进程,发送任务给子进程执行,子进程的轮询,杀死进程,等待进程,一些Debug功能。这样的话我们完全可以创建一个类来封装这些功能。除此之外,我们还需要描述一下子进程,为此也需要创建一个描述管道的类。

3.1 Channel类

Channel类的功能主要是来描述管道的,具有的属性有该管道对应的子进程的id,名字,写端描述符。
Channel类的实现比较简单,直接看代码吧:

class Channel
{
public:
    Channel(int wfd,pid_t sub_process_id,string name)
        : wfd_(wfd),sub_process_id_(sub_process_id),name_(name)
        {}
     void printDebug()
    {
        cout<<"name:"<<name_<<endl;
        cout<<"wfd:"<<wfd_<<endl;
        cout<<"pid:"<<sub_process_id_<<endl;
    }
    string getName()
    {
        return name_;
    }
    pid_t getPid()
    {
        return sub_process_id_;
    }
    int getWfd()
    {
        return wfd_;
    }
    void Close()
    {
        close(wfd_);
    }
private:
    int wfd_;
    pid_t sub_process_id_;
    string name_;
};

3.2 ProcessPool类

ProcessPool类的功能主要是来描述进程池的,具有的属性有该管道对应的子进程的数量,所有的管道。
类似于这样:

ProcessPool类的框架:

class ProcessPool
{
public:
	ProcessPool(int sub_process_num)
		: sub_process_num_(sub_process_num)
		{}
	//...
private:
	int sub_process_num_;
	vector<Channel> channels;
};

3.2.1 创建子进程

因为我们需要创建指定数目的进程,用一个循环来写就可以了。在循环中,父进程每次都会创建一个子进程出来,然后用管道于它们链接,注意因为是父进程给子进程分配任务,所以需要把父进程的读端关闭,子进程的写端关闭。
初版:

int CreateProcess()
{
	for(int i = 0;i<sub_process_num_;++i)
	{
		int pipefd[2];
		int n = pipe(pipefd);
		if(n == -1)
		{
			return PipeError;
		}
		pid_t id = fork();
		if(id == 0)
		{
			//子进程,关闭写端
			close(pipefd[1]);
			//work...

		}
		//父进程,关闭读端
		close(pipefd[0]);
		string cname = "channel-"+to_string(i);
		channels.push_back(Channel(pipefd[1],id,cname));
	}
	return 0;
}

为了让子进程执行相应的任务,我们还可以添加一个回调函数workerworker函数主要作用是选择要执行的任务,具体的任务,我们还需要自己创建,为此还可以创建3个测试用的任务,用一个函数指针数组去保存这些函数。
代码如下:

typedef void(* work_t)(int);
typedef void(* task_t)(int,pid_t);

void PrintLog(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task\n" << endl;
}

void ReloadConf(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task\n" << endl;
}

void ConnectMysql(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : connect mysql task\n" << endl;
}

task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};

void worker(int fd)
{
    while(true)
    {
        uint32_t command_code = 0;
        ssize_t n = read(0,&command_code,sizeof(command_code));
        if(n == sizeof(command_code))
        {
            if(command_code>=3)
            {
                continue;
            }
            tasks[command_code](fd,getpid());
        }
        else
        {
            cout<<"sub process:"<<getpid()<<"quit now..."<<endl;
            break;
        }
    }
}

第二版:
第二版相对第一版,多了个回调函数,这个回调函数可以让我实现相对应的工作。同时也多个重定向功能,把原本标准输入的功能给到了pipefd[0],也就是说当子进程去读标准输入内的数据时,会去读管道中的数据。
这是一个典型的标准输入重定向操作,将管道的读端作为当前进程的输入来源

int CreateProcess(work_t work)
{
	//vector<int> fds;
	for(int i = 0;i<sub_process_num_;++i)
	{
		int pipefd[2];
		int n = pipe(pipefd);
		if(n == -1)
		{
			return PipeError;
		}
		pid_t id = fork();
		if(id == 0)
		{
			//子进程,关闭写端
			close(pipefd[1]);
			dup2(pipefd[0],0);
			work(pipefd[0]);
			exit(0);

		}
		//父进程,关闭读端
		close(pipefd[0]);
		string cname = "channel-"+to_string(i);
		channels.push_back(Channel(pipefd[1],id,cname));
	}
	return 0;
}

其实该代码中还存在bug,有个魔鬼细节存在!!!
第三版:
其实对于子进程来说,它的写端并没有全部关闭。下面我们来画图:
创建第一个管道,这个图如果看过我讲匿名管道的那篇的话,还是比较熟悉的。

现在我们来创建第二个管道,我们知道文件描述符的创建是遵循当前没有直接使用的最小的一个下标,作为新的文件描述符。所以呢,新创建的管道的pipefd[0]依旧是在先前的位置,可是写端就不是了,原先的写端并没有被关闭,我们新管道创建的pipefd[1]会在其下方被创建。
然后要知道的是,子进程是由父进程创建的,它的各项数据是由父进程复制而来,也就会把上一个管道的写端给复制过了,但是子进程可是关闭不了它的,因为它只能拿到新创建管道的写端pipefd[1]的位置。具体情况如图:

所以为了关闭子进程的所有写端,我们需要用有个数组去保存父进程中的写端,然后再子进程中把它们一一关闭。
代码如下:

int CreateProcess(work_t work)
{
	vector<int> fds;
	for(int i = 0;i<sub_process_num_;++i)
	{
		int pipefd[2];
		int n = pipe(pipefd);
		if(n == -1)
		{
			return PipeError;
		}
		pid_t id = fork();
		if(id == 0)
		{
			//子进程,关闭写端
			if(!fds.empty())
			{
				cout<<"close w fd:";
				for(auto fd:fds)
				{
					close(fd);
					cout<<fd<<" ";
				}
				cout<<endl;
			}
			close(pipefd[1]);
			dup2(pipefd[0],0);
			work(pipefd[0]);
			exit(0);

		}
		//父进程,关闭读端
		close(pipefd[0]);
		string cname = "channel-"+to_string(i);
		channels.push_back(Channel(pipefd[1],id,cname));
		fds.push_back(pipefd[1]);
	}
	return 0;
}

3.2.2 杀死所有进程

进程池也有不需要的时候,当进程池不需要了,我们就要回收子进程了,怎么回收呢?当然是进程等待了
杀死子进程也就是等待子进程。
要注意的是别忘了关闭文件描述符
进程等待是必须的,不然的话子进程会变成僵尸进程的。

void KillAllProcess()
{
	//在回收子进程前,我们需要把pipefd[1]全部关闭
	for(auto&channel:channels)
	{
		channel.Close();
		//关闭完文件描述符后,开始等待。等待进程需要子进程的pid,恰巧我们的Channel中存有子进程的pid
		pid_t pid = channel.getPid();
		pid_t rid = waitpid(pid,nullptr,0);
		if(rid == pid)
		{
			//回收成功
			cout<<"wait sub process:"<<pid<<" success..."<<endl;
		}
		cout<<channel.getName()<<"close done"<<" sub process quit now:"<<channel.getPid()<<endl;
	}
}

3.2.3 其他功能

因为这些功能都比较简单就一块讲了吧。
子进程的轮询,我能总不能让一个子进程一直跑任务吧,为了合理利用子进程,我们可以设计也该轮询函数,让子进程的任务分配"雨露均沾"。

int NextChannel()
{
	static int next = 0;//static修饰的变量只会初始化一次。
	int c = next;
	next = (next+1)%sub_process_num_;
	return c;
}

发送任务代码:

void SendTaskCode(int index,uint32_t code)
{
	cout << "send code: " << code << " to " << channels[index].getName() << " sub prorcess id: " << channels[index].getPid() << endl;
	write(channels[index].getPid(), &code, sizeof(code));
}

debug:

void Debug()
{
	for(auto&channel:channels)
	{
		channel.printDebug();
		cout<<endl;
	}
}

3.3 控制进程池

完成上面的功能就需要我们去控制进程池的子进程了。
主要包括创建进程池,控制子进程,回收子进程。

void CtrlSubProcess(ProcessPool* processpool,int cnt)
{
    while(cnt--)
    {
        //选择一个进程和通道
        int channel = processpool->NextChannel();
        //选择一个任务
        uint32_t code = NextTask();
        processpool->SendTaskCode(channel,code);
        sleep(1);
    }
}

int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return ArgcError;
    }
    int sub_process_num = stoi(argv[1]);
    if(sub_process_num<1)
    {
        return ArgvError;
    }
    srand((unsigned int)time(nullptr));//生成随机种子
    //创建进程池
    ProcessPool* processpool = new ProcessPool(sub_process_num);
    processpool->CreateProcess(worker);
    processpool->Debug();
    //sleep(2);
    //控制子进程
    CtrlSubProcess(processpool,10);
    //sleep(2);
    //回收子进程
    processpool->KillAllProcess();
    delete processpool;

    return 0;
}

运行结果:

4. 完整代码

///processpool.cc//
#include <iostream>
#include <vector>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include "bolg.hpp"
using namespace std;
enum
{
    ArgcError = 1,
    ArgvError,
    PipeError
};

class Channel
{
public:
    Channel(int wfd,pid_t sub_process_id,string name)
        : wfd_(wfd),sub_process_id_(sub_process_id),name_(name)
        {}
    void printDebug()
    {
        cout<<"name:"<<name_<<endl;
        cout<<"wfd:"<<wfd_<<endl;
        cout<<"pid:"<<sub_process_id_<<endl;
    }
    string getName()
    {
        return name_;
    }
    pid_t getPid()
    {
        return sub_process_id_;
    }
    int getWfd()
    {
        return wfd_;
    }
    void Close()
    {
        close(wfd_);
    }
private:
    int wfd_;
    pid_t sub_process_id_;
    string name_;
};

class ProcessPool
{
public:
    ProcessPool(int sub_process_num)
        : sub_process_num_(sub_process_num)
        {}
    int CreateProcess(work_t work)
    {
        vector<int> fds;
        for(int i = 0;i<sub_process_num_;++i)
        {
            int pipefd[2];
            int n = pipe(pipefd);
            if(n == -1)
            {
                return PipeError;
            }
            pid_t id = fork();
            if(id == 0)
            {
                //子进程,关闭写端
                if(!fds.empty())
                {
                    cout<<"close w fd:";
                    for(auto fd:fds)
                    {
                        close(fd);
                        cout<<fd<<" ";
                    }
                    cout<<endl;
                }
                close(pipefd[1]);
                dup2(pipefd[0],0);
                work(pipefd[0]);
                exit(0);

            }
            //父进程,关闭读端
            close(pipefd[0]);
            string cname = "channel-"+to_string(i);
            channels.push_back(Channel(pipefd[1],id,cname));
            fds.push_back(pipefd[1]);
        }
        return 0;
    }
    void KillAllProcess()
    {
        //在回收子进程前,我们需要把pipefd[1]全部关闭
        for(auto&channel:channels)
        {
            channel.Close();
            //关闭完文件描述符后,开始等待。等待进程需要子进程的pid,恰巧我们的Channel中存有子进程的pid
            pid_t pid = channel.getPid();
            pid_t rid = waitpid(pid,nullptr,0);
            if(rid == pid)
            {
                //回收成功
                cout<<"wait sub process:"<<pid<<" success..."<<endl;
            }
            cout<<channel.getName()<<"close done"<<" sub process quit now:"<<channel.getPid()<<endl;
        }
    }
    int NextChannel()
    {
        static int next = 0;
        int c = next;
        next = (next+1)%sub_process_num_;
        return c;
    }
    void SendTaskCode(int index,uint32_t code)
    {
        cout << "send code: " << code << " to " << channels[index].getName() << " sub prorcess id: " << channels[index].getPid() << endl;
        write(channels[index].getPid(), &code, sizeof(code));
    }
    void Debug()
    {
        for(auto&channel:channels)
        {
            channel.printDebug();
            cout<<endl;
        }
    }
private:
   int sub_process_num_;
   vector<Channel> channels;
};


void Usage(const char* tip)
{
    cout<<"Usage:"<<tip<<" sub_process_num"<<endl;
}



void CtrlSubProcess(ProcessPool* processpool,int cnt)
{
    while(cnt--)
    {
        //选择一个进程和通道
        int channel = processpool->NextChannel();
        //选择一个任务
        uint32_t code = NextTask();
        processpool->SendTaskCode(channel,code);
        sleep(1);
    }
}

int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return ArgcError;
    }
    int sub_process_num = stoi(argv[1]);
    if(sub_process_num<1)
    {
        return ArgvError;
    }
    srand((unsigned int)time(nullptr));
    //创建进程池
    ProcessPool* processpool = new ProcessPool(sub_process_num);
    processpool->CreateProcess(worker);
    processpool->Debug();
    //sleep(2);
    //控制子进程
    CtrlSubProcess(processpool,10);
    //sleep(2);
    //回收子进程
    processpool->KillAllProcess();
    delete processpool;

    return 0;
}
///task.hpp
#include <iostream>
#include <vector>
#include <cstdlib>
#include <unistd.h>
using namespace std;
//创建函数指针
typedef void(* work_t)(int);
typedef void(* task_t)(int,pid_t);

void PrintLog(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task\n" << endl;
}

void ReloadConf(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task\n" << endl;
}

void ConnectMysql(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : connect mysql task\n" << endl;
}

uint32_t NextTask()
{
    return rand()%3;
}

task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};

void worker(int fd)
{
    while(true)
    {
        uint32_t command_code = 0;
        ssize_t n = read(0,&command_code,sizeof(command_code));
        if(n == sizeof(command_code))
        {
            if(command_code>=3)
            {
                continue;
            }
            tasks[command_code](fd,getpid());
        }
        else
        {
            cout<<"sub process:"<<getpid()<<"quit now..."<<endl;
            break;
        }
    }
}

5. 总结

进程池的核心思想是创建固定数量的进程,然后将需要执行的任务分配给这些进程来处理。当某个任务完成后,该进程可以继续处理下一个任务,而不是销毁。这样可以减少频繁创建和销毁进程带来的资源浪费。

Top