在我們前面的一些介紹 sync包相關(guān)的文章中,我們應(yīng)該也發(fā)現(xiàn)了,其中有不少地方使用了原子操作。比如 sync.WaitGroup、sync.Map再到 sync.Pool,這些結(jié)構(gòu)體的實(shí)現(xiàn)中都有原子操作的身影。原子操作在并發(fā)編程中是一種非常重要的操作,它可以保證并發(fā)安全,而且效率也很高。本文將會(huì)深入探討一下 go 中原子操作的原理、使用場(chǎng)景、用法等內(nèi)容。
如果讓我用一句話來(lái)說(shuō)明什么是原子操作,那就是:原子操作是變量級(jí)別的互斥鎖。簡(jiǎn)單來(lái)說(shuō),就是同一時(shí)刻,只能有一個(gè) CPU 對(duì)變量進(jìn)行讀或?qū)?。?dāng)我們想要對(duì)某個(gè)變量做并發(fā)安全的修改,除了使用官方提供的 Mutex,還可以使用 sync/atomic包的原子操作,它能夠保證對(duì)變量的讀取或修改期間不被其他的協(xié)程所影響。
(資料圖片)
我們可以用下圖來(lái)表示:
說(shuō)明:在上圖中,我們有三個(gè) CPU 邏輯核,其中 CPU 1 正在對(duì)變量 v做原子操作,這個(gè)時(shí)候 CPU 2 和 CPU 3 不能對(duì) v做任何操作,在 CPU 1 操作完成后,CPU 2 和 CPU 3 可以獲取到 v的最新值。
從這個(gè)角度看,我們可以把 sync/atomic包中的原子操作看成是變量級(jí)別的互斥鎖。就是說(shuō),在 go 中,當(dāng)一個(gè)協(xié)程對(duì)變量做原子操作時(shí),其他協(xié)程不能對(duì)這個(gè)變量做任何操作,直到這個(gè)協(xié)程操作完成。
拿一個(gè)簡(jiǎn)單的例子來(lái)說(shuō)明一下原子操作的使用場(chǎng)景:
func TestAtomic(t *testing.T) {var sum = 0var wg sync.WaitGroupwg.Add(1000)// 啟動(dòng) 1000 個(gè)協(xié)程,每個(gè)協(xié)程對(duì) sum 做加法操作for i := 0; i < 1000; i++ {go func() {defer wg.Done()sum++}()}// 等待所有的協(xié)程都執(zhí)行完畢wg.Wait()fmt.Println(sum) // 這里輸出多少呢?}我們可以在自己的電腦上運(yùn)行一下這段代碼,看看輸出的結(jié)果是多少。不出意外的話,應(yīng)該每次可能都不一樣,而且應(yīng)該也不是 1000,這是為什么呢?
這是因?yàn)?,CPU 在對(duì) sum做加法的時(shí)候,需要先將 sum目前的值讀取到 CPU 的寄存器中,然后再進(jìn)行加法操作,最后再寫回到內(nèi)存中。如果有兩個(gè) CPU 同時(shí)取了 sum的值,然后都進(jìn)行了加法操作,然后都再寫回到內(nèi)存中,那么就會(huì)導(dǎo)致 sum的值被覆蓋,從而導(dǎo)致結(jié)果不正確。
舉個(gè)例子,目前內(nèi)存中的 sum為 1,然后兩個(gè) CPU 同時(shí)取了這個(gè) 1 來(lái)做加法,然后都得到了結(jié)果 2,然后這兩個(gè) CPU 將各自的計(jì)算結(jié)果寫回到內(nèi)存中,那么內(nèi)存中的 sum就變成了 2,而不是 3。
在這種場(chǎng)景下,我們可以使用原子操作來(lái)實(shí)現(xiàn)并發(fā)安全的加法操作:
func TestAtomic1(t *testing.T) {// 將 sum 的類型改成 int32,因?yàn)樵硬僮髦荒茚槍?duì) int32、int64、uint32、uint64、uintptr 這幾種類型var sum int32 = 0var wg sync.WaitGroupwg.Add(1000) // 啟動(dòng) 1000 個(gè)協(xié)程,每個(gè)協(xié)程對(duì) sum 做加法操作for i := 0; i < 1000; i++ {go func() {defer wg.Done()// 將 sum++ 改成下面這樣atomic.AddInt32(&sum, 1)}()}wg.Wait()fmt.Println(sum) // 輸出 1000}在上面這個(gè)例子中,我們每次執(zhí)行都能得到 1000 這個(gè)結(jié)果。
因?yàn)槭褂迷硬僮鞯臅r(shí)候,同一時(shí)刻只能有一個(gè) CPU 對(duì)變量進(jìn)行讀或?qū)?,所以就不?huì)出現(xiàn)上面的問(wèn)題了。
所以很多需要對(duì)變量做并發(fā)讀寫的地方,我們都可以考慮一下,是否可以使用原子操作來(lái)實(shí)現(xiàn)并發(fā)安全的操作(而不是使用互斥鎖,互斥鎖效率相比原子操作要低一些)。
原子操作的使用場(chǎng)景也是和互斥鎖類似的,但是不一樣的是,我們的鎖粒度只是一個(gè)變量而已。也就是說(shuō),當(dāng)我們不允許多個(gè) CPU 同時(shí)對(duì)變量進(jìn)行讀寫的時(shí)候(保證變量同一時(shí)刻只能一個(gè) CPU 操作),就可以使用原子操作。
看完上面原子操作的介紹,有沒(méi)有覺(jué)得原子操作很神奇,居然有這么好用的東西。那它到底是怎么實(shí)現(xiàn)的呢?
一般情況下,原子操作的實(shí)現(xiàn)需要特殊的 CPU 指令或者系統(tǒng)調(diào)用。這些指令或者系統(tǒng)調(diào)用可以保證在執(zhí)行期間不會(huì)被其他操作或事件中斷,從而保證操作的原子性。
例如,在 x86 架構(gòu)的 CPU 中,可以使用 LOCK前綴來(lái)實(shí)現(xiàn)原子操作。LOCK前綴可以與其他指令一起使用,用于鎖定內(nèi)存總線,防止其他 CPU 訪問(wèn)同一內(nèi)存地址,從而實(shí)現(xiàn)原子操作。在使用 LOCK前綴的指令執(zhí)行期間,CPU 會(huì)將當(dāng)前處理器緩存中的數(shù)據(jù)寫回到內(nèi)存中,并鎖定該內(nèi)存地址,防止其他 CPU 修改該地址的數(shù)據(jù)(所以原子操作總是可以讀取到最新的數(shù)據(jù))。一旦當(dāng)前 CPU 對(duì)該地址的操作完成,CPU 會(huì)釋放該內(nèi)存地址的鎖定,其他 CPU 才能繼續(xù)對(duì)該地址進(jìn)行訪問(wèn)。
我們?cè)賮?lái)捋一下上面的內(nèi)容,看看 LOCK前綴是如何實(shí)現(xiàn)原子操作的:
其他架構(gòu)的 CPU 可能會(huì)略有不同,但是原理是一樣的。
在 go 中,主要有以下幾種原子操作:Add、CompareAndSwap、Load、Store、Swap。
Add為前綴,后綴針對(duì)特定類型的名稱。原子增被操作的類型只能是數(shù)值類型,即 int32、int64、uint32、uint64、uintptr原子增減函數(shù)的第一個(gè)參數(shù)為原值,第二個(gè)參數(shù)是要增減多少。方法:func AddInt32(addr *int32, delta int32) (new int32)func AddInt64(addr *int64, delta int64) (new int64)func AddUint32(addr *uint32, delta uint32) (new uint32)func AddUint64(addr *uint64, delta uint64) (new uint64)func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
int32和 int64的第二個(gè)參數(shù)可以是負(fù)數(shù),這樣就可以做原子減法了。
也就是我們常見(jiàn)的 CAS,在 CAS操作中,會(huì)需要拿舊的值跟 old比較,如果相等,就將 new賦值給 addr。如果不相等,則不做任何操作。最后返回一個(gè) bool值,表示是否成功 swap。
也就是說(shuō),這個(gè)操作可能是不成功的。這很正常,在并發(fā)環(huán)境下,多個(gè)協(xié)程對(duì)同一個(gè)變量進(jìn)行操作,肯定會(huì)存在競(jìng)爭(zhēng)的情況。在這種情況下,偶爾的失敗是正常的,我們只需要在失敗的時(shí)候,重新嘗試即可。因?yàn)樵硬僮餍枰臅r(shí)間往往是比較短的,因此在失敗的時(shí)候,我們可以通過(guò)自旋的方式來(lái)再次進(jìn)行嘗試。
在這種情況下,如果不自旋,那就需要將這個(gè)協(xié)程掛起,等待其他協(xié)程完成操作,然后再次嘗試。這個(gè)過(guò)程相比自旋可能會(huì)更加耗時(shí)。因?yàn)楹苡锌赡苓@次原子操作不成功,下一次就成功了。如果我們每次都將協(xié)程掛起,那么效率就會(huì)大大降低。
for+ 原子操作的方式,在 go 的 sync包中很多地方都有使用,比如 sync.Map,sync.Pool等。這也是使用原子操作時(shí)一個(gè)非常常見(jiàn)的使用模式。
CompareAndSwap的功能:
CompareAndSwap為前綴,后綴針對(duì)特定類型的名稱。原子比較并交換被操作的類型可以是數(shù)值類型或指針類型,即 int32、int64、uint32、uint64、uintptr、unsafe.Pointer原子比較并交換函數(shù)的第一個(gè)參數(shù)為原值指針,第二個(gè)參數(shù)是要比較的值,第三個(gè)參數(shù)是要交換的值。方法:func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
原子性的讀取操作接受一個(gè)對(duì)應(yīng)類型的指針值,返回該指針指向的值。原子性讀取意味著讀取值的同時(shí),當(dāng)前計(jì)算機(jī)的任何 CPU 都不會(huì)進(jìn)行針對(duì)值的讀寫操作。
如果不使用原子 Load,當(dāng)使用 v := value這種賦值方式為變量 v賦值時(shí),讀取到的 value可能不是最新的,因?yàn)樵谧x取操作時(shí)其他協(xié)程對(duì)它的讀寫操作可能會(huì)同時(shí)發(fā)生。
Load 操作有下面這些:
func LoadInt32(addr *int32) (val int32)func LoadInt64(addr *int64) (val int64)func LoadUint32(addr *uint32) (val uint32)func LoadUint64(addr *uint64) (val uint64)func LoadUintptr(addr *uintptr) (val uintptr)func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
Store可以將 val值保存到 *addr中,Store操作是原子性的,因此在執(zhí)行 Store操作時(shí),當(dāng)前計(jì)算機(jī)的任何 CPU 都不會(huì)進(jìn)行針對(duì) *addr的讀寫操作。
val值保存到 *addr中。與讀操作對(duì)應(yīng)的寫入操作,sync/atomic提供了與原子值載入 Load函數(shù)相對(duì)應(yīng)的原子值存儲(chǔ) Store函數(shù),原子性存儲(chǔ)函數(shù)均以 Store為前綴。Store操作有下面這些:
func StoreInt32(addr *int32, val int32)func StoreInt64(addr *int64, val int64)func StoreUint32(addr *uint32, val uint32)func StoreUint64(addr *uint64, val uint64)func StoreUintptr(addr *uintpre, val uintptr)func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
Swap跟 Store有點(diǎn)類似,但是它會(huì)返回 *addr的舊值。
func SwapInt32(addr *int32, new int32) (old int32)func SwapInt64(addr *int64, new int64) (old int64)func SwapUint32(addr *uint32, new uint32) (old uint32)func SwapUint64(addr *uint64, new uint64) (old uint64)func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
從上一節(jié)中,我們知道了在 go 中原子操作可以操作 int32、int64、uint32、uint64、uintptr、unsafe.Pointer這些類型的值。但是在實(shí)際開(kāi)發(fā)中,我們的類型還有很多,比如 string、struct等等,那這些類型的值如何進(jìn)行原子操作呢?答案是使用 atomic.Value。
atomic.Value是一個(gè)結(jié)構(gòu)體,它的內(nèi)部有一個(gè) any類型的字段,存儲(chǔ)了我們要原子操作的值,也就是一個(gè)任意類型的值。
atomic.Value支持以下操作:
Load:原子性的讀取 Value中的值。Store:原子性的存儲(chǔ)一個(gè)值到 Value中。Swap:原子性的交換 Value中的值,返回舊值。CompareAndSwap:原子性的比較并交換 Value中的值,如果舊值和 old相等,則將 new存入 Value中,返回 true,否則返回 false。atomic.Value的這些操作跟上面講到的那些操作其實(shí)差不多,只不過(guò) atomic.Value可以操作任意類型的值。那 atomic.Value是如何實(shí)現(xiàn)的呢?
atomic.Value是一個(gè)結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體只有一個(gè)字段:
// Value 提供一致類型值的原子加載和存儲(chǔ)。type Value struct {v any}Load返回由最近的 Store設(shè)置的值。如果還沒(méi)有 Store過(guò)任何值,則返回 nil。
// Load 返回由最近的 Store 設(shè)置的值。func (v *Value) Load() (val any) {// atomic.Value 轉(zhuǎn)換為 efaceWordsvp := (*efaceWords)(unsafe.Pointer(v))// 判斷 atomic.Value 的類型typ := LoadPointer(&vp.typ)// 第一次 Store 還沒(méi)有完成,直接返回 nilif typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {// firstStoreInProgress 是一個(gè)特殊的變量,存儲(chǔ)到 typ 中用來(lái)表示第一次 Store 還沒(méi)有完成return nil}// 獲取 atomic.Value 的值data := LoadPointer(&vp.data)// 將 val 轉(zhuǎn)換為 efaceWords 類型vlp := (*efaceWords)(unsafe.Pointer(&val))// 分別賦值給 val 的 typ 和 datavlp.typ = typvlp.data = datareturn}在 atomic.Value的源碼中,我們都可以看到 efaceWords的身影,它實(shí)際上代表的是 interface{}/any類型:
// 表示一個(gè) interface{}/any 類型type efaceWords struct {typ unsafe.Pointerdata unsafe.Pointer}看到這里我們會(huì)不會(huì)覺(jué)得很困惑,直接返回 val不就可以了嗎?為什么要將 val轉(zhuǎn)換為 efaceWords類型呢?
這是因?yàn)?go 中的原子操作只能操作 int32、int64、uint32、uint64、uintptr、unsafe.Pointer這些類型的值,不支持 interface{}類型,但是如果了解 interface{}底層結(jié)構(gòu)的話,我們就知道 interface{}底層其實(shí)就是一個(gè)結(jié)構(gòu)體,它有兩個(gè)字段,一個(gè)是 type,一個(gè)是 data,type用來(lái)存儲(chǔ) interface{}的類型,data用來(lái)存儲(chǔ) interface{}的值。而且這兩個(gè)字段都是 unsafe.Pointer類型的,所以其實(shí)我們可以對(duì) interface{}的 type和 data分別進(jìn)行原子操作,這樣最終其實(shí)也可以達(dá)到了原子操作 interface{}的目的了,是不是非常地巧妙呢?
Store將 Value的值設(shè)置為 val。對(duì)給定值的所有存儲(chǔ)調(diào)用必須使用相同具體類型的值。不一致類型的存儲(chǔ)會(huì)發(fā)生恐慌,Store(nil)也會(huì) panic。
// Store 將 Value 的值設(shè)置為 val。func (v *Value) Store(val any) {// 不能存儲(chǔ) nil 值if val == nil {panic("sync/atomic: store of nil value into Value")}// atomic.Value 轉(zhuǎn)換為 efaceWordsvp := (*efaceWords)(unsafe.Pointer(v))// val 轉(zhuǎn)換為 efaceWordsvlp := (*efaceWords)(unsafe.Pointer(&val))// 自旋進(jìn)行原子操作,這個(gè)過(guò)程不會(huì)很久,開(kāi)銷相比互斥鎖小for {// LoadPointer 可以保證獲取到的是最新的typ := LoadPointer(&vp.typ)// 第一次 store 的時(shí)候 typ 還是 nil,說(shuō)明是第一次 storeif typ == nil {// 嘗試開(kāi)始第一次 Store。// 禁用搶占,以便其他 goroutines 可以自旋等待完成。// (如果允許搶占,那么其他 goroutine 自旋等待的時(shí)間可能會(huì)比較長(zhǎng),因?yàn)榭赡軙?huì)需要進(jìn)行協(xié)程調(diào)度。)runtime_procPin()// 搶占失敗,意味著有其他 goroutine 成功 store 了,允許搶占,再次嘗試 Store// 這也是一個(gè)原子操作。if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {runtime_procUnpin()continue}// 完成第一次 store// 因?yàn)橛?firstStoreInProgress 標(biāo)識(shí)的保護(hù),所以下面的兩個(gè)原子操作是安全的。StorePointer(&vp.data, vlp.data) // 存儲(chǔ)值(原子操作)StorePointer(&vp.typ, vlp.typ) // 存儲(chǔ)類型(原子操作)runtime_procUnpin() // 允許搶占return}// 另外一個(gè) goroutine 正在進(jìn)行第一次 Store。自旋等待。if typ == unsafe.Pointer(&firstStoreInProgress) {continue}// 第一次 Store 已經(jīng)完成了,下面不是第一次 Store 了。// 需要檢查當(dāng)前 Store 的類型跟第一次 Store 的類型是否一致,不一致就 panic。if typ != vlp.typ {panic("sync/atomic: store of inconsistently typed value into Value")}// 后續(xù)的 Store 只需要 Store 值部分就可以了。// 因?yàn)?atomic.Value 只能保存一種類型的值。StorePointer(&vp.data, vlp.data)return}}在 Store中,有以下幾個(gè)注意的點(diǎn):
firstStoreInProgress來(lái)確保第一次 Store的時(shí)候,只有一個(gè) goroutine可以進(jìn)行 Store操作,其他的 goroutine需要自旋等待。如果沒(méi)有這個(gè)保護(hù),那么存儲(chǔ) typ和 data的時(shí)候就會(huì)出現(xiàn)競(jìng)爭(zhēng)(因?yàn)樾枰獌蓚€(gè)原子操作),導(dǎo)致數(shù)據(jù)不一致。在這里其實(shí)可以將 firstStoreInProgress看作是一個(gè)互斥鎖。在進(jìn)行第一次 Store的時(shí)候,會(huì)將當(dāng)前的 goroutine 和 P綁定,這樣拿到 firstStoreInProgress鎖的協(xié)程就可以盡快地完成第一次 Store操作,這樣一來(lái),其他的協(xié)程也不用等待太久。在第一次 Store的時(shí)候,會(huì)有兩個(gè)原子操作,分別存儲(chǔ)類型和值,但是因?yàn)橛?firstStoreInProgress的保護(hù),所以這兩個(gè)原子操作本質(zhì)上是對(duì) interface{}的一個(gè)原子存儲(chǔ)操作。其他協(xié)程在看到有 firstStoreInProgress標(biāo)識(shí)的時(shí)候,就會(huì)自旋等待,直到第一次 Store完成。在后續(xù)的 Store操作中,只需要存儲(chǔ)值就可以了,因?yàn)?atomic.Value只能保存一種類型的值。Swap將 Value的值設(shè)置為 new并返回舊值。對(duì)給定值的所有交換調(diào)用必須使用相同具體類型的值。同時(shí),不一致類型的交換會(huì)發(fā)生恐慌,Swap(nil)也會(huì) panic。
// Swap 將 Value 的值設(shè)置為 new 并返回舊值。func (v *Value) Swap(new any) (old any) {// 不能存儲(chǔ) nil 值if new == nil {panic("sync/atomic: swap of nil value into Value")}// atomic.Value 轉(zhuǎn)換為 efaceWordsvp := (*efaceWords)(unsafe.Pointer(v))// new 轉(zhuǎn)換為 efaceWordsnp := (*efaceWords)(unsafe.Pointer(&new))// 自旋進(jìn)行原子操作,這個(gè)過(guò)程不會(huì)很久,開(kāi)銷相比互斥鎖小for {// 下面這部分代碼跟 Store 一樣,不細(xì)說(shuō)了。// 這部分代碼是進(jìn)行第一次存儲(chǔ)的代碼。typ := LoadPointer(&vp.typ)if typ == nil {runtime_procPin()if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {runtime_procUnpin()continue}StorePointer(&vp.data, np.data)StorePointer(&vp.typ, np.typ)runtime_procUnpin()return nil}if typ == unsafe.Pointer(&firstStoreInProgress) {continue}if typ != np.typ {panic("sync/atomic: swap of inconsistently typed value into Value")}// ---- 下面是 Swap 的特有邏輯 ----// op 是返回值op := (*efaceWords)(unsafe.Pointer(&old))// 返回舊的值op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)return old}}CompareAndSwap將 Value的值與 old比較,如果相等則設(shè)置為 new并返回 true,否則返回 false。對(duì)給定值的所有比較和交換調(diào)用必須使用相同具體類型的值。同時(shí),不一致類型的比較和交換會(huì)發(fā)生恐慌,CompareAndSwap(nil, nil)也會(huì) panic。
// CompareAndSwap 比較并交換。func (v *Value) CompareAndSwap(old, new any) (swapped bool) {// 注意:old 是可以為 nil 的,new 不能為 nil。// old 是 nil 表示是第一次進(jìn)行 Store 操作。if new == nil {panic("sync/atomic: compare and swap of nil value into Value")}// atomic.Value 轉(zhuǎn)換為 efaceWordsvp := (*efaceWords)(unsafe.Pointer(v))// new 轉(zhuǎn)換為 efaceWordsnp := (*efaceWords)(unsafe.Pointer(&new))// old 轉(zhuǎn)換為 efaceWordsop := (*efaceWords)(unsafe.Pointer(&old))// old 和 new 類型必須一致,且不能為 nilif op.typ != nil && np.typ != op.typ {panic("sync/atomic: compare and swap of inconsistently typed values")}// 自旋進(jìn)行原子操作,這個(gè)過(guò)程不會(huì)很久,開(kāi)銷相比互斥鎖小for {// LoadPointer 可以保證獲取到的 typ 是最新的typ := LoadPointer(&vp.typ)if typ == nil { // atomic.Value 是 nil,還沒(méi) Store 過(guò)// 準(zhǔn)備進(jìn)行第一次 Store,但是傳遞進(jìn)來(lái)的 old 不是 nil,compare 這一步就失敗了。直接返回 falseif old != nil {return false}// 下面這部分代碼跟 Store 一樣,不細(xì)說(shuō)了。 // 這部分代碼是進(jìn)行第一次存儲(chǔ)的代碼。runtime_procPin()if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {runtime_procUnpin()continue}StorePointer(&vp.data, np.data)StorePointer(&vp.typ, np.typ)runtime_procUnpin()return true}if typ == unsafe.Pointer(&firstStoreInProgress) {continue}if typ != np.typ {panic("sync/atomic: compare and swap of inconsistently typed value into Value")}// 通過(guò)運(yùn)行時(shí)相等性檢查比較舊版本和當(dāng)前版本。// 這允許對(duì)值類型進(jìn)行比較,這是包函數(shù)所沒(méi)有的。// 下面的 CompareAndSwapPointer 僅確保 vp.data 自 LoadPointer 以來(lái)沒(méi)有更改。data := LoadPointer(&vp.data)var i any(*efaceWords)(unsafe.Pointer(&i)).typ = typ(*efaceWords)(unsafe.Pointer(&i)).data = dataif i != old { // atomic.Value 跟 old 不相等return false}// 只做 val 部分的 cas 操作return CompareAndSwapPointer(&vp.data, data, np.data)}}這里需要特別說(shuō)明的只有最后那個(gè)比較相等的判斷,也就是 data := LoadPointer(&vp.data)以及往后的幾行代碼。在開(kāi)發(fā) atomic.Value第一版的時(shí)候,那個(gè)開(kāi)發(fā)者其實(shí)是將這幾行寫成 CompareAndSwapPointer(&vp.data, old.data, np.data)這種形式的。但是在舊的寫法中,會(huì)存在一個(gè)問(wèn)題,如果我們做 CAS操作的時(shí)候,如果傳遞的參數(shù) old是一個(gè)結(jié)構(gòu)體的值這種類型,那么這個(gè)結(jié)構(gòu)體的值是會(huì)被拷貝一份的,同時(shí)再會(huì)被轉(zhuǎn)換為 interface{}/any類型,這個(gè)過(guò)程中,其實(shí)參數(shù)的 old的 data部分指針指向的內(nèi)存跟 vp.data指向的內(nèi)存是不一樣的。這樣的話,CAS操作就會(huì)失敗,這個(gè)時(shí)候就會(huì)返回 false,但是我們本意是要比較它的值,出現(xiàn)這種結(jié)果顯然不是我們想要的。
將值作為 interface{}參數(shù)使用的時(shí)候,會(huì)存在一個(gè)將值轉(zhuǎn)換為 interface{}的過(guò)程。具體我們可以看看 interface{}的實(shí)現(xiàn)原理。
所以,在上面的實(shí)現(xiàn)中,會(huì)將舊值的 typ和 data賦值給一個(gè) any類型的變量,然后使用 i != old這種方式進(jìn)行判斷,這樣就可以實(shí)現(xiàn)在比較的時(shí)候,比較的是值,而不是由值轉(zhuǎn)換為 interface{}后的指針。
我們現(xiàn)在知道了,atomic.Value可以對(duì)任意類型做原子操作。而對(duì)于其他的原子類型,比如 int32、int64、uint32、uint64、uintptr、unsafe.Pointer等,其實(shí)在 go 中也提供了包裝的類型,讓我們可以以對(duì)象的方式來(lái)操作這些類型。
對(duì)應(yīng)的類型如下:
atomic.Bool:這個(gè)比較特別,但底層實(shí)際上是一個(gè) uint32類型的值。我們對(duì) atomic.Bool做原子操作的時(shí)候,實(shí)際上是對(duì) uint32做原子操作。atomic.Int32:int32類型的包裝類型atomic.Int64:int64類型的包裝類型atomic.Uint32:uint32類型的包裝類型atomic.Uint64:uint64類型的包裝類型atomic.Uintptr:uintptr類型的包裝類型atomic.Pointer:unsafe.Pointer類型的包裝類型這幾種類型的實(shí)現(xiàn)的代碼基本一樣,除了類型不一樣,我們可以看看 atomic.Int32的實(shí)現(xiàn):
// An Int32 is an atomic int32. The zero value is zero.type Int32 struct {_ noCopyv int32}// Load atomically loads and returns the value stored in x.func (x *Int32) Load() int32 { return LoadInt32(&x.v) }// Store atomically stores val into x.func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) }// Swap atomically stores new into x and returns the previous value.func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) }// CompareAndSwap executes the compare-and-swap operation for x.func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) {return CompareAndSwapInt32(&x.v, old, new)}可以看到,atomic.Int32的實(shí)現(xiàn)都是基于 atomic包中 int32類型相關(guān)的原子操作函數(shù)來(lái)實(shí)現(xiàn)的。
那我們有了互斥鎖,為什么還要有原子操作呢?我們進(jìn)行比較一下就知道了:
| 原子操作 | 互斥鎖 | |
|---|---|---|
| 保護(hù)的范圍 | 變量 | 代碼塊 |
| 保護(hù)的粒度 | 小 | 大 |
| 性能 | 高 | 低 |
| 如何實(shí)現(xiàn)的 | 硬件指令 | 軟件層面實(shí)現(xiàn),邏輯較多 |
如果我們只需要對(duì)某一個(gè)變量做并發(fā)讀寫,那么使用原子操作就可以了,因?yàn)樵硬僮鞯男阅鼙然コ怄i高很多。但是如果我們需要對(duì)多個(gè)變量做并發(fā)讀寫,那么就需要用到互斥鎖了,這種場(chǎng)景往往是在一段代碼中對(duì)不同變量做讀寫。
我們前面這個(gè)表格提到了原子操作與互斥鎖性能上有差異,我們寫幾行代碼來(lái)進(jìn)行比較一下:
// 系統(tǒng)信息 cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz// 10.13 ns/opfunc BenchmarkMutex(b *testing.B) { var mu sync.Mutex for i := 0; i < b.N; i++ { mu.Lock() mu.Unlock() }}// 5.849 ns/opfunc BenchmarkAtomic(b *testing.B) { var sum atomic.Uint64 for i := 0; i < b.N; i++ { sum.Add(uint64(1)) }}在對(duì) Mutex的性能測(cè)試中,我只是寫了簡(jiǎn)單的 Lock()和 UnLock()操作,因?yàn)檫@種比較才算是對(duì) Mutex本身的測(cè)試,而在 Atomic的性能測(cè)試中,對(duì) sum做原子累加的操作。最終結(jié)果是,使用 Atomic的操作耗時(shí)大概比 Mutex少了 40%以上。
在實(shí)際開(kāi)發(fā)中,Mutex保護(hù)的臨界區(qū)內(nèi)往往有更多操作,也就意味著 Mutex鎖需要耗費(fèi)更長(zhǎng)的時(shí)間才能釋放,也就是會(huì)需要耗費(fèi)比上面這個(gè) 40%還要多的時(shí)間另外一個(gè)協(xié)程才能獲取到 Mutex鎖。
在文章的開(kāi)頭,我們就說(shuō)了,在 go 的 sync.Map和 sync.Pool中都有用到了原子操作,本節(jié)就來(lái)看一看這些操作。
在 sync.Map中使用到了一個(gè) entry結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體中大部分操作都是原子操作,我們可以看看它下面這兩個(gè)方法的定義:
// 刪除 entryfunc (e *entry) delete() (value any, ok bool) {for {p := e.p.Load()// 已經(jīng)被刪除了,不需要再刪除if p == nil || p == expunged {return nil, false}// 刪除成功if e.p.CompareAndSwap(p, nil) {return *p, true}}}// 如果條目尚未刪除,trySwap 將交換一個(gè)值。func (e *entry) trySwap(i *any) (*any, bool) {for {p := e.p.Load()// 已經(jīng)被刪除了if p == expunged {return nil, false}// swap 成功if e.p.CompareAndSwap(p, i) {return p, true}}}我們可以看到一個(gè)非常典型的特征就是 for+ CompareAndSwap的組合,這個(gè)組合在 entry中出現(xiàn)了很多次。
如果我們也需要對(duì)變量做并發(fā)讀寫,也可以嘗試一下這種 for + CompareAndSwap 的組合。
在 sync.WaitGroup中有一個(gè)類型為 atomic.Uint64的 state字段,這個(gè)變量是用來(lái)記錄 WaitGroup的狀態(tài)的。在實(shí)際使用中,它的高 32 位用來(lái)記錄 WaitGroup的計(jì)數(shù)器,低 32 位用來(lái)記錄 WaitGroup的 Waiter的數(shù)量,也就是等待條件變量滿足的協(xié)程數(shù)量。
如果不使用一個(gè)變量來(lái)記錄這兩個(gè)值,那么我們就需要使用兩個(gè)變量來(lái)記錄,這樣就會(huì)導(dǎo)致我們需要對(duì)兩個(gè)變量做并發(fā)讀寫,在這種情況下,我們就需要使用互斥鎖來(lái)保護(hù)這兩個(gè)變量,這樣就會(huì)導(dǎo)致性能的下降。
而使用一個(gè)變量來(lái)記錄這兩個(gè)值,我們就可以使用原子操作來(lái)保護(hù)這個(gè)變量,這樣就可以保證并發(fā)讀寫的安全性,同時(shí)也能得到更好的性能:
// WaitGroup 的 Add 函數(shù):高 32 位加上 deltastate := wg.state.Add(uint64(delta) << 32)// WaitGroup 的 Wait 函數(shù):低 32 位加 1// 等待者的數(shù)量加 1wg.state.CompareAndSwap(state, state+1)
當(dāng)然這里是指指向同一行 CAS代碼的時(shí)候(也就是有競(jìng)爭(zhēng)的時(shí)候),如果是指向不同行 CAS代碼的時(shí)候,那么就不一定了。比如下面這個(gè)例子,我們把前面計(jì)算 sum的例子改一改,改成用 CAS操作來(lái)完成:
func TestCas(t *testing.T) {var sum int32 = 0var wg sync.WaitGroupwg.Add(1000)for i := 0; i < 1000; i++ {go func() {defer wg.Done()// 這一行是有可能會(huì)失敗的atomic.CompareAndSwapInt32(&sum, sum, sum+1)}()}wg.Wait()fmt.Println(sum) // 不是 1000}在這個(gè)例子中,我們把 atomic.AddInt32(&sum, 1)改成了 atomic.CompareAndSwapInt32(&sum, sum, sum+1),這樣就會(huì)導(dǎo)致有可能會(huì)有多個(gè) goroutine 同時(shí)執(zhí)行到 atomic.CompareAndSwapInt32(&sum, sum, sum+1)這一行代碼,這樣肯定會(huì)有不同的 goroutine 同時(shí)拿到一個(gè)相同的 sum的舊值,那么在這種情況下,就會(huì)導(dǎo)致 CAS操作失敗。也就是說(shuō),將 sum替換為 sum + 1的操作可能會(huì)失敗。
失敗意味著什么呢?意味著另外一個(gè)協(xié)程序先把 sum的值加 1 了,這個(gè)時(shí)候其實(shí)我們不應(yīng)該在舊的 sum上加 1 了,而是應(yīng)該在最新的 sum上加上 1,那我們應(yīng)該怎么做呢?我們可以在 CAS操作失敗的時(shí)候,重新獲取 sum的值,然后再次嘗試 CAS操作,直到成功為止:
func TestCas(t *testing.T) {var sum int32 = 0var wg sync.WaitGroupwg.Add(1000)for i := 0; i < 1000; i++ {go func() {defer wg.Done()// cas 失敗的時(shí)候,重新獲取 sum 的值進(jìn)行計(jì)算。// cas 成功則返回。for {if atomic.CompareAndSwapInt32(&sum, sum, sum+1) {return}}}()}wg.Wait()fmt.Println(sum)}原子操作是并發(fā)編程中非常重要的一個(gè)概念,它可以保證并發(fā)讀寫的安全性,同時(shí)也能得到更好的性能。
最后,總結(jié)一下本文講到的內(nèi)容:
原子操作是更加底層的操作,它保護(hù)的是單個(gè)變量,而互斥鎖可以保護(hù)一個(gè)代碼片段,它們的使用場(chǎng)景是不一樣的。原子操作需要通過(guò) CPU 指令來(lái)實(shí)現(xiàn),而互斥鎖是在軟件層面實(shí)現(xiàn)的。go 里面的原子操作有以下這些:Add:原子增減CompareAndSwap:原子比較并交換Load:原子讀取Store:原子寫入Swap:原子交換go 里面所有類型都能使用原子操作,只是不同類型的原子操作使用的函數(shù)不太一樣。atomic.Value可以用來(lái)原子操作任意類型的變量。go 里面有些底層實(shí)現(xiàn)也使用了原子操作,比如:sync.WaitGroup:使用原子操作來(lái)保證計(jì)數(shù)器和等待者數(shù)量的并發(fā)讀寫安全性。sync.Map:entry結(jié)構(gòu)體中基本所有操作都有原子操作的身影。原子操作有失敗必然有成功(說(shuō)的是同一行 CAS操作),如果 CAS操作失敗了,那么我們可以重新獲取舊值,然后再次嘗試 CAS操作,直到成功為止。總的來(lái)說(shuō),原子操作本身其實(shí)沒(méi)有太復(fù)雜的邏輯,我們理解了它的原理之后,就可以很容易的使用它了。
推薦學(xué)習(xí):Golang教程
以上就是什么是原子操作?深入淺析go中的原子操作的詳細(xì)內(nèi)容,更多請(qǐng)關(guān)注php中文網(wǎng)其它相關(guān)文章!
關(guān)鍵詞: