您的位置:

golang线程池,golang 线程池和协程池

本文目录一览:

golang的线程模型——GMP模型

内核线程(Kernel-Level Thread ,KLT)

轻量级进程(Light Weight Process,LWP):轻量级进程就是我们通常意义上所讲的线程,由于每个轻量级进程都由一个内核线程支持,因此只有先支持内核线程,才能有轻量级进程

用户线程与系统线程一一对应,用户线程执行如lo操作的系统调用时,来回切换操作开销相对比较大

多个用户线程对应一个内核线程,当内核线程对应的一个用户线程被阻塞挂起时候,其他用户线程也阻塞不能执行了。

多对多模型是可以充分利用多核CPU提升运行效能的

go线程模型包含三个概念:内核线程(M),goroutine(G),G的上下文环境(P);

GMP模型是goalng特有的。

P与M一般是一一对应的。P(上下文)管理着一组G(goroutine)挂载在M(内核线程)上运行,图中左边蓝色为正在执行状态的goroutine,右边为待执行状态的goroutiine队列。P的数量由环境变量GOMAXPROCS的值或程序运行runtime.GOMAXPROCS()进行设置。

当一个os线程在执行M1一个G1发生阻塞时,调度器让M1抛弃P,等待G1返回,然后另起一个M2接收P来执行剩下的goroutine队列(G2、G3...),这是golang调度器厉害的地方,可以保证有足够的线程来运行剩下所有的goroutine。

当G1结束后,M1会重新拿回P来完成,如果拿不到就丢到全局runqueue中,然后自己放到线程池或转入休眠状态。空闲的上下文P会周期性的检查全局runqueue上的goroutine,并且执行它。

另一种情况就是当有些P1太闲而其他P2很忙碌的时候,会从其他上下文P2拿一些G来执行。

详细可以翻看下方第一个参考链接,写得真好。

最后用大佬的总结来做最后的收尾————

Go语言运行时,通过核心元素G,M,P 和 自己的调度器,实现了自己的并发线程模型。调度器通过对G,M,P的调度实现了两级线程模型中操作系统内核之外的调度任务。整个调度过程中会在多种时机去触发最核心的步骤 “一整轮调度”,而一整轮调度中最关键的部分在“全力查找可运行G”,它保证了M的高效运行(换句话说就是充分使用了计算机的物理资源),一整轮调度中还会涉及到M的启用停止。最后别忘了,还有一个与Go程序生命周期相同的系统监测任务来进行一些辅助性的工作。

浅析Golang的线程模型与调度器

Golang CSP并发模型

Golang线程模型

【golang详解】go语言GMP(GPM)原理和调度

Goroutine调度是一个很复杂的机制,下面尝试用简单的语言描述一下Goroutine调度机制,想要对其有更深入的了解可以去研读一下源码。

首先介绍一下GMP什么意思:

G ----------- goroutine: 即Go协程,每个go关键字都会创建一个协程。

M ---------- thread内核级线程,所有的G都要放在M上才能运行。

P ----------- processor处理器,调度G到M上,其维护了一个队列,存储了所有需要它来调度的G。

Goroutine 调度器P和 OS 调度器是通过 M 结合起来的,每个 M 都代表了 1 个内核线程,OS 调度器负责把内核线程分配到 CPU 的核上执行

模型图:

避免频繁的创建、销毁线程,而是对线程的复用。

1)work stealing机制

  当本线程无可运行的G时,尝试从其他线程绑定的P偷取G,而不是销毁线程。

2)hand off机制

  当本线程M0因为G0进行系统调用阻塞时,线程释放绑定的P,把P转移给其他空闲的线程执行。进而某个空闲的M1获取P,继续执行P队列中剩下的G。而M0由于陷入系统调用而进被阻塞,M1接替M0的工作,只要P不空闲,就可以保证充分利用CPU。M1的来源有可能是M的缓存池,也可能是新建的。当G0系统调用结束后,根据M0是否能获取到P,将会将G0做不同的处理:

如果有空闲的P,则获取一个P,继续执行G0。

如果没有空闲的P,则将G0放入全局队列,等待被其他的P调度。然后M0将进入缓存池睡眠。

如下图

GOMAXPROCS设置P的数量,最多有GOMAXPROCS个线程分布在多个CPU上同时运行

在Go中一个goroutine最多占用CPU 10ms,防止其他goroutine被饿死。

具体可以去看另一篇文章

【Golang详解】go语言调度机制 抢占式调度

当创建一个新的G之后优先加入本地队列,如果本地队列满了,会将本地队列的G移动到全局队列里面,当M执行work stealing从其他P偷不到G时,它可以从全局G队列获取G。

协程经历过程

我们创建一个协程 go func()经历过程如下图:

说明:

这里有两个存储G的队列,一个是局部调度器P的本地队列、一个是全局G队列。新创建的G会先保存在P的本地队列中,如果P的本地队列已经满了就会保存在全局的队列中;处理器本地队列是一个使用数组构成的环形链表,它最多可以存储 256 个待执行任务。

G只能运行在M中,一个M必须持有一个P,M与P是1:1的关系。M会从P的本地队列弹出一个可执行状态的G来执行,如果P的本地队列为空,就会想其他的MP组合偷取一个可执行的G来执行;

一个M调度G执行的过程是一个循环机制;会一直从本地队列或全局队列中获取G

上面说到P的个数默认等于CPU核数,每个M必须持有一个P才可以执行G,一般情况下M的个数会略大于P的个数,这多出来的M将会在G产生系统调用时发挥作用。类似线程池,Go也提供一个M的池子,需要时从池子中获取,用完放回池子,不够用时就再创建一个。

work-stealing调度算法:当M执行完了当前P的本地队列队列里的所有G后,P也不会就这么在那躺尸啥都不干,它会先尝试从全局队列队列寻找G来执行,如果全局队列为空,它会随机挑选另外一个P,从它的队列里中拿走一半的G到自己的队列中执行。

如果一切正常,调度器会以上述的那种方式顺畅地运行,但这个世界没这么美好,总有意外发生,以下分析goroutine在两种例外情况下的行为。

Go runtime会在下面的goroutine被阻塞的情况下运行另外一个goroutine:

用户态阻塞/唤醒

当goroutine因为channel操作或者network I/O而阻塞时(实际上golang已经用netpoller实现了goroutine网络I/O阻塞不会导致M被阻塞,仅阻塞G,这里仅仅是举个栗子),对应的G会被放置到某个wait队列(如channel的waitq),该G的状态由_Gruning变为_Gwaitting,而M会跳过该G尝试获取并执行下一个G,如果此时没有可运行的G供M运行,那么M将解绑P,并进入sleep状态;当阻塞的G被另一端的G2唤醒时(比如channel的可读/写通知),G被标记为,尝试加入G2所在P的runnext(runnext是线程下一个需要执行的 Goroutine。), 然后再是P的本地队列和全局队列。

系统调用阻塞

当M执行某一个G时候如果发生了阻塞操作,M会阻塞,如果当前有一些G在执行,调度器会把这个线程M从P中摘除,然后再创建一个新的操作系统的线程(如果有空闲的线程可用就复用空闲线程)来服务于这个P。当M系统调用结束时候,这个G会尝试获取一个空闲的P执行,并放入到这个P的本地队列。如果获取不到P,那么这个线程M变成休眠状态, 加入到空闲线程中,然后这个G会被放入全局队列中。

队列轮转

可见每个P维护着一个包含G的队列,不考虑G进入系统调用或IO操作的情况下,P周期性的将G调度到M中执行,执行一小段时间,将上下文保存下来,然后将G放到队列尾部,然后从队列中重新取出一个G进行调度。

除了每个P维护的G队列以外,还有一个全局的队列,每个P会周期性地查看全局队列中是否有G待运行并将其调度到M中执行,全局队列中G的来源,主要有从系统调用中恢复的G。之所以P会周期性地查看全局队列,也是为了防止全局队列中的G被饿死。

除了每个P维护的G队列以外,还有一个全局的队列,每个P会周期性地查看全局队列中是否有G待运行并将其调度到M中执行,全局队列中G的来源,主要有从系统调用中恢复的G。之所以P会周期性地查看全局队列,也是为了防止全局队列中的G被饿死。

M0

M0是启动程序后的编号为0的主线程,这个M对应的实例会在全局变量rutime.m0中,不需要在heap上分配,M0负责执行初始化操作和启动第一个G,在之后M0就和其他的M一样了

G0

G0是每次启动一个M都会第一个创建的goroutine,G0仅用于负责调度G,G0不指向任何可执行的函数,每个M都会有一个自己的G0,在调度或系统调用时会使用G0的栈空间,全局变量的G0是M0的G0

一个G由于调度被中断,此后如何恢复?

中断的时候将寄存器里的栈信息,保存到自己的G对象里面。当再次轮到自己执行时,将自己保存的栈信息复制到寄存器里面,这样就接着上次之后运行了。

我这里只是根据自己的理解进行了简单的介绍,想要详细了解有关GMP的底层原理可以去看Go调度器 G-P-M 模型的设计者的文档或直接看源码

参考: ()

()

Linux下C/C++ 手写一个线程池-

在我们日常生活中会遇到许许多多的问题,如果一个服务端要接受很多客户端的数据,该怎么办?多线程并发内存不够怎么办?所以我们需要了解线程池的相关知识。

1.线程池的简介

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

2.线程池的组成

1、线程池管理器(ThreadPoolManager):用于创建并管理线程池

2、工作线程(WorkThread): 线程池中线程

3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。

4、任务队列:用于存放没有处理的任务。提供一种缓冲机制。

3.线程池的主要优点

1.避免线程太多,使得内存耗尽

2.避免创建与销毁线程的代价

3.任务与执行分离

1.线程池结构体定义

代码如下(示例):

相关视频推荐

150行代码,带你手写线程池,自行准备linux环境

C++后台开发该学哪些内容,标准技术路线及面经与算法该如何刷

学习地址:C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂

需要更多C/C++ Linux服务器架构师学习资料加qun 812855908 (资料包括C/C++,Linux,golang技术,内核,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg,大厂面试题 等)

2.接口定义

代码如下(示例):

3.回调函数

代码如下(示例):

4.全部代码(加注释)

代码如下(示例):

关于线程池是基本代码就在上面了,关于编程这一部分内容,我建议大家还是要自己去动手实现,如果只是单纯的看了一遍,知识这块可能会记住,但是操作起来可能就比较吃力,万事开头难,只要坚持下去,总会有结果的。

golang协程调度模式解密

golang学习笔记

频繁创建线程会造成不必要的开销,所以才有了线程池。在线程池中预先保存一定数量的线程,新任务发布到任务队列,线程池中的线程不断地从任务队列中取出任务并执行,可以有效的减少创建和销毁带来的开销。

过多的线程会导致争抢cpu资源,且上下文的切换的开销变大。而工作在用户态的协程能大大减少上下文切换的开销。协程调度器把可运行的协程逐个调度到线程中执行,同时即时把阻塞的协程调度出协程,从而有效地避免了线程的频繁切换,达到了少量线程实现高并发的效果。

多个协程分享操作系统分给线程的时间片,从而达到充分利用CPU的目的,协程调度器决定了则决定了协程运行的顺序。每个线程同一时刻只能运行一个协程。

go调度模型包含三个实体:

每个处理器维护者一个协程G的队列,处理器依次将协程G调度到M中执行。

每个P会周期性地查看全局队列中是否有G待运行并将其调度到M中执行,全局队列中的G主要来自系统调用中恢复的G.

如果协程发起系统调用,则整个工作线程M被阻塞,协程队列中的其他协程都会阻塞。

一般情况下M的个数会略大于P个数,多出来的M将会在G产生系统调用时发挥作用。与线程池类似,Go也提供M池子。当协程G1发起系统掉用时,M1会释放P,由 M1-P-G1 G2 ... 转变成 M1-G1 , M2会接管P的其他协程 M2-P-G2 G3 G4... 。

冗余的M可能来源于缓存池,也可能是新建的。

当G1结束系统调用后,根据M1是否获取到P,进行不用的处理。

多个处理P维护队列可能不均衡,导致部分处理器非常繁忙,而其余相对空闲。产生原因是有些协程自身不断地派生协程。

为此Go调度器提供了工作量窃取策略,当某个处理器P没有需要调度的协程时,将从其他处理中偷取协程,每次偷取一半。

抢占式调度,是指避免某个协程长时间执行,而阻碍其他协程被调度的机制。

调度器监控每个协程执行时间,一旦执行时间过长且有其他协程等待,会把协程暂停,转而调度等待的协程,以达到类似时间片轮转的效果。比如for循环会一直占用执行权。

在IO密集型应用,GOMAXPROCS大小设置大一些,获取性能会更好。

IO密集型会经常发生系统调用,会有一个新的M启用或创建,但由于Go调度器检测M到被阻塞有一定延迟。如果P数量多,则P管理协程队列会变小。

如何用go语言每分钟处理100万个请求

在Malwarebytes 我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息安全公司和所有需要的设施来支持一个每天百万用户使用的产品。我在反病毒和反恶意软件行业的不同公司工作了12年,从而我知道由于我们每天处理大量的数据,这些系统是多么复杂。

有趣的是,在过去的大约9年间,我参与的所有的web后端的开发通常是通过Ruby on Rails技术实现的。不要错怪我。我喜欢Ruby on Rails,并且我相信它是个令人惊讶的环境。但是一段时间后,你会开始以ruby的方式开始思考和设计系统,你会忘记,如果你可以利用多线程、并行、快速执行和小内存开销,软件架构本来应该是多么高效和简单。很多年期间,我是一个c/c++、Delphi和c#开发者,我刚开始意识到使用正确的工具可以把复杂的事情变得简单些。

作为首席架构师,我不会很关心在互联网上的语言和框架战争。我相信效率、生产力。代码可维护性主要依赖于你如何把解决方案设计得很简单。

问题

当工作在我们的匿名遥测和分析系统中,我们的目标是可以处理来自于百万级别的终端的大量的POST请求。web处理服务可以接收包含了很多payload的集合的JSON数据,这些数据需要写入Amazon S3中。接下来,map-reduce系统可以操作这些数据。

按照习惯,我们会调研服务层级架构,涉及的软件如下:

Sidekiq

Resque

DelayedJob

Elasticbeanstalk Worker Tier

RabbitMQ

and so on…

搭建了2个不同的集群,一个提供web前端,另外一个提供后端处理,这样我们可以横向扩展后端服务的数量。

但是,从刚开始,在 讨论阶段我们的团队就知道我们应该使用Go,因为我们看到这会潜在性地成为一个非常庞大( large traffic)的系统。我已经使用了Go语言大约2年时间,我们开发了几个系统,但是很少会达到这样的负载(amount of load)。

我们开始创建一些结构,定义从POST调用得到的web请求负载,还有一个上传到S3 budket的函数。

type PayloadCollection struct {

WindowsVersion string `json:"version"`

Token string `json:"token"`

Payloads []Payload `json:"data"`

}

type Payload struct {

// [redacted]

}

func (p *Payload) UploadToS3() error {

// the storageFolder method ensures that there are no name collision in

// case we get same timestamp in the key name

storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)

encodeErr := json.NewEncoder(b).Encode(payload)

if encodeErr != nil {

return encodeErr

}

// Everything we post to the S3 bucket should be marked 'private'

var acl = s3.Private

var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})

}

本地Go routines方法

刚开始,我们采用了一个非常本地化的POST处理实现,仅仅尝试把发到简单go routine的job并行化:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {

w.WriteHeader(http.StatusMethodNotAllowed)

return

}

// Read the body into a string for json decoding

var content = PayloadCollection{}

err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)

if err != nil {

w.Header().Set("Content-Type", "application/json; charset=UTF-8")

w.WriteHeader(http.StatusBadRequest)

return

}

// Go through each payload and queue items individually to be posted to S3

for _, payload := range content.Payloads {

go payload.UploadToS3() // ----- DON'T DO THIS

}

w.WriteHeader(http.StatusOK)

}

对于中小负载,这会对大多数的人适用,但是大规模下,这个方案会很快被证明不是很好用。我们期望的请求数,不在我们刚开始计划的数量级,当我们把第一个版本部署到生产环境上。我们完全低估了流量。

上面的方案在很多地方很不好。没有办法控制我们产生的go routine的数量。由于我们收到了每分钟1百万的POST请求,这段代码很快就崩溃了。

再次尝试

我们需要找一个不同的方式。自开始我们就讨论过, 我们需要保持请求处理程序的生命周期很短,并且进程在后台产生。当然,这是你在Ruby on Rails的世界里必须要做的事情,否则你会阻塞在所有可用的工作 web处理器上,不管你是使用puma、unicore还是passenger(我们不要讨论JRuby这个话题)。然后我们需要利用常用的处理方案来做这些,比如Resque、 Sidekiq、 SQS等。这个列表会继续保留,因为有很多的方案可以实现这些。

所以,第二次迭代,我们创建了一个缓冲channel,我们可以把job排队,然后把它们上传到S3。因为我们可以控制我们队列中的item最大值,我们有大量的内存来排列job,我们认为只要把job在channel里面缓冲就可以了。

var Queue chan Payload

func init() {

Queue = make(chan Payload, MAX_QUEUE)

}

func payloadHandler(w http.ResponseWriter, r *http.Request) {

...

// Go through each payload and queue items individually to be posted to S3

for _, payload := range content.Payloads {

Queue - payload

}

...

}

接下来,我们再从队列中取job,然后处理它们。我们使用类似于下面的代码:

func StartProcessor() {

for {

select {

case job := -Queue:

job.payload.UploadToS3() // -- STILL NOT GOOD

}

}

}

说实话,我不知道我们在想什么。这肯定是一个满是Red-Bulls的夜晚。这个方法不会带来什么改善,我们用了一个 有缺陷的缓冲队列并发,仅仅是把问题推迟了。我们的同步处理器同时仅仅会上传一个数据到S3,因为来到的请求远远大于单核处理器上传到S3的能力,我们的带缓冲channel很快达到了它的极限,然后阻塞了请求处理逻辑的queue更多item的能力。

我们仅仅避免了问题,同时开始了我们的系统挂掉的倒计时。当部署了这个有缺陷的版本后,我们的延时保持在每分钟以常量增长。

最好的解决方案

我们讨论过在使用用Go channel时利用一种常用的模式,来创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。

想法是,以一个恒定速率并行上传到S3,既不会导致机器崩溃也不好产生S3的连接错误。这样我们选择了创建一个Job/Worker模式。对于那些熟悉Java、C#等语言的开发者,可以把这种模式想象成利用channel以golang的方式来实现了一个worker线程池,作为一种替代。

var (

MaxWorker = os.Getenv("MAX_WORKERS")

MaxQueue = os.Getenv("MAX_QUEUE")

)

// Job represents the job to be run

type Job struct {

Payload Payload

}

// A buffered channel that we can send work requests on.

var JobQueue chan Job

// Worker represents the worker that executes the job

type Worker struct {

WorkerPool chan chan Job

JobChannel chan Job

quit chan bool

}

func NewWorker(workerPool chan chan Job) Worker {

return Worker{

WorkerPool: workerPool,

JobChannel: make(chan Job),

quit: make(chan bool)}

}

// Start method starts the run loop for the worker, listening for a quit channel in

// case we need to stop it

func (w Worker) Start() {

go func() {

for {

// register the current worker into the worker queue.

w.WorkerPool - w.JobChannel

select {

case job := -w.JobChannel:

// we have received a work request.

if err := job.Payload.UploadToS3(); err != nil {

log.Errorf("Error uploading to S3: %s", err.Error())

}

case -w.quit:

// we have received a signal to stop

return

}

}

}()

}

// Stop signals the worker to stop listening for work requests.

func (w Worker) Stop() {

go func() {

w.quit - true

}()

}

我们已经修改了我们的web请求handler,用payload创建一个Job实例,然后发到JobQueue channel,以便于worker来获取。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {

w.WriteHeader(http.StatusMethodNotAllowed)

return

}

// Read the body into a string for json decoding

var content = PayloadCollection{}

err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)

if err != nil {

w.Header().Set("Content-Type", "application/json; charset=UTF-8")

w.WriteHeader(http.StatusBadRequest)

return

}

// Go through each payload and queue items individually to be posted to S3

for _, payload := range content.Payloads {

// let's create a job with the payload

work := Job{Payload: payload}

// Push the work onto the queue.

JobQueue - work

}

w.WriteHeader(http.StatusOK)

}

在web server初始化时,我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。

dispatcher := NewDispatcher(MaxWorker)

dispatcher.Run()

下面是dispatcher的实现代码:

type Dispatcher struct {

// A pool of workers channels that are registered with the dispatcher

WorkerPool chan chan Job

}

func NewDispatcher(maxWorkers int) *Dispatcher {

pool := make(chan chan Job, maxWorkers)

return Dispatcher{WorkerPool: pool}

}

func (d *Dispatcher) Run() {

// starting n number of workers

for i := 0; i d.maxWorkers; i++ {

worker := NewWorker(d.pool)

worker.Start()

}

go d.dispatch()

}

func (d *Dispatcher) dispatch() {

for {

select {

case job := -JobQueue:

// a job request has been received

go func(job Job) {

// try to obtain a worker job channel that is available.

// this will block until a worker is idle

jobChannel := -d.WorkerPool

// dispatch the job to the worker job channel

jobChannel - job

}(job)

}

}

}

注意到,我们提供了初始化并加入到池子的worker的最大数量。因为这个工程我们利用了Amazon Elasticbeanstalk带有的docker化的Go环境,所以我们常常会遵守12-factor方法论来配置我们的生成环境中的系统,我们从环境变了读取这些值。这种方式,我们控制worker的数量和JobQueue的大小,所以我们可以很快的改变这些值,而不需要重新部署集群。

var (

MaxWorker = os.Getenv("MAX_WORKERS")

MaxQueue = os.Getenv("MAX_QUEUE")

)

直接结果

我们部署了之后,立马看到了延时降到微乎其微的数值,并未我们处理请求的能力提升很大。

Elastic Load Balancers完全启动后,我们看到ElasticBeanstalk 应用服务于每分钟1百万请求。通常情况下在上午时间有几个小时,流量峰值超过每分钟一百万次。

我们一旦部署了新的代码,服务器的数量从100台大幅 下降到大约20台。

我们合理配置了我们的集群和自动均衡配置之后,我们可以把服务器的数量降至4x EC2 c4.Large实例,并且Elastic Auto-Scaling设置为如果CPU达到5分钟的90%利用率,我们就会产生新的实例。

总结

在我的书中,简单总是获胜。我们可以使用多队列、后台worker、复杂的部署设计一个复杂的系统,但是我们决定利用Elasticbeanstalk 的auto-scaling的能力和Go语言开箱即用的特性简化并发。

我们仅仅用了4台机器,这并不是什么新鲜事了。可能它们还不如我的MacBook能力强大,但是却处理了每分钟1百万的写入到S3的请求。

处理问题有正确的工具。当你的 Ruby on Rails 系统需要更强大的web handler时,可以考虑下ruby生态系统之外的技术,或许可以得到更简单但更强大的替代方案。

一篇文章带你读懂 io_uring 的接口与实现

io_uring 是 Linux 提供的一个异步 I/O 接口。io_uring 在 2019 年加入 Linux 内核,经过了两年的发展,现在已经变得非常强大。本文基于 Linux 5.12.10 介绍 io_uring 接口。

io_uring 的实现主要在 fs/io_uring.c 中。

io_uring 的实现仅仅使用了三个 syscall:io_uring_setup, io_uring_enter 和 io_uring_register。它们分别用于设置 io_uring 上下文,提交并获取完成任务,以及注册内核用户共享的缓冲区。使用前两个 syscall 已经足够使用 io_uring 接口了。

用户和内核通过提交队列和完成队列进行任务的提交和收割。后文中会出现大量的简写,在这里先做一些介绍。

用户通过调用 io_uring_setup 1 初始化一个新的 io_uring 上下文。该函数返回一个 file descriptor,并将 io_uring 支持的功能、以及各个数据结构在 fd 中的偏移量存入 params。用户根据偏移量将 fd 映射到内存 (mmap) 后即可获得一块内核用户共享的内存区域。这块内存区域中,有 io_uring 的上下文信息:提交队列信息 (SQ_RING) 和完成队列信息 (CQ_RING);还有一块专门用来存放提交队列元素的区域 (SQEs)。SQ_RING 中只存储 SQE 在 SQEs 区域中的序号,CQ_RING 存储完整的任务完成数据。2

在 Linux 5.12 中,SQE 大小为 64B,CQE 大小为 16B。因此,相同数量的 SQE 和 CQE 所需要的空间不一样。初始化 io_uring 时,用户如果不在 params 中设置 CQ 长度,内核会分配 entries 个 SQE,以及 entries * 2 个 CQE。

io_uring_setup 设计的巧妙之处在于,内核通过一块和用户共享的内存区域进行消息的传递。在创建上下文后,任务提交、任务收割等操作都通过这块共享的内存区域进行,在 IO_SQPOLL 模式下(后文将详细介绍),可以完全绕过 Linux 的 syscall 机制完成需要内核介入的操作(比如读写文件),大大减少了 syscall 切换上下文、刷 TLB 的开销。

io_uring 可以处理多种 I/O 相关的请求。比如:

下面以 fsync 为例,介绍执行这个操作中可能用到的结构体和函数。

io_op_def io_op_defs[] 数组中定义了 io_uring 支持的操作,以及它在 io_uring 中的一些参数。3 比如 IORING_OP_FSYNC:

io_uring 中几乎每个操作都有对应的准备和执行函数。比如 fsync 操作就对应 io_fsync_prep 和 io_fsync函数。

除了 fsync 这种同步(阻塞)操作,内核中还支持一些异步(非阻塞)调用的操作,比如 Direct I/O 模式下的文件读写。对于这些操作,io_uring 中还会有一个对应的异步准备函数,以 _async 结尾。比如:

这些函数就是 io_uring 对某个 I/O 操作的包装。

用户将需要进行的操作写入 io_uring 的 SQ 中。在 CQ 中,用户可以收割任务的完成情况。这里,我们介绍 SQE 和 CQE 的编码。

include/uapi/linux/io_uring.h 4 中定义了 SQE 和 CQE。SQE 是一个 64B 大小的结构体,里面包含了所有操作可能用到的信息。

io_uring_sqe的定义

CQE 是一个 16B 大小的结构体,包含操作的执行结果。

继续以 fsync 为例。要在 io_uring 中完成 fsync 操作,用户需要将 SQE 中的 opcode 设置为 IORING_OP_FSYNC,将 fd 设置为需要同步的文件,并填充 fsync_flags。其他操作也是类似,设置 opcode 并将操作所需要的参数并写入 SQE 即可。

通常来说,使用 io_uring 的程序都需要用到 64 位的 user_data 来唯一标识一个操作 5。user_data 是 SQE 的一部分。io_uring 执行完某个操作后,会将这个操作的 user_data 和操作的返回值一起写入 CQ 中。

相关视频推荐

io_uring 新起之秀的linux io模式,是如何媲美epoll的

网络原理tcp/udp,网络编程epoll/reactor,面试中正经“八股文”

