您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > Go语言

用Go语言写三种实用队列

时间:2023-03-28 14:57:35  来源:今日头条  作者:研道鸠摩智

背景

我们在使用kube.NETes的客户端k8s.io/client-go 进行开发的时候,比如写个CRD的operator, 经常会用到队列这种数据结构。并且很多时候,我们在做服务器端后台开发的时候,需要用到任务队列,进行任务的异步处理与任务管理。k8s.io/client-go中的workqueue包里面提供了三种常用的队列。今天给大家演示下三种队列的使用方法与相应的使用场景,大家在工作中可以直接copy这些代码,加速自己项目的开发。这三个队列的关系如下图所示:

k8s队列关系

队列

type (基础队列)

下面给出了数据结构,其中dirty,processing两个集合分别存储的是需要处理的任务和正在处理的任务,queue[]t按序存放的是所有添加的任务。这三个属性的关系很有意思,dirty用于快速判断queue中是否存在相应的任务,这样有以下两个用处:

1. 在Add的时候,可以防止重复添加。(代码查看:​https://Github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L120​)。

2.由于在任务完成后要调用Done方法,把任务从processing集合中删除掉,那么如果在完成前(即调用Done方法之前),把任务再次添加进dirty集合,那么在完成调用Done方法的时候,会再次把任务重新添加进queue队列,进行处理(代码查看:https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L180)。

而processing集合存放的是当前正在执行的任务,它的作用有以下几点。

1.在Add的时候,如果任务正在处理,就直接返回。这样在任务调用Done的时候,由于dirty集合中有,会把这个任务再次放在队列的尾部。(代码查看:https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L120)。

2.用于判断队列中是否还有任务正在执行,这样在shutdown的时候,可以有的放矢。(代码查看:https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L221)。

整个queue队列工作模式是,你的工作线程通过Get方法从队列中获取任务(如果队列长度为0,需要q.cond.WAIt()),然后处理任务(你自己的业务逻辑),处理完后调用Done方法,表明任务完成了,同时调用q.cond.Signal(),唤醒等待的工作线程​。

type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t

// dirty defines all of the items that need to be processed.
dirty set

// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set

cond *sync.Cond

shuttingDown bool
drain bool

metrics queueMetrics

unfinishedWorkUpdatePeriod time.Duration
clock clock.WithTicker
}

delaying_queue(延迟队列)

这个延迟队列继承了上面的基础队列,同时提供了addAfter函数,实现根据延迟时间把元素增加进延迟队列。其中的waitForPriorityQueue实现了一个用于waitFor元素的优先级队列,其实就是一个最小堆。

func (q *delayingType) AddAfter(item interface{}, duration time.Duration)这个函数(代码https://github.com/kubernetes/client-go/blob/master/util/workqueue/delaying_queue.go#L162)。

当duration为0,就直接通过q.add放到它继承的基础执行队列里面,如果有延迟值,就放在q.waitingForAddCh通道里面,等待readyAt时机成熟,再放到队列中。那这个通道里面的元素当readyAt后,如何加入到基础执行队列?下面的截图给出了答案,便是启动的ret.waitingLoop协程。这个方法的具体代码(https://github.com/kubernetes/client-go/blob/master/util/workqueue/delaying_queue.go#L189),具体思路就是利用了上面的waitForPriorityQueue最小堆,还有等待加入队列通道q.waitingForAddCh,大家可以看看给出的具体代码,大致的思想就会了解。

创建延迟队列

// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
Interface

// clock tracks time for delayed firing
clock clock.Clock

// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
stopOnce sync.Once

// heartbeat ensures we wait no more than maxWait before firing
heartbeat clock.Ticker

// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor

// metrics counts the number of retries
metrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
type waitFor struct {
data t
readyAt time.Time
// index in the priority queue (heap)
index int
}
type waitForPriorityQueue []*waitFor

元素添加逻辑

下面是测试代码,大家可以看看如何创建延迟队列,还有添加任务。

下面的代码,在延迟队列里面增加了一个字符串"foo",延迟执行的时间是50毫秒。然后差不多50毫秒后,延迟队列长度为0
fakeClock := testingclock.NewFakeClock(time.Now())
q := NewDelayingQueueWithCustomClock(fakeClock, "")

first := "foo"

q.AddAfter(first, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}

if q.Len() != 0 {
t.Errorf("should not have added")
}

fakeClock.Step(60 * time.Millisecond)

if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ := q.Get()
q.Done(item)

// step past the next heartbeat
fakeClock.Step(10 * time.Second)

err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
if q.Len() > 0 {
return false, fmt.Errorf("added to queue")
}

return false, nil
})
if err != wait.ErrWaitTimeout {
t.Errorf("expected timeout, got: %v", err)
}

if q.Len() != 0 {
t.Errorf("should not have added")
}

func waitForAdded(q DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {
return true, nil
}

return false, nil
})
}

func waitForWaitingQueueToFill(q DelayingInterface) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if len(q.(*delayingType).waitingForAddCh) == 0 {
return true, nil
}

return false, nil
})
}

rate_limiting_queue(限速队列)​

限速队列是利用延迟队列的延迟特性,延迟某个元素的插入FIFO队列的时间,达到限速的目的

workqueue包下面的rateLimiter有多种,下面的代码显示的是
ItemExponentialFailureRateLimiter(排队指数算法)。

type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int

baseDelay time.Duration
maxDelay time.Duration
}

它有个基础延迟时间,加入到延迟队列后,被执行的延迟时间的计算公式是如下所示。另外它还有个最大延迟时间的参数。

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

exp := r.failures[item]
r.failures[item] = r.failures[item] + 1

// The backoff is cApped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}

calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}

return calculated
}
 
下面的测试代码,显示的是创建了一个1毫秒基础延迟,最大1秒的延迟队列。它在延迟队列中增加了
一个"one"字符串,由于是第一次添加,所以基于上面的公式它的延迟时间是1毫秒,再次增加"one"
后,它的延迟时间是2*1毫秒,即2毫秒,对于增加的字符串"two"也是一样,当我们调用forget
方法后ItemExponentialFailureRateLimiter中的计数器会重置,再次增加"one"字符串后,
它的延迟时间又变成了1毫秒

limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
fakeClock := testingclock.NewFakeClock(time.Now())
delayingQueue := &delayingType{
Interface: New(),
clock: fakeClock,
heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue

queue.AddRateLimited("one")
waitEntry := <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("one")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, queue.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}

queue.AddRateLimited("two")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("two")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}

queue.Forget("one")
if e, a := 0, queue.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.AddRateLimited("one")
waitEntry = <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}

此外这个包下面还有ItemFastSlowRateLimiter,BucketRateLimiter等。具体的大家可以查看default_rate_limiters.go(代码:https://github.com/kubernetes/client-go/blob/master/util/workqueue/default_rate_limiters.go)。

应用场景

延迟队列场景:

1、订单延迟支付关闭

常见的打车软件都会有匹配司机,这个可以用延迟队列来实现;处理已提交订单超过30分钟未付款失效的订单,延迟队列可以很好的解决;又或者注册了超过30天的用户,发短信撩动等。

2、定时任务调度

比如使用DelayQueue保存当天将会执行的任务和执行时间,或是需要设置一个倒计时,倒计时结束后更新数据库中某个表状态

限速队列场景:

比如限制数据队列的写入速度。



Tags:Go语言   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
宝藏级Go语言开源项目——教你自己动手开发互联网搜索引擎
DIYSearchEngine 是一个能够高速采集海量互联网数据的开源搜索引擎,采用 Go 语言开发。Github 地址:https://github.com/johnlui/DIYSearchEngine运行方法首先,给自己准备一杯...【详细内容】
2024-03-12  Search: Go语言  点击:(19)  评论:(0)  加入收藏
你是否想知道如何应对高并发?Go语言为你提供了答案!
并发编程是当前软件领域中不可忽视的一个关键概念。随着CPU等硬件的不断发展,我们都渴望让我们的程序运行速度更快、更快。而Go语言在语言层面天生支持并发,充分利用现代CPU的...【详细内容】
2023-12-29  Search: Go语言  点击:(110)  评论:(0)  加入收藏
Go语言实现GoF设计模式:适配器模式
简介适配器模式(Adapter)是最常用的结构型模式之一,在现实生活中,适配器模式也是处处可见,比如电源插头转换器,它可以让英式的插头工作在中式的插座上。GoF 对它的定义如下:Convert...【详细内容】
2023-12-12  Search: Go语言  点击:(208)  评论:(0)  加入收藏
Go语言字符串拼接方式与性能比较,分析过没?
在Go语言中,字符串拼接性能是相当高效的,主要原因有两点:一是字符串在Go中是不可变的(immutable),二是Go语言提供了strings.Builder类型来高效处理字符串拼接。1. 字符串是不可变...【详细内容】
2023-12-11  Search: Go语言  点击:(229)  评论:(0)  加入收藏
一篇学会AI与Go语言无缝对接
在当今应用开发领域,类似OpenAI API等生成式AI技术的蓬勃发展正在彻底改变着应用开发的格局。Python和JavaScript等语言已经拥有丰富的资源来支持这些技术,其中LangChain就是...【详细内容】
2023-12-04  Search: Go语言  点击:(142)  评论:(0)  加入收藏
20小时快速入门Go语言
Go语言是由Google开发的一种高效、简洁和并发性强的编程语言,其设计目标是使得程序员能够更加容易地创建可靠、高效的软件。尽管Go语言的语法相对其他编程语言来说可能更加陌...【详细内容】
2023-12-03  Search: Go语言  点击:(154)  评论:(0)  加入收藏
十个令人惊叹的Go语言技巧,让你的代码更加优雅
在开发生产项目的过程中,我注意到经常会发现自己在重复编写代码,使用某些技巧时没有意识到,直到后来回顾工作时才意识到。为了解决这个问题,我开发了一种解决方案,对我来说非常有...【详细内容】
2023-11-20  Search: Go语言  点击:(172)  评论:(0)  加入收藏
Go语言Context应用全攻略:异步编程利器
概述在 Go 语言中,Context(上下文)是一个非常重要的概念,特别是在处理请求时。允许在请求的整个生命周期内传递数据、控制请求的取消、处理超时等。本文将介绍 Go 语言中 Contex...【详细内容】
2023-11-06  Search: Go语言  点击:(304)  评论:(0)  加入收藏
Go语言高级特性:Context深入解读
概述在 Go 语言中,context(上下文)是一个非常重要的概念。它主要用于在多个 goroutine 之间传递请求特定任务的截止日期、取消信号以及其他请求范围的值。3. Context 的取消与...【详细内容】
2023-11-01  Search: Go语言  点击:(232)  评论:(0)  加入收藏
Go语言中如何实现JWT
什么JWTJWT(JSON Web Token)是一种开放标准(RFC 7519),定义了一种在各方之间安全传输信息的简洁方式。这些信息可以被验证和信任,因为它们是数字签名的。JWT由三部分组成,用.分隔。...【详细内容】
2023-09-11  Search: Go语言  点击:(250)  评论:(0)  加入收藏
▌简易百科推荐
宝藏级Go语言开源项目——教你自己动手开发互联网搜索引擎
DIYSearchEngine 是一个能够高速采集海量互联网数据的开源搜索引擎,采用 Go 语言开发。Github 地址:https://github.com/johnlui/DIYSearchEngine运行方法首先,给自己准备一杯...【详细内容】
2024-03-12  OSC开源社区    Tags:Go语言   点击:(19)  评论:(0)  加入收藏
Go Gin框架实现优雅地重启和停止
在Web应用程序中,有时候我们需要重启或停止服务器,无论是因为更新代码还是进行例行维护。在这种情景下,我们需要保证应用程序的可用性和数据的一致性。这就需要优雅地关闭和重...【详细内容】
2024-01-30  源自开发者  微信公众号  Tags:Go   点击:(67)  评论:(0)  加入收藏
如何让Go程序以后台进程或daemon方式运行
本文探讨了如何通过Go代码实现在后台运行的程序。最近我用Go语言开发了一个WebSocket服务,我希望它能在后台运行,并在异常退出时自动重新启动。我的整体思路是将程序转为后台...【详细内容】
2024-01-26  Go语言圈  微信公众号  Tags:Go程序   点击:(60)  评论:(0)  加入收藏
深入Go底层原理,重写Redis中间件实战
Go语言以其简洁、高效和并发性能而闻名,深入了解其底层原理可以帮助我们更好地利用其优势。在本文中,我们将探讨如何深入Go底层原理,以及如何利用这些知识重新实现一个简单的Re...【详细内容】
2024-01-25  547蓝色星球    Tags:Go   点击:(68)  评论:(0)  加入收藏
Go 内存优化与垃圾收集
Go提供了自动化的内存管理机制,但在某些情况下需要更精细的微调从而避免发生OOM错误。本文将讨论Go的垃圾收集器、应用程序内存优化以及如何防止OOM(Out-Of-Memory)错误。Go...【详细内容】
2024-01-15  DeepNoMind  微信公众号  Tags:Go   点击:(63)  评论:(0)  加入收藏
Go函数指针是如何让你的程序变慢的?
导读Go 语言的常规优化手段无需赘述,相信大家也能找到大量的经典教程。但基于 Go 的函数值问题,业界还没有太多深度讨论的内容分享。本文作者根据自己对 Go 代码的使用与调优...【详细内容】
2024-01-15  腾讯云开发者  微信公众号  Tags:Go函数   点击:(88)  评论:(0)  加入收藏
Go编程中调用外部命令的几种场景
在很多场合, 使用Go语言需要调用外部命令来完成一些特定的任务, 例如: 使用Go语言调用Linux命令来获取执行的结果,又或者调用第三方程序执行来完成额外的任务。在go的标准库...【详细内容】
2024-01-09  suntiger    Tags:Go编程   点击:(107)  评论:(0)  加入收藏
Go 语言不支持并发读写 Map,为什么?
Go语言的map类型不支持并发读写的主要原因是并发读写会导致数据竞态(data race),这意味着多个 goroutine 可能同时访问并修改同一个 map,从而引发不确定的结果。在Go语言的设计...【详细内容】
2024-01-05  Go语言圈  微信公众号  Tags:Go 语言   点击:(81)  评论:(0)  加入收藏
Go微服务入门到容器化实践
Go微服务入门到容器化实践Go 是一门高效、现代化、快速增长的编程语言,非常适合构建 Web 应用程序。而 Docker 是一种轻量级的容器化技术,能够使得您的应用程序在任何地方运行...【详细内容】
2024-01-01  大雷家吃饭    Tags:Go微服务   点击:(63)  评论:(0)  加入收藏
你是否想知道如何应对高并发?Go语言为你提供了答案!
并发编程是当前软件领域中不可忽视的一个关键概念。随着CPU等硬件的不断发展,我们都渴望让我们的程序运行速度更快、更快。而Go语言在语言层面天生支持并发,充分利用现代CPU的...【详细内容】
2023-12-29  灵墨AI探索室  微信公众号  Tags:Go语言   点击:(110)  评论:(0)  加入收藏
站内最新
站内热门
站内头条