重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
Goroutine调度是一个很复杂的机制,下面尝试用简单的语言描述一下Goroutine调度机制,想要对其有更深入的了解可以去研读一下源码。
成都创新互联服务项目包括滨城网站建设、滨城网站制作、滨城网页制作以及滨城网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,滨城网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到滨城省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
首先介绍一下GMP什么意思:
G ----------- goroutine: 即Go协程,每个go关键字都会创建一个协程。
M ---------- thread内核级线程,所有的G都要放在M上才能运行。
P ----------- processor处理器,调度G到M上,其维护了一个队列,存储了所有需要它来调度的G。
Goroutine 调度器P和 OS 调度器是通过 M 结合起来的,每个 M 都代表了 1 个内核线程,OS 调度器负责把内核线程分配到 CPU 的核上执行
模型图:
避免频繁的创建、销毁线程,而是对线程的复用。
1)work stealing机制
当本线程无可运行的G时,尝试从其他线程绑定的P偷取G,而不是销毁线程。
2)hand off机制
当本线程M0因为G0进行系统调用阻塞时,线程释放绑定的P,把P转移给其他空闲的线程执行。进而某个空闲的M1获取P,继续执行P队列中剩下的G。而M0由于陷入系统调用而进被阻塞,M1接替M0的工作,只要P不空闲,就可以保证充分利用CPU。M1的来源有可能是M的缓存池,也可能是新建的。当G0系统调用结束后,根据M0是否能获取到P,将会将G0做不同的处理:
如果有空闲的P,则获取一个P,继续执行G0。
如果没有空闲的P,则将G0放入全局队列,等待被其他的P调度。然后M0将进入缓存池睡眠。
如下图
GOMAXPROCS设置P的数量,最多有GOMAXPROCS个线程分布在多个CPU上同时运行
在Go中一个goroutine最多占用CPU 10ms,防止其他goroutine被饿死。
具体可以去看另一篇文章
【Golang详解】go语言调度机制 抢占式调度
当创建一个新的G之后优先加入本地队列,如果本地队列满了,会将本地队列的G移动到全局队列里面,当M执行work stealing从其他P偷不到G时,它可以从全局G队列获取G。
协程经历过程
我们创建一个协程 go func()经历过程如下图:
说明:
这里有两个存储G的队列,一个是局部调度器P的本地队列、一个是全局G队列。新创建的G会先保存在P的本地队列中,如果P的本地队列已经满了就会保存在全局的队列中;处理器本地队列是一个使用数组构成的环形链表,它最多可以存储 256 个待执行任务。
G只能运行在M中,一个M必须持有一个P,M与P是1:1的关系。M会从P的本地队列弹出一个可执行状态的G来执行,如果P的本地队列为空,就会想其他的MP组合偷取一个可执行的G来执行;
一个M调度G执行的过程是一个循环机制;会一直从本地队列或全局队列中获取G
上面说到P的个数默认等于CPU核数,每个M必须持有一个P才可以执行G,一般情况下M的个数会略大于P的个数,这多出来的M将会在G产生系统调用时发挥作用。类似线程池,Go也提供一个M的池子,需要时从池子中获取,用完放回池子,不够用时就再创建一个。
work-stealing调度算法:当M执行完了当前P的本地队列队列里的所有G后,P也不会就这么在那躺尸啥都不干,它会先尝试从全局队列队列寻找G来执行,如果全局队列为空,它会随机挑选另外一个P,从它的队列里中拿走一半的G到自己的队列中执行。
如果一切正常,调度器会以上述的那种方式顺畅地运行,但这个世界没这么美好,总有意外发生,以下分析goroutine在两种例外情况下的行为。
Go runtime会在下面的goroutine被阻塞的情况下运行另外一个goroutine:
用户态阻塞/唤醒
当goroutine因为channel操作或者network I/O而阻塞时(实际上golang已经用netpoller实现了goroutine网络I/O阻塞不会导致M被阻塞,仅阻塞G,这里仅仅是举个栗子),对应的G会被放置到某个wait队列(如channel的waitq),该G的状态由_Gruning变为_Gwaitting,而M会跳过该G尝试获取并执行下一个G,如果此时没有可运行的G供M运行,那么M将解绑P,并进入sleep状态;当阻塞的G被另一端的G2唤醒时(比如channel的可读/写通知),G被标记为,尝试加入G2所在P的runnext(runnext是线程下一个需要执行的 Goroutine。), 然后再是P的本地队列和全局队列。
系统调用阻塞
当M执行某一个G时候如果发生了阻塞操作,M会阻塞,如果当前有一些G在执行,调度器会把这个线程M从P中摘除,然后再创建一个新的操作系统的线程(如果有空闲的线程可用就复用空闲线程)来服务于这个P。当M系统调用结束时候,这个G会尝试获取一个空闲的P执行,并放入到这个P的本地队列。如果获取不到P,那么这个线程M变成休眠状态, 加入到空闲线程中,然后这个G会被放入全局队列中。
队列轮转
可见每个P维护着一个包含G的队列,不考虑G进入系统调用或IO操作的情况下,P周期性的将G调度到M中执行,执行一小段时间,将上下文保存下来,然后将G放到队列尾部,然后从队列中重新取出一个G进行调度。
除了每个P维护的G队列以外,还有一个全局的队列,每个P会周期性地查看全局队列中是否有G待运行并将其调度到M中执行,全局队列中G的来源,主要有从系统调用中恢复的G。之所以P会周期性地查看全局队列,也是为了防止全局队列中的G被饿死。
除了每个P维护的G队列以外,还有一个全局的队列,每个P会周期性地查看全局队列中是否有G待运行并将其调度到M中执行,全局队列中G的来源,主要有从系统调用中恢复的G。之所以P会周期性地查看全局队列,也是为了防止全局队列中的G被饿死。
M0
M0是启动程序后的编号为0的主线程,这个M对应的实例会在全局变量rutime.m0中,不需要在heap上分配,M0负责执行初始化操作和启动第一个G,在之后M0就和其他的M一样了
G0
G0是每次启动一个M都会第一个创建的goroutine,G0仅用于负责调度G,G0不指向任何可执行的函数,每个M都会有一个自己的G0,在调度或系统调用时会使用G0的栈空间,全局变量的G0是M0的G0
一个G由于调度被中断,此后如何恢复?
中断的时候将寄存器里的栈信息,保存到自己的G对象里面。当再次轮到自己执行时,将自己保存的栈信息复制到寄存器里面,这样就接着上次之后运行了。
我这里只是根据自己的理解进行了简单的介绍,想要详细了解有关GMP的底层原理可以去看Go调度器 G-P-M 模型的设计者的文档或直接看源码
参考: ()
()
可以去DELVE官网进行下载。
关于delve工具的介绍,这里简单给大家介绍一下。
delve在go项目及应用的开发中可以用来追踪程序中的异常代码,也可以通过打日志的方式追查问题,但是更重要也是非常厉害的一点,就是delve可以直接分析程序执行的情况。这一点在后期或线上的问题排查中无疑是提供了一个非常大的便捷。
Go(又称 Golang)是 Google 的 Robert Griesemer,Rob Pike 及 Ken Thompson 开发的一种静态强类型、编译型语言。
Go 语言语法与 C 相近,但功能上有:内存安全,GC(垃圾回收),结构形态及 CSP-style 并发计算。
Go的语法接近C语言,但对于变量的声明有所不同。Go支持垃圾回收功能。Go的并行模型是以东尼·霍尔的通信顺序进程(CSP)为基础。
采取类似模型的其他语言包括Occam和Limbo,但它也具有Pi运算的特征,比如通道传输。在1.8版本中开放插件(Plugin)的支持,这意味着现在能从Go中动态加载部分函数。
Delve常用命令
命令功能:
dlv attach后面跟 pid,用来Debug编译好的Golang程序。
dlv core用于 coredump。
dlv debug后面跟要调试的 go 文件,进入 Debug。
dlv testDebug test 函数。
正在做的是绑定要处理的完整路径。/location/{titanrolex}GetUser。您真正想要的是绑定/location/以由一个处理程序处理(例如LocationHandler)。
您可以使用标准库或其他路由器来做到这一点。我将介绍两种方式:
标准库:
import(
fmt
net/http
log
)
funclocationHandler(whttp.ResponseWriter,r*http.Request){
name:=r.URL.Path[len(/location/):]
fmt.Fprintf(w,Location:%s\n,name)
}
funcmain(){
http.HandleFunc(/location/,locationHandler)
log.Fatal(http.ListenAndServe(:8080,nil))
}
但是请注意,/location///以这种方式实现更复杂的路径(例如)会很乏味。
另一种方法是使用github.com/julienschmidt/httprouter,特别是如果您更频繁地遇到这些情况(并且路径更复杂)。
以下是您的用例的示例:
import(
fmt
github.com/julienschmidt/httprouter
net/http
log
)
funcLocationHandler(whttp.ResponseWriter,r*http.Request,pshttprouter.Params){
fmt.Fprintf(w,Location:%s\n,ps.ByName(loc))
}
funcmain(){
router:=httprouter.New()
router.GET(/location/:loc,LocationHandler)
log.Fatal(http.ListenAndServe(:8080,router))
}
请注意,httprouter对处理程序使用稍微不同的签名。这是因为,如您所见,它还将这些参数传递给函数。
哦,还有一个注意事项,你可以直接用你的浏览器(或其他东西)点击-如果其他东西足够好,它会将URLEncode编码为.
我们在mian函数中,首先初始化配置文件,然后新建http连接。
这个连接创建之后,监听服务器的9999端口。如果url的路径后缀为 "/ws",就转发到ws/ws.go中的IndexHandler方法中。
这个方法中首先我们创建一个websocket的Upgrader实例,然后我们使用Upgrader的upgrade方法来升级一下我们的连接为长连接。
升级完成之后会返回一个*websocket.Conn的连接,我们之后所有的关于连接的操作,都是基于该conn的。
在该连接完成之后,我们将连接存放到一个名为Client的map中,以便之后管理更为方便。
之后,我们启动一个goroutine来读取连接中发送的信息内容,再根据内容进行相应的操作。
三次握手:
1. 主动发起连接请求端(客户端),发送 SYN 标志位,携带数据包、包号
2. 被动接收连接请求端(服务器),接收 SYN,回复 ACK,携带应答序列号。同时,发送SYN标志位,携带数据包、包号
3. 主动发起连接请求端(客户端),接收SYN 标志位,回复 ACK。
被动端(服务器)接收 ACK —— 标志着 三次握手建立完成( Accept()/Dial() 返回 )
四次挥手:
1. 主动请求断开连接端(客户端), 发送 FIN标志,携带数据包
2. 被动接受断开连接端(服务器), 发送 ACK标志,携带应答序列号。 —— 半关闭完成。
3. 被动接受断开连接端(服务器), 发送 FIN标志,携带数据包
4. 主动请求断开连接端(客户端), 发送 最后一个 ACK标志,携带应答序列号。—— 发送完成,客户端不会直接退出,等 2MSL时长。
等 2MSL待目的:确保服务器 收到最后一个ACK
滑动窗口:
通知对端本地存储数据的 缓冲区容量。—— write 函数在对端 缓冲区满时,有可能阻塞。
TCP状态转换:
1. 主动发起连接请求端:
CLOSED —— 发送SYN —— SYN_SENT(了解) —— 接收ACK、SYN,回发 ACK —— ESTABLISHED (数据通信)
2. 主动关闭连接请求端:
ESTABLISHED —— 发送FIN —— FIN_WAIT_1 —— 接收ACK —— FIN_WAIT_2 (半关闭、主动端)
—— 接收FIN、回复ACK —— TIME_WAIT (主动端) —— 等 2MSL 时长 —— CLOSED
3. 被动建立连接请求端:
CLOSED —— LISTEN —— 接收SYN、发送ACK、SYN —— SYN_RCVD —— 接收 ACK —— ESTABLISHED (数据通信)
4. 被动断开连接请求端:
ESTABLISHED —— 接收 FIN、发送 ACK —— CLOSE_WAIT —— 发送 FIN —— LAST_ACK —— 接收ACK —— CLOSED
windows下查看TCP状态转换:
netstat -an | findstr 端口号
Linux下查看TCP状态转换:
netstat -an | grep 端口号
TCP和UDP对比:
TCP: 面向连接的可靠的数据包传递。 针对不稳定的 网络层,完全弥补。ACK
UDP:无连接不可靠的报文传输。 针对不稳定的 网络层,完全不弥补。还原网络真实状态。
优点 缺点
TCP: 可靠、顺序、稳定 系统资源消耗大,程序实现繁复、速度慢
UDP:系统资源消耗小,程序实现简单、速度快 不可靠、无序、不稳定
使用场景:
TCP:大文件、可靠数据传输。 对数据的 稳定性、准确性、一致性要求较高的场合。
UDP:应用于对数据时效性要求较高的场合。 网络直播、电话会议、视频直播、网络游戏。
UDP-CS-Server实现流程:
1. 创建 udp地址结构 ResolveUDPAddr(“协议”, “IP:port”) —— udpAddr 本质 struct{IP、port}
2. 创建用于 数据通信的 socket ListenUDP(“协议”, udpAddr ) —— udpConn (socket)
3. 从客户端读取数据,获取对端的地址 udpConn.ReadFromUDP() —— 返回:n,clientAddr, err
4. 发送数据包给 客户端 udpConn.WriteToUDP("数据", clientAddr)
UDP-CS-Client实现流程:
1. 创建用于通信的 socket。 net.Dial("udp", "服务器IP:port") —— udpConn (socket)
2. 以后流程参见 TCP客户端实现源码。
UDPserver默认就支持并发!
------------------------------------
命令行参数: 在main函数启动时,向整个程序传参。 【重点】
语法: go run xxx.go argv1 argv2 argv3 argv4 。。。
xxx.exe: 第 0 个参数。
argv1 :第 1 个参数。
argv2 :第 2 个参数。
argv3 :第 3 个参数。
argv4 :第 4 个参数。
使用: list := os.Args 提取所有命令行参数。
获取文件属性函数:
os.stat(文件访问绝对路径) —— fileInfo 接口
fileInfo 包含 两个接口。
Name() 获取文件名。 不带访问路径
Size() 获取文件大小。
网络文件传输 —— 发送端(客户端)
1. 获取命令行参数,得到文件名(带路径)filePath list := os.Args
2. 使用 os.stat() 获取 文件名(不带路径)fileName
3. 创建 用于数据传输的 socket net.Dial("tcp", “服务器IP+port”) —— conn
4. 发送文件名(不带路径) 给接收端, conn.write()
5. 读取 接收端回发“ok”,判断无误。封装函数 sendFile(filePath, conn) 发送文件内容
6. 实现 sendFile(filePath, conn)
1) 只读打开文件 os.Open(filePath)
for {
2) 从文件中读数据 f.Read(buf)
3) 将读到的数据写到socket中 conn.write(buf[:n])
4)判断读取文件的 结尾。 io.EOF. 跳出循环
}
网络文件传输 —— 接收端(服务器)
1. 创建用于监听的 socket net.Listen() —— listener
2. 借助listener 创建用于 通信的 socket listener.Accpet() —— conn
3. 读取 conn.read() 发送端的 文件名, 保存至本地。
4. 回发 “ok”应答 发送端。
5. 封装函数,接收文件内容 recvFile(文件路径)
1) f = os.Create(带有路径的文件名)
for {
2)从 socket中读取发送端发送的 文件内容 。 conn.read(buf)
3) 将读到的数据 保存至本地文件 f.Write(buf[:n])
4) 判断 读取conn 结束, 代表文件传输完成。 n == 0 break
}
近期正在探索前端、后端、系统端各类常用组件与工具,对其一些常见的组件进行再次整理一下,形成标准化组件专题,后续该专题将包含各类语言中的一些常用组件。欢迎大家进行持续关注。
本节我们分享的是基于Golang实现的高性能和弹性的流处理器 benthos ,它能够以各种代理模式连接各种 源 和 接收器,并对有效负载执行 水合、浓缩、转换和过滤 。
它带有 强大的映射语言 ,易于部署和监控,并且可以作为静态二进制文件、docker 映像或 无服务器函数 放入您的管道,使其成为云原生。
Benthos 是完全声明性的,流管道在单个配置文件中定义,允许您指定连接器和处理阶段列表:
Apache Pulsar, AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (Pub/Sub, Cloud storage), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS JetStream, NATS Streaming, NSQ, AMQP 0.91 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), MongoDB, SQL (MySQL, PostgreSQL, Clickhouse, MSSQL), Stdin/Stdout, TCP UDP, sockets and ZMQ4.
1、docker安装
具体使用方式可以参见该 文档
有关如何配置更高级的流处理概念(例如流连接、扩充工作流等)的指导,请查看 说明书部分。
有关在 Go 中构建您自己的自定义插件的指导,请查看 公共 API。