学习地址:

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括 C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg 等),免费分享

io_uring 通过环形队列和用户交互。

我们的先以用户提交任务为例,介绍 io_uring 的内核用户交互方式。用户提交任务的过程如下:

接下来我们简要介绍内核获取任务、内核完成任务、用户收割任务的过程。

介绍完 io_uring 的用户态接口后,我们就可以详细介绍 io_uring 在内核中是如何实现的了。

io_uring 在创建时有两个选项,对应着 io_uring 处理任务的不同方式:

这些选项的设定会影响之后用户与 io_uring 交互的方式:

每个 io_uring 都由一个轻量级的 io-wq6 线程池支持,从而实现 Buffered I/O 的异步执行。对于 Buffered I/O 来说,文件的内容可能在 page cache 里,也可能需要从盘上读取。如果文件的内容已经在 page cache 中,这些内容可以直接在 io_uring_enter 的时候读取到,并在返回用户态时收割。否则,读写操作会在 workqueue 里执行。

如果没有在创建 io_uring 时指定 IORING_SETUP_IOPOLL 选项,io_uring 的操作就会放进 io-wq 中执行。

上图覆盖了关闭 IOPOLL 模式下,用户通过 io_uring 执行操作的整个调用流程。用户提交的 SQE 经过一系列处理后,会在 io_queue_sqe 中试探着执行一次。

所有的操作都被提交到内核队列后,如果用户设置了 IORING_ENTER_GETEVENTS flag,io_uring_enter 在返回用户态前会等待指定个数的操作完成。

之后,Linux 随时会调度 io-wq 的内核线程执行。此时,io_wq_submit_work 函数会不断用阻塞模式执行用户指定的操作。某个操作完整执行后,它的返回值就会被写入 CQ 中。用户通过 io_uring 上下文中的 CQ 队尾位置就能知道内核处理好了哪些操作,无需再次调用 io_uring_enter。

通过火焰图可以观察到,在关闭 IOPOLL 时,内核会花大量时间处理读取操作。

创建 io_uring 时指定 IORING_SETUP_IOPOLL 选项即可开启 I/O 轮询模式。通常来说,用 O_DIRECT 模式打开的文件支持使用轮询模式读写内容,执行 read / write 操作。

在轮询模式下,io_uring_enter 只负责把操作提交到内核的文件读写队列中。之后,用户需要多次调用 io_uring_enter 来轮询操作是否完成。

在轮询模式下,io-wq 不会被使用。提交任务时,io_read 直接调用内核的 Direct I/O 接口向设备队列提交任务。

如果用户设置了 IORING_ENTER_GETEVENTS flag,在返回用户态前,io_uring_enter 会通过 io_iopoll_check 调用内核接口轮询任务是否完成。

通过火焰图可以看到,io_uring_enter 在提交任务这一块只花了一小部分时间。大部分时间都在轮询 I/O 操作是否完成。

在实际生产环境中,我们往往会有这样的需求:往文件中写入 n 次,然后用 fsync 落盘。在使用 io_uring时,SQ 中的任务不一定会按顺序执行。给操作设定 IO_SQE_LINK 选项,就可以建立任务之间的先后关系。IO_SQE_LINK 之后的第一个任务一定在当前任务完成后执行。7

io_uring 内部使用链表来管理任务的依赖关系。每一个操作在经过 io_submit_sqe 的处理后,都会变成一个 io_kiocb 对象。这个对象有可能会被放入链表中。io_submit_sqe 8 会对含有 IO_SQE_LINK 的 SQE 作特殊处理,处理过程如下:

由此看来,SQ 中连续的 IO_SQE_LINK 记录会按先后关系依次处理。在 io_submit_sqes 结束前,所有的任务都会被提交。因此,如果任务有先后关系,它们必须在同一个 io_uring_enter syscall 中批量提交。

其他用于控制 io_uring 任务依赖的选项包括 IOSQE_IO_DRAIN 和 IOSQE_IO_HARDLINK,这里不再展开。