diff --git a/README.md b/README.md index 5eeedaa..e6037dd 100644 --- a/README.md +++ b/README.md @@ -1001,6 +1001,7 @@ package main import ( "encoding/json" "fmt" + "sync/atomic" "time" "github.com/fufuok/utils" @@ -1014,7 +1015,6 @@ import ( "github.com/fufuok/utils/xhash" "github.com/fufuok/utils/xid" "github.com/fufuok/utils/xjson/jsongen" - "github.com/fufuok/utils/xsync" ) func main() { @@ -1186,16 +1186,16 @@ func main() { now = utils.WaitNextSecondWithTime() fmt.Println("hour:minute:second.00*ms", now) - count := xsync.NewCounter() + count := int64(0) bus := sched.New() // 默认并发数: runtime.NumCPU() for i := 0; i < 30; i++ { bus.Add(1) bus.RunWithArgs(func(n ...interface{}) { - count.Add(int64(n[0].(int))) + atomic.AddInt64(&count, int64(n[0].(int))) }, i) } bus.Wait() - fmt.Println("count:", count.Value()) // count: 435 + fmt.Println("count:", atomic.LoadInt64(&count)) // count: 435 // 继续下一批任务 bus.Add(1) diff --git a/sched/examples/main.go b/sched/examples/main.go index f9a1a67..c3ca453 100644 --- a/sched/examples/main.go +++ b/sched/examples/main.go @@ -2,23 +2,23 @@ package main import ( "fmt" + "sync/atomic" "time" "github.com/fufuok/utils/sched" - "github.com/fufuok/utils/xsync" ) func main() { - count := xsync.NewCounter() + count := int64(0) bus := sched.New() // 默认并发数: runtime.NumCPU() for i := 0; i < 30; i++ { bus.Add(1) bus.RunWithArgs(func(n ...interface{}) { - count.Add(int64(n[0].(int))) + atomic.AddInt64(&count, int64(n[0].(int))) }, i) } bus.Wait() - fmt.Println("count:", count.Value()) // count: 435 + fmt.Println("count:", atomic.LoadInt64(&count)) // count: 435 // 继续下一批任务 bus.Add(1) diff --git a/xsync/BENCHMARKS.md b/xsync/BENCHMARKS.md index af72721..aaa72fa 100644 --- a/xsync/BENCHMARKS.md +++ b/xsync/BENCHMARKS.md @@ -12,6 +12,8 @@ $ benchstat bench.txt | tee benchstat.txt The below sections contain some of the results. Refer to [this gist](https://gist.github.com/puzpuzpuz/e62e38e06feadecfdc823c0f941ece0b) for the complete output. +Please note that `MapOf` got a number of optimizations since v2.3.1, so the current result is likely to be different. + ### Counter vs. atomic int64 ``` diff --git a/xsync/LICENSE b/xsync/LICENSE index 8376971..261eeb9 100644 --- a/xsync/LICENSE +++ b/xsync/LICENSE @@ -1,21 +1,201 @@ -MIT License - -Copyright (c) 2021 Andrey Pechkurov - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/xsync/README.md b/xsync/README.md index 7af99bc..70a7343 100644 --- a/xsync/README.md +++ b/xsync/README.md @@ -1,6 +1,6 @@ # 标准库 `sync` 扩展包 -*forked from puzpuzpuz/xsync v20240622 v3.2.0* +*forked from puzpuzpuz/xsync v20240708 v3.3.1* ## 改动: @@ -35,7 +35,7 @@ import ( ) ``` -*Note for v1 and v2 users*: v1 and v2 support is discontinued, so please upgrade to v3. While the API has some breaking changes, the migration should be trivial. +*Note for pre-v3 users*: v1 and v2 support is discontinued, so please upgrade to v3. While the API has some breaking changes, the migration should be trivial. ### Counter @@ -77,7 +77,9 @@ m.Store("foo", "bar") v, ok := m.Load("foo") ``` -One important difference with `Map` is that `MapOf` supports arbitrary `comparable` key types: +Apart from CLHT, `MapOf` borrows ideas from Java's `j.u.c.ConcurrentHashMap` (immutable K/V pair structs instead of atomic snapshots) and C++'s `absl::flat_hash_map` (meta memory and SWAR-based lookups). It also has more dense memory layout when compared with `Map`. Long story short, `MapOf` should be preferred over `Map` when possible. + +An important difference with `Map` is that `MapOf` supports arbitrary `comparable` key types: ```go type Point struct { diff --git a/xsync/export_mapof_test.go b/xsync/export_mapof_test.go index 0ede5bf..22dc293 100644 --- a/xsync/export_mapof_test.go +++ b/xsync/export_mapof_test.go @@ -3,21 +3,15 @@ package xsync +const ( + EntriesPerMapOfBucket = entriesPerMapOfBucket + DefaultMinMapOfTableCap = defaultMinMapTableLen * entriesPerMapOfBucket +) + type ( BucketOfPadded = bucketOfPadded ) -func MakeHasher[T comparable]() func(T, uint64) uint64 { - return makeHasher[T]() -} - -func CollectMapOfStats[K comparable, V any](m *MapOf[K, V]) MapStats { - return MapStats{m.stats()} -} - -func NewMapOfWithHasher[K comparable, V any]( - hasher func(K, uint64) uint64, - options ...func(*MapConfig), -) *MapOf[K, V] { - return newMapOf[K, V](hasher, options...) +func DefaultHasher[T comparable]() func(T, uint64) uint64 { + return defaultHasher[T]() } diff --git a/xsync/export_test.go b/xsync/export_test.go index 23f517d..be360c7 100644 --- a/xsync/export_test.go +++ b/xsync/export_test.go @@ -12,14 +12,6 @@ type ( BucketPadded = bucketPadded ) -type MapStats struct { - mapStats -} - -func CollectMapStats(m *Map) MapStats { - return MapStats{m.stats()} -} - func LockBucket(mu *uint64) { lockBucket(mu) } @@ -52,6 +44,22 @@ func Fastrand() uint32 { return runtime_fastrand() } +func Broadcast(b uint8) uint64 { + return broadcast(b) +} + +func FirstMarkedByteIndex(w uint64) int { + return firstMarkedByteIndex(w) +} + +func MarkZeroBytes(w uint64) uint64 { + return markZeroBytes(w) +} + +func SetByte(w uint64, b uint8, idx int) uint64 { + return setByte(w, b, idx) +} + func NextPowOf2(v uint32) uint32 { return nextPowOf2(v) } diff --git a/xsync/map.go b/xsync/map.go index 92d73ac..8e98c1e 100644 --- a/xsync/map.go +++ b/xsync/map.go @@ -19,7 +19,7 @@ const ( ) const ( - // number of entries per bucket; 3 entries lead to size of 64B + // number of Map entries per bucket; 3 entries lead to size of 64B // (one cache line) on 64-bit machines entriesPerMapBucket = 3 // threshold fraction of table occupation to start a table shrinking @@ -477,7 +477,7 @@ func (m *Map) doCompute( unlockBucket(&rootb.topHashMutex) return newValue, false } - // Create and append the bucket. + // Create and append a bucket. 
newb := new(bucketPadded) newb.keys[0] = unsafe.Pointer(&key) newb.values[0] = unsafe.Pointer(&newValue) @@ -766,23 +766,51 @@ func (table *mapTable) sumSize() int64 { return sum } -type mapStats struct { - RootBuckets int +// MapStats is Map/MapOf statistics. +// +// Warning: map statistics are intended to be used for diagnostic +// purposes, not for production code. This means that breaking changes +// may be introduced into this struct even between minor releases. +type MapStats struct { + // RootBuckets is the number of root buckets in the hash table. + // Each bucket holds a few entries. + RootBuckets int + // TotalBuckets is the total number of buckets in the hash table, + // including root and their chained buckets. Each bucket holds + // a few entries. TotalBuckets int + // EmptyBuckets is the number of buckets that hold no entries. EmptyBuckets int - Capacity int - Size int // calculated number of entries - Counter int // number of entries according to table counter - CounterLen int // number of counter stripes - MinEntries int // min entries per chain of buckets - MaxEntries int // max entries per chain of buckets + // Capacity is the Map/MapOf capacity, i.e. the total number of + // entries that all buckets can physically hold. This number + // does not consider the load factor. + Capacity int + // Size is the exact number of entries stored in the map. + Size int + // Counter is the number of entries stored in the map according + // to the internal atomic counter. In case of concurrent map + // modifications this number may be different from Size. + Counter int + // CounterLen is the number of internal atomic counter stripes. + // This number may grow with the map capacity to improve + // multithreaded scalability. + CounterLen int + // MinEntries is the minimum number of entries per chain of + // buckets, i.e. a root bucket and its chained buckets. + MinEntries int + // MaxEntries is the maximum number of entries per chain of + // buckets, i.e. a root bucket and its chained buckets. + MaxEntries int + // TotalGrowths is the number of times the hash table grew. TotalGrowths int64 + // TotalShrinks is the number of times the hash table shrank. TotalShrinks int64 } -func (s *mapStats) ToString() string { +// ToString returns a string representation of map stats. +func (s *MapStats) ToString() string { var sb strings.Builder - sb.WriteString("\n---\n") + sb.WriteString("MapStats{\n") sb.WriteString(fmt.Sprintf("RootBuckets: %d\n", s.RootBuckets)) sb.WriteString(fmt.Sprintf("TotalBuckets: %d\n", s.TotalBuckets)) sb.WriteString(fmt.Sprintf("EmptyBuckets: %d\n", s.EmptyBuckets)) @@ -794,13 +822,15 @@ func (s *mapStats) ToString() string { sb.WriteString(fmt.Sprintf("MaxEntries: %d\n", s.MaxEntries)) sb.WriteString(fmt.Sprintf("TotalGrowths: %d\n", s.TotalGrowths)) sb.WriteString(fmt.Sprintf("TotalShrinks: %d\n", s.TotalShrinks)) - sb.WriteString("---\n") + sb.WriteString("}\n") return sb.String() } -// O(N) operation; use for debug purposes only -func (m *Map) stats() mapStats { - stats := mapStats{ +// Stats returns statistics for the Map. Just like other map +// methods, this one is thread-safe. Yet it's an O(N) operation, +// so it should be used only for diagnostics or debugging purposes.
+func (m *Map) Stats() MapStats { + stats := MapStats{ TotalGrowths: atomic.LoadInt64(&m.totalGrowths), TotalShrinks: atomic.LoadInt64(&m.totalShrinks), MinEntries: math.MaxInt32, @@ -829,7 +859,7 @@ func (m *Map) stats() mapStats { if b.next == nil { break } - b = (*bucketPadded)(b.next) + b = (*bucketPadded)(atomic.LoadPointer(&b.next)) stats.TotalBuckets++ } if nentries < stats.MinEntries { diff --git a/xsync/map_test.go b/xsync/map_test.go index a8fdea9..93d5939 100644 --- a/xsync/map_test.go +++ b/xsync/map_test.go @@ -500,7 +500,7 @@ func TestMapStoreThenParallelDelete_DoesNotShrinkBelowMinTableLen(t *testing.T) <-cdone <-cdone - stats := CollectMapStats(m) + stats := m.Stats() if stats.RootBuckets != DefaultMinMapTableLen { t.Fatalf("table length was different from the minimum: %d", stats.RootBuckets) } @@ -571,7 +571,7 @@ func TestMapClear(t *testing.T) { } func assertMapCapacity(t *testing.T, m *Map, expectedCap int) { - stats := CollectMapStats(m) + stats := m.Stats() if stats.Capacity != expectedCap { t.Fatalf("capacity was different from %d: %d", expectedCap, stats.Capacity) } @@ -595,7 +595,7 @@ func TestNewMapPresized_DoesNotShrinkBelowMinTableLen(t *testing.T) { m.Store(strconv.Itoa(i), i) } - stats := CollectMapStats(m) + stats := m.Stats() if stats.RootBuckets <= minTableLen { t.Fatalf("table did not grow: %d", stats.RootBuckets) } @@ -604,7 +604,7 @@ func TestNewMapPresized_DoesNotShrinkBelowMinTableLen(t *testing.T) { m.Delete(strconv.Itoa(int(i))) } - stats = CollectMapStats(m) + stats = m.Stats() if stats.RootBuckets != minTableLen { t.Fatalf("table length was different from the minimum: %d", stats.RootBuckets) } @@ -615,13 +615,13 @@ func TestNewMapGrowOnly_OnlyShrinksOnClear(t *testing.T) { const numEntries = minTableLen * EntriesPerMapBucket m := NewMap(WithPresize(numEntries), WithGrowOnly()) - stats := CollectMapStats(m) + stats := m.Stats() initialTableLen := stats.RootBuckets for i := 0; i < 2*numEntries; i++ { m.Store(strconv.Itoa(i), i) } - stats = CollectMapStats(m) + stats = m.Stats() maxTableLen := stats.RootBuckets if maxTableLen <= minTableLen { t.Fatalf("table did not grow: %d", maxTableLen) @@ -630,13 +630,13 @@ func TestNewMapGrowOnly_OnlyShrinksOnClear(t *testing.T) { for i := 0; i < numEntries; i++ { m.Delete(strconv.Itoa(int(i))) } - stats = CollectMapStats(m) + stats = m.Stats() if stats.RootBuckets != maxTableLen { t.Fatalf("table length was different from the expected: %d", stats.RootBuckets) } m.Clear() - stats = CollectMapStats(m) + stats = m.Stats() if stats.RootBuckets != initialTableLen { t.Fatalf("table length was different from the initial: %d", stats.RootBuckets) } @@ -649,7 +649,7 @@ func TestMapResize(t *testing.T) { for i := 0; i < numEntries; i++ { m.Store(strconv.Itoa(i), i) } - stats := CollectMapStats(m) + stats := m.Stats() if stats.Size != numEntries { t.Fatalf("size was too small: %d", stats.Size) } @@ -673,7 +673,7 @@ func TestMapResize(t *testing.T) { for i := 0; i < numEntries; i++ { m.Delete(strconv.Itoa(i)) } - stats = CollectMapStats(m) + stats = m.Stats() if stats.Size > 0 { t.Fatalf("zero size was expected: %d", stats.Size) } @@ -697,7 +697,7 @@ func TestMapResize_CounterLenLimit(t *testing.T) { for i := 0; i < numEntries; i++ { m.Store("foo"+strconv.Itoa(i), "bar"+strconv.Itoa(i)) } - stats := CollectMapStats(m) + stats := m.Stats() if stats.Size != numEntries { t.Fatalf("size was too small: %d", stats.Size) } @@ -707,7 +707,7 @@ func TestMapResize_CounterLenLimit(t *testing.T) { } } -func parallelSeqResizer(t 
*testing.T, m *Map, numEntries int, positive bool, cdone chan bool) { +func parallelSeqResizer(m *Map, numEntries int, positive bool, cdone chan bool) { for i := 0; i < numEntries; i++ { if positive { m.Store(strconv.Itoa(i), i) @@ -722,8 +722,8 @@ func TestMapParallelResize_GrowOnly(t *testing.T) { const numEntries = 100_000 m := NewMap() cdone := make(chan bool) - go parallelSeqResizer(t, m, numEntries, true, cdone) - go parallelSeqResizer(t, m, numEntries, false, cdone) + go parallelSeqResizer(m, numEntries, true, cdone) + go parallelSeqResizer(m, numEntries, false, cdone) // Wait for the goroutines to finish. <-cdone <-cdone @@ -942,7 +942,7 @@ func TestMapParallelStoresAndDeletes(t *testing.T) { } } -func parallelComputer(t *testing.T, m *Map, numIters, numEntries int, cdone chan bool) { +func parallelComputer(m *Map, numIters, numEntries int, cdone chan bool) { for i := 0; i < numIters; i++ { for j := 0; j < numEntries; j++ { m.Compute(strconv.Itoa(j), func(oldValue interface{}, loaded bool) (newValue interface{}, delete bool) { @@ -962,7 +962,7 @@ func TestMapParallelComputes(t *testing.T) { m := NewMap() cdone := make(chan bool) for i := 0; i < numWorkers; i++ { - go parallelComputer(t, m, numIters, numWorkers, cdone) + go parallelComputer(m, numIters, numWorkers, cdone) } // Wait for the goroutines to finish. for i := 0; i < numWorkers; i++ { @@ -980,7 +980,7 @@ func TestMapParallelComputes(t *testing.T) { } } -func parallelRangeStorer(t *testing.T, m *Map, numEntries int, stopFlag *int64, cdone chan bool) { +func parallelRangeStorer(m *Map, numEntries int, stopFlag *int64, cdone chan bool) { for { for i := 0; i < numEntries; i++ { m.Store(strconv.Itoa(i), i) @@ -992,7 +992,7 @@ func parallelRangeStorer(t *testing.T, m *Map, numEntries int, stopFlag *int64, cdone <- true } -func parallelRangeDeleter(t *testing.T, m *Map, numEntries int, stopFlag *int64, cdone chan bool) { +func parallelRangeDeleter(m *Map, numEntries int, stopFlag *int64, cdone chan bool) { for { for i := 0; i < numEntries; i++ { m.Delete(strconv.Itoa(i)) @@ -1013,8 +1013,8 @@ func TestMapParallelRange(t *testing.T) { // Start goroutines that would be storing and deleting items in parallel. cdone := make(chan bool) stopFlag := int64(0) - go parallelRangeStorer(t, m, numEntries, &stopFlag, cdone) - go parallelRangeDeleter(t, m, numEntries, &stopFlag, cdone) + go parallelRangeStorer(m, numEntries, &stopFlag, cdone) + go parallelRangeDeleter(m, numEntries, &stopFlag, cdone) // Iterate the map and verify that no duplicate keys were met. 
met := make(map[string]int) m.Range(func(key string, value interface{}) bool { @@ -1216,6 +1216,60 @@ func testMapTopHashMutex_StoreAfterErase(t *testing.T, topHashes *uint64) { } } +func TestMapStats(t *testing.T) { + m := NewMap() + + stats := m.Stats() + if stats.RootBuckets != DefaultMinMapTableLen { + t.Fatalf("unexpected number of root buckets: %d", stats.RootBuckets) + } + if stats.TotalBuckets != stats.RootBuckets { + t.Fatalf("unexpected number of total buckets: %d", stats.TotalBuckets) + } + if stats.EmptyBuckets != stats.RootBuckets { + t.Fatalf("unexpected number of empty buckets: %d", stats.EmptyBuckets) + } + if stats.Capacity != EntriesPerMapBucket*DefaultMinMapTableLen { + t.Fatalf("unexpected capacity: %d", stats.Capacity) + } + if stats.Size != 0 { + t.Fatalf("unexpected size: %d", stats.Size) + } + if stats.Counter != 0 { + t.Fatalf("unexpected counter: %d", stats.Counter) + } + if stats.CounterLen != 8 { + t.Fatalf("unexpected counter length: %d", stats.CounterLen) + } + + for i := 0; i < 100; i++ { + m.Store(strconv.Itoa(int(i)), i) + } + + stats = m.Stats() + if stats.RootBuckets != 2*DefaultMinMapTableLen { + t.Fatalf("unexpected number of root buckets: %d", stats.RootBuckets) + } + if stats.TotalBuckets < stats.RootBuckets { + t.Fatalf("unexpected number of total buckets: %d", stats.TotalBuckets) + } + if stats.EmptyBuckets >= stats.RootBuckets { + t.Fatalf("unexpected number of empty buckets: %d", stats.EmptyBuckets) + } + if stats.Capacity < 2*EntriesPerMapBucket*DefaultMinMapTableLen { + t.Fatalf("unexpected capacity: %d", stats.Capacity) + } + if stats.Size != 100 { + t.Fatalf("unexpected size: %d", stats.Size) + } + if stats.Counter != 100 { + t.Fatalf("unexpected counter: %d", stats.Counter) + } + if stats.CounterLen != 8 { + t.Fatalf("unexpected counter length: %d", stats.CounterLen) + } +} + func BenchmarkMap_NoWarmUp(b *testing.B) { for _, bc := range benchmarkCases { if bc.readPercentage == 100 { diff --git a/xsync/mapof.go b/xsync/mapof.go index 222370c..7ac8686 100644 --- a/xsync/mapof.go +++ b/xsync/mapof.go @@ -11,6 +11,16 @@ import ( "unsafe" ) +const ( + // number of MapOf entries per bucket; 5 entries lead to size of 64B + // (one cache line) on 64-bit machines + entriesPerMapOfBucket = 5 + defaultMeta uint64 = 0x8080808080808080 + metaMask uint64 = 0xffffffffff + defaultMetaMasked uint64 = defaultMeta & metaMask + emptyMetaSlot uint8 = 0x80 +) + // MapOf is like a Go map[K]V but is safe for concurrent // use by multiple goroutines without additional locking or // coordination. It follows the interface of sync.Map with @@ -27,6 +37,11 @@ import ( // Also, Get operations involve no write to memory, as well as no // mutexes or any other sort of locks. Due to this design, in all // considered scenarios MapOf outperforms sync.Map. +// +// MapOf also borrows ideas from Java's j.u.c.ConcurrentHashMap +// (immutable K/V pair structs instead of atomic snapshots) +// and C++'s absl::flat_hash_map (meta memory and SWAR-based +// lookups). type MapOf[K comparable, V any] struct { totalGrowths int64 totalShrinks int64 @@ -49,7 +64,7 @@ type mapOfTable[K comparable, V any] struct { } // bucketOfPadded is a CL-sized map bucket holding up to -// entriesPerMapBucket entries. +// entriesPerMapOfBucket entries. 
type bucketOfPadded struct { //lint:ignore U1000 ensure each bucket takes two cache lines on both 32 and 64-bit archs pad [cacheLineSize - unsafe.Sizeof(bucketOf{})]byte @@ -57,9 +72,9 @@ type bucketOfPadded struct { } type bucketOf struct { - hashes [entriesPerMapBucket]uint64 - entries [entriesPerMapBucket]unsafe.Pointer // *entryOf - next unsafe.Pointer // *bucketOfPadded + meta uint64 + entries [entriesPerMapOfBucket]unsafe.Pointer // *entryOf + next unsafe.Pointer // *bucketOfPadded mu sync.Mutex } @@ -72,26 +87,19 @@ type entryOf[K comparable, V any] struct { // NewMapOf creates a new MapOf instance configured with the given // options. func NewMapOf[K comparable, V any](options ...func(*MapConfig)) *MapOf[K, V] { - return newMapOf[K, V](makeHasher[K](), options...) + return NewMapOfWithHasher[K, V](defaultHasher[K](), options...) } -// NewMapOfPresized creates a new MapOf instance with capacity enough -// to hold sizeHint entries. The capacity is treated as the minimal capacity -// meaning that the underlying hash table will never shrink to -// a smaller capacity. If sizeHint is zero or negative, the value -// is ignored. -// -// Deprecated: use NewMapOf in combination with WithPresize. -func NewMapOfPresized[K comparable, V any](sizeHint int) *MapOf[K, V] { - return NewMapOf[K, V](WithPresize(sizeHint)) -} - -func newMapOf[K comparable, V any]( +// NewMapOfWithHasher creates a new MapOf instance configured with the given +// hasher and options. The hash function is used instead of +// the built-in hash function configured when a map is created +// with the NewMapOf function. +func NewMapOfWithHasher[K comparable, V any]( hasher func(K, uint64) uint64, options ...func(*MapConfig), ) *MapOf[K, V] { c := &MapConfig{ - sizeHint: defaultMinMapTableLen * entriesPerMapBucket, + sizeHint: defaultMinMapTableLen * entriesPerMapOfBucket, } for _, o := range options { o(c) @@ -101,10 +109,10 @@ func newMapOf[K comparable, V any]( m.resizeCond = *sync.NewCond(&m.resizeMu) m.hasher = hasher var table *mapOfTable[K, V] - if c.sizeHint <= defaultMinMapTableLen*entriesPerMapBucket { + if c.sizeHint <= defaultMinMapTableLen*entriesPerMapOfBucket { table = newMapOfTable[K, V](defaultMinMapTableLen) } else { - tableLen := nextPowOf2(uint32(c.sizeHint / entriesPerMapBucket)) + tableLen := nextPowOf2(uint32(c.sizeHint / entriesPerMapOfBucket)) table = newMapOfTable[K, V](int(tableLen)) } m.minTableLen = len(table.buckets) @@ -113,8 +121,22 @@ func newMapOf[K comparable, V any]( return m } +// NewMapOfPresized creates a new MapOf instance with capacity enough +// to hold sizeHint entries. The capacity is treated as the minimal capacity +// meaning that the underlying hash table will never shrink to +// a smaller capacity. If sizeHint is zero or negative, the value +// is ignored. +// +// Deprecated: use NewMapOf in combination with WithPresize. +func NewMapOfPresized[K comparable, V any](sizeHint int) *MapOf[K, V] { + return NewMapOf[K, V](WithPresize(sizeHint)) +} + func newMapOfTable[K comparable, V any](minTableLen int) *mapOfTable[K, V] { buckets := make([]bucketOfPadded, minTableLen) + for i := range buckets { + buckets[i].meta = defaultMeta + } counterLen := minTableLen >> 10 if counterLen < minMapCounterLen { counterLen = minMapCounterLen @@ -135,25 +157,24 @@ func newMapOfTable[K comparable, V any](minTableLen int) *mapOfTable[K, V] { // The ok result indicates whether value was found in the map. 
func (m *MapOf[K, V]) Load(key K) (value V, ok bool) { table := (*mapOfTable[K, V])(atomic.LoadPointer(&m.table)) - hash := shiftHash(m.hasher(key, table.seed)) - bidx := uint64(len(table.buckets)-1) & hash + hash := m.hasher(key, table.seed) + h1 := h1(hash) + h2w := broadcast(h2(hash)) + bidx := uint64(len(table.buckets)-1) & h1 b := &table.buckets[bidx] for { - for i := 0; i < entriesPerMapBucket; i++ { - // We treat the hash code only as a hint, so there is no - // need to get an atomic snapshot. - h := atomic.LoadUint64(&b.hashes[i]) - if h == uint64(0) || h != hash { - continue - } - eptr := atomic.LoadPointer(&b.entries[i]) - if eptr == nil { - continue - } - e := (*entryOf[K, V])(eptr) - if e.key == key { - return e.value, true + metaw := atomic.LoadUint64(&b.meta) + markedw := markZeroBytes(metaw^h2w) & metaMask + for markedw != 0 { + idx := firstMarkedByteIndex(markedw) + eptr := atomic.LoadPointer(&b.entries[idx]) + if eptr != nil { + e := (*entryOf[K, V])(eptr) + if e.key == key { + return e.value, true + } } + markedw &= markedw - 1 } bptr := atomic.LoadPointer(&b.next) if bptr == nil { @@ -285,14 +306,16 @@ func (m *MapOf[K, V]) doCompute( for { compute_attempt: var ( - emptyb *bucketOfPadded - emptyidx int - hintNonEmpty int + emptyb *bucketOfPadded + emptyidx int ) table := (*mapOfTable[K, V])(atomic.LoadPointer(&m.table)) tableLen := len(table.buckets) - hash := shiftHash(m.hasher(key, table.seed)) - bidx := uint64(len(table.buckets)-1) & hash + hash := m.hasher(key, table.seed) + h1 := h1(hash) + h2 := h2(hash) + h2w := broadcast(h2) + bidx := uint64(len(table.buckets)-1) & h1 rootb := &table.buckets[bidx] rootb.mu.Lock() // The following two checks must go in reverse to what's @@ -310,62 +333,62 @@ func (m *MapOf[K, V]) doCompute( } b := rootb for { - for i := 0; i < entriesPerMapBucket; i++ { - h := atomic.LoadUint64(&b.hashes[i]) - if h == uint64(0) { - if emptyb == nil { - emptyb = b - emptyidx = i - } - continue - } - if h != hash { - hintNonEmpty++ - continue - } - e := (*entryOf[K, V])(b.entries[i]) - if e.key == key { - if loadIfExists { - rootb.mu.Unlock() - return e.value, !computeOnly - } - // In-place update/delete. - // We get a copy of the value via an interface{} on each call, - // thus the live value pointers are unique. Otherwise atomic - // snapshot won't be correct in case of multiple Store calls - // using the same value. - oldv := e.value - newv, del := valueFn(oldv, true) - if del { - // Deletion. - // First we update the hash, then the entry. - atomic.StoreUint64(&b.hashes[i], uint64(0)) - atomic.StorePointer(&b.entries[i], nil) - leftEmpty := false - if hintNonEmpty == 0 { - leftEmpty = isEmptyBucketOf(b) + metaw := b.meta + markedw := markZeroBytes(metaw^h2w) & metaMask + for markedw != 0 { + idx := firstMarkedByteIndex(markedw) + eptr := b.entries[idx] + if eptr != nil { + e := (*entryOf[K, V])(eptr) + if e.key == key { + if loadIfExists { + rootb.mu.Unlock() + return e.value, !computeOnly } + // In-place update/delete. + // We get a copy of the value via an interface{} on each call, + // thus the live value pointers are unique. Otherwise atomic + // snapshot won't be correct in case of multiple Store calls + // using the same value. + oldv := e.value + newv, del := valueFn(oldv, true) + if del { + // Deletion. + // First we update the hash, then the entry. 
+ newmetaw := setByte(metaw, emptyMetaSlot, idx) + atomic.StoreUint64(&b.meta, newmetaw) + atomic.StorePointer(&b.entries[idx], nil) + rootb.mu.Unlock() + table.addSize(bidx, -1) + // Might need to shrink the table if we left bucket empty. + if newmetaw == defaultMeta { + m.resize(table, mapShrinkHint) + } + return oldv, !computeOnly + } + newe := new(entryOf[K, V]) + newe.key = key + newe.value = newv + atomic.StorePointer(&b.entries[idx], unsafe.Pointer(newe)) rootb.mu.Unlock() - table.addSize(bidx, -1) - // Might need to shrink the table. - if leftEmpty { - m.resize(table, mapShrinkHint) + if computeOnly { + // Compute expects the new value to be returned. + return newv, true } - return oldv, !computeOnly + // LoadAndStore expects the old value to be returned. + return oldv, true } - newe := new(entryOf[K, V]) - newe.key = key - newe.value = newv - atomic.StorePointer(&b.entries[i], unsafe.Pointer(newe)) - rootb.mu.Unlock() - if computeOnly { - // Compute expects the new value to be returned. - return newv, true - } - // LoadAndStore expects the old value to be returned. - return oldv, true } - hintNonEmpty++ + markedw &= markedw - 1 + } + if emptyb == nil { + // Search for empty entries (up to 5 per bucket). + emptyw := metaw & defaultMetaMasked + if emptyw != 0 { + idx := firstMarkedByteIndex(emptyw) + emptyb = b + emptyidx = idx + } } if b.next == nil { if emptyb != nil { @@ -379,14 +402,14 @@ func (m *MapOf[K, V]) doCompute( newe := new(entryOf[K, V]) newe.key = key newe.value = newValue - // First we update the hash, then the entry. - atomic.StoreUint64(&emptyb.hashes[emptyidx], hash) + // First we update meta, then the entry. + atomic.StoreUint64(&emptyb.meta, setByte(emptyb.meta, h2, emptyidx)) atomic.StorePointer(&emptyb.entries[emptyidx], unsafe.Pointer(newe)) rootb.mu.Unlock() table.addSize(bidx, 1) return newValue, computeOnly } - growThreshold := float64(tableLen) * entriesPerMapBucket * mapLoadFactor + growThreshold := float64(tableLen) * entriesPerMapOfBucket * mapLoadFactor if table.sumSize() > int64(growThreshold) { // Need to grow the table. Then go for another attempt. rootb.mu.Unlock() @@ -400,9 +423,9 @@ func (m *MapOf[K, V]) doCompute( rootb.mu.Unlock() return newValue, false } - // Create and append the bucket. + // Create and append a bucket. newb := new(bucketOfPadded) - newb.hashes[0] = hash + newb.meta = setByte(defaultMeta, h2, 0) newe := new(entryOf[K, V]) newe.key = key newe.value = newValue @@ -440,7 +463,7 @@ func (m *MapOf[K, V]) resize(knownTable *mapOfTable[K, V], hint mapResizeHint) { if hint == mapShrinkHint { if m.growOnly || m.minTableLen == knownTableLen || - knownTable.sumSize() > int64((knownTableLen*entriesPerMapBucket)/mapShrinkFraction) { + knownTable.sumSize() > int64((knownTableLen*entriesPerMapOfBucket)/mapShrinkFraction) { return } } @@ -459,7 +482,7 @@ func (m *MapOf[K, V]) resize(knownTable *mapOfTable[K, V], hint mapResizeHint) { atomic.AddInt64(&m.totalGrowths, 1) newTable = newMapOfTable[K, V](tableLen << 1) case mapShrinkHint: - shrinkThreshold := int64((tableLen * entriesPerMapBucket) / mapShrinkFraction) + shrinkThreshold := int64((tableLen * entriesPerMapOfBucket) / mapShrinkFraction) if tableLen > m.minTableLen && table.sumSize() <= shrinkThreshold { // Shrink the table with factor of 2. 
atomic.AddInt64(&m.totalShrinks, 1) @@ -500,13 +523,13 @@ func copyBucketOf[K comparable, V any]( rootb := b rootb.mu.Lock() for { - for i := 0; i < entriesPerMapBucket; i++ { + for i := 0; i < entriesPerMapOfBucket; i++ { if b.entries[i] != nil { e := (*entryOf[K, V])(b.entries[i]) - hash := shiftHash(hasher(e.key, destTable.seed)) - bidx := uint64(len(destTable.buckets)-1) & hash + hash := hasher(e.key, destTable.seed) + bidx := uint64(len(destTable.buckets)-1) & h1(hash) destb := &destTable.buckets[bidx] - appendToBucketOf(hash, b.entries[i], destb) + appendToBucketOf(h2(hash), b.entries[i], destb) copied++ } } @@ -534,7 +557,7 @@ func copyBucketOf[K comparable, V any]( func (m *MapOf[K, V]) Range(f func(key K, value V) bool) { var zeroPtr unsafe.Pointer // Pre-allocate array big enough to fit entries for most hash tables. - bentries := make([]unsafe.Pointer, 0, 16*entriesPerMapBucket) + bentries := make([]unsafe.Pointer, 0, 16*entriesPerMapOfBucket) tablep := atomic.LoadPointer(&m.table) table := *(*mapOfTable[K, V])(tablep) for i := range table.buckets { @@ -544,7 +567,7 @@ func (m *MapOf[K, V]) Range(f func(key K, value V) bool) { // the intermediate slice. rootb.mu.Lock() for { - for i := 0; i < entriesPerMapBucket; i++ { + for i := 0; i < entriesPerMapOfBucket; i++ { if b.entries[i] != nil { bentries = append(bentries, b.entries[i]) } @@ -581,18 +604,18 @@ func (m *MapOf[K, V]) Size() int { return int(table.sumSize()) } -func appendToBucketOf(hash uint64, entryPtr unsafe.Pointer, b *bucketOfPadded) { +func appendToBucketOf(h2 uint8, entryPtr unsafe.Pointer, b *bucketOfPadded) { for { - for i := 0; i < entriesPerMapBucket; i++ { + for i := 0; i < entriesPerMapOfBucket; i++ { if b.entries[i] == nil { - b.hashes[i] = hash + b.meta = setByte(b.meta, h2, i) b.entries[i] = entryPtr return } } if b.next == nil { newb := new(bucketOfPadded) - newb.hashes[0] = hash + newb.meta = setByte(defaultMeta, h2, 0) newb.entries[0] = entryPtr b.next = unsafe.Pointer(newb) return @@ -601,21 +624,6 @@ func appendToBucketOf(hash uint64, entryPtr unsafe.Pointer, b *bucketOfPadded) { } } -func isEmptyBucketOf(rootb *bucketOfPadded) bool { - b := rootb - for { - for i := 0; i < entriesPerMapBucket; i++ { - if b.entries[i] != nil { - return false - } - } - if b.next == nil { - return true - } - b = (*bucketOfPadded)(b.next) - } -} - func (table *mapOfTable[K, V]) addSize(bucketIdx uint64, delta int) { cidx := uint64(len(table.size)-1) & bucketIdx atomic.AddInt64(&table.size[cidx].c, int64(delta)) @@ -634,17 +642,19 @@ func (table *mapOfTable[K, V]) sumSize() int64 { return sum } -func shiftHash(h uint64) uint64 { - // uint64(0) is a reserved value which stands for an empty slot. - if h == uint64(0) { - return uint64(1) - } - return h +func h1(h uint64) uint64 { + return h >> 7 } -// O(N) operation; use for debug purposes only -func (m *MapOf[K, V]) stats() mapStats { - stats := mapStats{ +func h2(h uint64) uint8 { + return uint8(h & 0x7f) +} + +// Stats returns statistics for the MapOf. Just like other map +// methods, this one is thread-safe. Yet it's an O(N) operation, +// so it should be used only for diagnostics or debugging purposes. 
+func (m *MapOf[K, V]) Stats() MapStats { + stats := MapStats{ TotalGrowths: atomic.LoadInt64(&m.totalGrowths), TotalShrinks: atomic.LoadInt64(&m.totalShrinks), MinEntries: math.MaxInt32, @@ -659,8 +669,8 @@ func (m *MapOf[K, V]) stats() mapStats { stats.TotalBuckets++ for { nentriesLocal := 0 - stats.Capacity += entriesPerMapBucket - for i := 0; i < entriesPerMapBucket; i++ { + stats.Capacity += entriesPerMapOfBucket + for i := 0; i < entriesPerMapOfBucket; i++ { if atomic.LoadPointer(&b.entries[i]) != nil { stats.Size++ nentriesLocal++ @@ -673,7 +683,7 @@ func (m *MapOf[K, V]) stats() mapStats { if b.next == nil { break } - b = (*bucketOfPadded)(b.next) + b = (*bucketOfPadded)(atomic.LoadPointer(&b.next)) stats.TotalBuckets++ } if nentries < stats.MinEntries { diff --git a/xsync/mapof_test.go b/xsync/mapof_test.go index 617807d..074019c 100644 --- a/xsync/mapof_test.go +++ b/xsync/mapof_test.go @@ -270,7 +270,31 @@ func TestMapOfStore_StructKeys_StructValues(t *testing.T) { } } -func TestMapOfStore_HashCodeCollisions(t *testing.T) { +func TestMapOfWithHasher(t *testing.T) { + const numEntries = 10000 + m := NewMapOfWithHasher[int, int](murmur3Finalizer) + for i := 0; i < numEntries; i++ { + m.Store(i, i) + } + for i := 0; i < numEntries; i++ { + v, ok := m.Load(i) + if !ok { + t.Fatalf("value not found for %d", i) + } + if v != i { + t.Fatalf("values do not match for %d: %v", i, v) + } + } +} + +func murmur3Finalizer(i int, _ uint64) uint64 { + h := uint64(i) + h = (h ^ (h >> 33)) * 0xff51afd7ed558ccd + h = (h ^ (h >> 33)) * 0xc4ceb9fe1a85ec53 + return h ^ (h >> 33) +} + +func TestMapOfWithHasher_HashCodeCollisions(t *testing.T) { const numEntries = 1000 m := NewMapOfWithHasher[int, int](func(i int, _ uint64) uint64 { // We intentionally use an awful hash function here to make sure @@ -538,7 +562,7 @@ func TestMapOfStoreThenParallelDelete_DoesNotShrinkBelowMinTableLen(t *testing.T <-cdone <-cdone - stats := CollectMapOfStats(m) + stats := m.Stats() if stats.RootBuckets != DefaultMinMapTableLen { t.Fatalf("table length was different from the minimum: %d", stats.RootBuckets) } @@ -609,35 +633,35 @@ func TestMapOfClear(t *testing.T) { } func assertMapOfCapacity[K comparable, V any](t *testing.T, m *MapOf[K, V], expectedCap int) { - stats := CollectMapOfStats(m) + stats := m.Stats() if stats.Capacity != expectedCap { t.Fatalf("capacity was different from %d: %d", expectedCap, stats.Capacity) } } func TestNewMapOfPresized(t *testing.T) { - assertMapOfCapacity(t, NewMapOf[string, string](), DefaultMinMapTableCap) - assertMapOfCapacity(t, NewMapOfPresized[string, string](0), DefaultMinMapTableCap) - assertMapOfCapacity(t, NewMapOf[string, string](WithPresize(0)), DefaultMinMapTableCap) - assertMapOfCapacity(t, NewMapOfPresized[string, string](-100), DefaultMinMapTableCap) - assertMapOfCapacity(t, NewMapOf[string, string](WithPresize(-100)), DefaultMinMapTableCap) - assertMapOfCapacity(t, NewMapOfPresized[string, string](500), 768) - assertMapOfCapacity(t, NewMapOf[string, string](WithPresize(500)), 768) - assertMapOfCapacity(t, NewMapOfPresized[int, int](1_000_000), 1_572_864) - assertMapOfCapacity(t, NewMapOf[int, int](WithPresize(1_000_000)), 1_572_864) - assertMapOfCapacity(t, NewMapOfPresized[point, point](100), 192) - assertMapOfCapacity(t, NewMapOf[point, point](WithPresize(100)), 192) + assertMapOfCapacity(t, NewMapOf[string, string](), DefaultMinMapOfTableCap) + assertMapOfCapacity(t, NewMapOfPresized[string, string](0), DefaultMinMapOfTableCap) + assertMapOfCapacity(t, NewMapOf[string, 
string](WithPresize(0)), DefaultMinMapOfTableCap) + assertMapOfCapacity(t, NewMapOfPresized[string, string](-100), DefaultMinMapOfTableCap) + assertMapOfCapacity(t, NewMapOf[string, string](WithPresize(-100)), DefaultMinMapOfTableCap) + assertMapOfCapacity(t, NewMapOfPresized[string, string](500), 640) + assertMapOfCapacity(t, NewMapOf[string, string](WithPresize(500)), 640) + assertMapOfCapacity(t, NewMapOfPresized[int, int](1_000_000), 1_310_720) + assertMapOfCapacity(t, NewMapOf[int, int](WithPresize(1_000_000)), 1_310_720) + assertMapOfCapacity(t, NewMapOfPresized[point, point](100), 160) + assertMapOfCapacity(t, NewMapOf[point, point](WithPresize(100)), 160) } func TestNewMapOfPresized_DoesNotShrinkBelowMinTableLen(t *testing.T) { const minTableLen = 1024 - const numEntries = minTableLen * EntriesPerMapBucket + const numEntries = minTableLen * EntriesPerMapOfBucket m := NewMapOf[int, int](WithPresize(numEntries)) for i := 0; i < numEntries; i++ { m.Store(i, i) } - stats := CollectMapOfStats(m) + stats := m.Stats() if stats.RootBuckets <= minTableLen { t.Fatalf("table did not grow: %d", stats.RootBuckets) } @@ -646,7 +670,7 @@ func TestNewMapOfPresized_DoesNotShrinkBelowMinTableLen(t *testing.T) { m.Delete(i) } - stats = CollectMapOfStats(m) + stats = m.Stats() if stats.RootBuckets != minTableLen { t.Fatalf("table length was different from the minimum: %d", stats.RootBuckets) } @@ -654,16 +678,16 @@ func TestNewMapOfPresized_DoesNotShrinkBelowMinTableLen(t *testing.T) { func TestNewMapOfGrowOnly_OnlyShrinksOnClear(t *testing.T) { const minTableLen = 128 - const numEntries = minTableLen * EntriesPerMapBucket + const numEntries = minTableLen * EntriesPerMapOfBucket m := NewMapOf[int, int](WithPresize(numEntries), WithGrowOnly()) - stats := CollectMapOfStats(m) + stats := m.Stats() initialTableLen := stats.RootBuckets for i := 0; i < 2*numEntries; i++ { m.Store(i, i) } - stats = CollectMapOfStats(m) + stats = m.Stats() maxTableLen := stats.RootBuckets if maxTableLen <= minTableLen { t.Fatalf("table did not grow: %d", maxTableLen) @@ -672,13 +696,13 @@ func TestNewMapOfGrowOnly_OnlyShrinksOnClear(t *testing.T) { for i := 0; i < numEntries; i++ { m.Delete(i) } - stats = CollectMapOfStats(m) + stats = m.Stats() if stats.RootBuckets != maxTableLen { t.Fatalf("table length was different from the expected: %d", stats.RootBuckets) } m.Clear() - stats = CollectMapOfStats(m) + stats = m.Stats() if stats.RootBuckets != initialTableLen { t.Fatalf("table length was different from the initial: %d", stats.RootBuckets) } @@ -691,11 +715,11 @@ func TestMapOfResize(t *testing.T) { for i := 0; i < numEntries; i++ { m.Store(strconv.Itoa(i), i) } - stats := CollectMapOfStats(m) + stats := m.Stats() if stats.Size != numEntries { t.Fatalf("size was too small: %d", stats.Size) } - expectedCapacity := int(math.RoundToEven(MapLoadFactor+1)) * stats.RootBuckets * EntriesPerMapBucket + expectedCapacity := int(math.RoundToEven(MapLoadFactor+1)) * stats.RootBuckets * EntriesPerMapOfBucket if stats.Capacity > expectedCapacity { t.Fatalf("capacity was too large: %d, expected: %d", stats.Capacity, expectedCapacity) } @@ -715,11 +739,11 @@ func TestMapOfResize(t *testing.T) { for i := 0; i < numEntries; i++ { m.Delete(strconv.Itoa(i)) } - stats = CollectMapOfStats(m) + stats = m.Stats() if stats.Size > 0 { t.Fatalf("zero size was expected: %d", stats.Size) } - expectedCapacity = stats.RootBuckets * EntriesPerMapBucket + expectedCapacity = stats.RootBuckets * EntriesPerMapOfBucket if stats.Capacity != expectedCapacity { 
t.Fatalf("capacity was too large: %d, expected: %d", stats.Capacity, expectedCapacity) } @@ -739,7 +763,7 @@ func TestMapOfResize_CounterLenLimit(t *testing.T) { for i := 0; i < numEntries; i++ { m.Store("foo"+strconv.Itoa(i), "bar"+strconv.Itoa(i)) } - stats := CollectMapOfStats(m) + stats := m.Stats() if stats.Size != numEntries { t.Fatalf("size was too small: %d", stats.Size) } @@ -749,7 +773,7 @@ func TestMapOfResize_CounterLenLimit(t *testing.T) { } } -func parallelSeqTypedResizer(t *testing.T, m *MapOf[int, int], numEntries int, positive bool, cdone chan bool) { +func parallelSeqTypedResizer(m *MapOf[int, int], numEntries int, positive bool, cdone chan bool) { for i := 0; i < numEntries; i++ { if positive { m.Store(i, i) @@ -764,8 +788,8 @@ func TestMapOfParallelResize_GrowOnly(t *testing.T) { const numEntries = 100_000 m := NewMapOf[int, int]() cdone := make(chan bool) - go parallelSeqTypedResizer(t, m, numEntries, true, cdone) - go parallelSeqTypedResizer(t, m, numEntries, false, cdone) + go parallelSeqTypedResizer(m, numEntries, true, cdone) + go parallelSeqTypedResizer(m, numEntries, false, cdone) // Wait for the goroutines to finish. <-cdone <-cdone @@ -801,7 +825,7 @@ func parallelRandTypedResizer(t *testing.T, m *MapOf[string, int], numIters, num func TestMapOfParallelResize(t *testing.T) { const numIters = 1_000 - const numEntries = 2 * EntriesPerMapBucket * DefaultMinMapTableLen + const numEntries = 2 * EntriesPerMapOfBucket * DefaultMinMapTableLen m := NewMapOf[string, int]() cdone := make(chan bool) go parallelRandTypedResizer(t, m, numIters, numEntries, cdone) @@ -984,7 +1008,7 @@ func TestMapOfParallelStoresAndDeletes(t *testing.T) { } } -func parallelTypedComputer(t *testing.T, m *MapOf[uint64, uint64], numIters, numEntries int, cdone chan bool) { +func parallelTypedComputer(m *MapOf[uint64, uint64], numIters, numEntries int, cdone chan bool) { for i := 0; i < numIters; i++ { for j := 0; j < numEntries; j++ { m.Compute(uint64(j), func(oldValue uint64, loaded bool) (newValue uint64, delete bool) { @@ -1001,7 +1025,7 @@ func TestMapOfParallelComputes(t *testing.T) { m := NewMapOf[uint64, uint64]() cdone := make(chan bool) for i := 0; i < numWorkers; i++ { - go parallelTypedComputer(t, m, numIters, numWorkers, cdone) + go parallelTypedComputer(m, numIters, numWorkers, cdone) } // Wait for the goroutines to finish. for i := 0; i < numWorkers; i++ { @@ -1019,7 +1043,7 @@ func TestMapOfParallelComputes(t *testing.T) { } } -func parallelTypedRangeStorer(t *testing.T, m *MapOf[int, int], numEntries int, stopFlag *int64, cdone chan bool) { +func parallelTypedRangeStorer(m *MapOf[int, int], numEntries int, stopFlag *int64, cdone chan bool) { for { for i := 0; i < numEntries; i++ { m.Store(i, i) @@ -1031,7 +1055,7 @@ func parallelTypedRangeStorer(t *testing.T, m *MapOf[int, int], numEntries int, cdone <- true } -func parallelTypedRangeDeleter(t *testing.T, m *MapOf[int, int], numEntries int, stopFlag *int64, cdone chan bool) { +func parallelTypedRangeDeleter(m *MapOf[int, int], numEntries int, stopFlag *int64, cdone chan bool) { for { for i := 0; i < numEntries; i++ { m.Delete(i) @@ -1052,8 +1076,8 @@ func TestMapOfParallelRange(t *testing.T) { // Start goroutines that would be storing and deleting items in parallel. 
cdone := make(chan bool) stopFlag := int64(0) - go parallelTypedRangeStorer(t, m, numEntries, &stopFlag, cdone) - go parallelTypedRangeDeleter(t, m, numEntries, &stopFlag, cdone) + go parallelTypedRangeStorer(m, numEntries, &stopFlag, cdone) + go parallelTypedRangeDeleter(m, numEntries, &stopFlag, cdone) // Iterate the map and verify that no duplicate keys were met. met := make(map[int]int) m.Range(func(key int, value int) bool { @@ -1125,6 +1149,60 @@ func TestMapOfDoesNotLoseEntriesOnResize(t *testing.T) { } } +func TestMapOfStats(t *testing.T) { + m := NewMapOf[int, int]() + + stats := m.Stats() + if stats.RootBuckets != DefaultMinMapTableLen { + t.Fatalf("unexpected number of root buckets: %d", stats.RootBuckets) + } + if stats.TotalBuckets != stats.RootBuckets { + t.Fatalf("unexpected number of total buckets: %d", stats.TotalBuckets) + } + if stats.EmptyBuckets != stats.RootBuckets { + t.Fatalf("unexpected number of empty buckets: %d", stats.EmptyBuckets) + } + if stats.Capacity != EntriesPerMapOfBucket*DefaultMinMapTableLen { + t.Fatalf("unexpected capacity: %d", stats.Capacity) + } + if stats.Size != 0 { + t.Fatalf("unexpected size: %d", stats.Size) + } + if stats.Counter != 0 { + t.Fatalf("unexpected counter: %d", stats.Counter) + } + if stats.CounterLen != 8 { + t.Fatalf("unexpected counter length: %d", stats.CounterLen) + } + + for i := 0; i < 200; i++ { + m.Store(i, i) + } + + stats = m.Stats() + if stats.RootBuckets != 2*DefaultMinMapTableLen { + t.Fatalf("unexpected number of root buckets: %d", stats.RootBuckets) + } + if stats.TotalBuckets < stats.RootBuckets { + t.Fatalf("unexpected number of total buckets: %d", stats.TotalBuckets) + } + if stats.EmptyBuckets >= stats.RootBuckets { + t.Fatalf("unexpected number of empty buckets: %d", stats.EmptyBuckets) + } + if stats.Capacity < 2*EntriesPerMapOfBucket*DefaultMinMapTableLen { + t.Fatalf("unexpected capacity: %d", stats.Capacity) + } + if stats.Size != 200 { + t.Fatalf("unexpected size: %d", stats.Size) + } + if stats.Counter != 200 { + t.Fatalf("unexpected counter: %d", stats.Counter) + } + if stats.CounterLen != 8 { + t.Fatalf("unexpected counter length: %d", stats.CounterLen) + } +} + func BenchmarkMapOf_NoWarmUp(b *testing.B) { for _, bc := range benchmarkCases { if bc.readPercentage == 100 { @@ -1226,6 +1304,25 @@ func BenchmarkMapOfInt_WarmUp(b *testing.B) { } } +func BenchmarkMapOfInt_Murmur3Finalizer_WarmUp(b *testing.B) { + for _, bc := range benchmarkCases { + b.Run(bc.name, func(b *testing.B) { + m := NewMapOfWithHasher[int, int](murmur3Finalizer, WithPresize(benchmarkNumEntries)) + for i := 0; i < benchmarkNumEntries; i++ { + m.Store(i, i) + } + b.ResetTimer() + benchmarkMapOfIntKeys(b, func(k int) (int, bool) { + return m.Load(k) + }, func(k int, v int) { + m.Store(k, v) + }, func(k int) { + m.Delete(k) + }, bc.readPercentage) + }) + } +} + func BenchmarkIntMapStandard_NoWarmUp(b *testing.B) { for _, bc := range benchmarkCases { if bc.readPercentage == 100 { diff --git a/xsync/util.go b/xsync/util.go index 7368912..7692708 100644 --- a/xsync/util.go +++ b/xsync/util.go @@ -1,6 +1,7 @@ package xsync import ( + "math/bits" "runtime" _ "unsafe" ) @@ -44,3 +45,22 @@ func parallelism() uint32 { //go:noescape //go:linkname runtime_fastrand runtime.fastrand func runtime_fastrand() uint32 + +func broadcast(b uint8) uint64 { + return 0x101010101010101 * uint64(b) +} + +func firstMarkedByteIndex(w uint64) int { + return bits.TrailingZeros64(w) >> 3 +} + +// SWAR byte search: may produce false positives, e.g. 
for 0x0100, +// so make sure to double-check bytes found by this function. +func markZeroBytes(w uint64) uint64 { + return ((w - 0x0101010101010101) & (^w) & 0x8080808080808080) +} + +func setByte(w uint64, b uint8, idx int) uint64 { + shift := idx << 3 + return (w &^ (0xff << shift)) | (uint64(b) << shift) +} diff --git a/xsync/util_hash_mapof.go b/xsync/util_hash_mapof.go index 1e8bc85..c3adb9e 100644 --- a/xsync/util_hash_mapof.go +++ b/xsync/util_hash_mapof.go @@ -8,10 +8,10 @@ import ( "unsafe" ) -// makeHasher creates a fast hash function for the given comparable type. +// defaultHasher creates a fast hash function for the given comparable type. // The only limitation is that the type should not contain interfaces inside // based on runtime.typehash. -func makeHasher[T comparable]() func(T, uint64) uint64 { +func defaultHasher[T comparable]() func(T, uint64) uint64 { var zero T if reflect.TypeOf(&zero).Elem().Kind() == reflect.Interface { diff --git a/xsync/util_hash_mapof_test.go b/xsync/util_hash_mapof_test.go index d00add8..2629194 100644 --- a/xsync/util_hash_mapof_test.go +++ b/xsync/util_hash_mapof_test.go @@ -22,8 +22,8 @@ func TestMakeHashFunc(t *testing.T) { seed := MakeSeed() - hashString := MakeHasher[string]() - hashUser := MakeHasher[User]() + hashString := DefaultHasher[string]() + hashUser := DefaultHasher[User]() hashUserMap := makeMapHasher[User]() @@ -166,7 +166,7 @@ func BenchmarkMakeHashFunc(b *testing.B) { } func doBenchmarkMakeHashFunc[T comparable](b *testing.B, val T) { - hash := MakeHasher[T]() + hash := DefaultHasher[T]() hashNativeMap := makeMapHasher[T]() seed := MakeSeed() diff --git a/xsync/util_test.go b/xsync/util_test.go index 16a5143..fa583a6 100644 --- a/xsync/util_test.go +++ b/xsync/util_test.go @@ -2,6 +2,7 @@ package xsync_test import ( "math/rand" + "strconv" "testing" . 
"github.com/fufuok/utils/xsync" @@ -38,6 +39,235 @@ func TestFastrand(t *testing.T) { } } +func TestBroadcast(t *testing.T) { + testCases := []struct { + input uint8 + expected uint64 + }{ + { + input: 0, + expected: 0, + }, + { + input: 1, + expected: 0x0101010101010101, + }, + { + input: 2, + expected: 0x0202020202020202, + }, + { + input: 42, + expected: 0x2a2a2a2a2a2a2a2a, + }, + { + input: 127, + expected: 0x7f7f7f7f7f7f7f7f, + }, + { + input: 255, + expected: 0xffffffffffffffff, + }, + } + + for _, tc := range testCases { + t.Run(strconv.Itoa(int(tc.input)), func(t *testing.T) { + if Broadcast(tc.input) != tc.expected { + t.Errorf("unexpected result: %x", Broadcast(tc.input)) + } + }) + } +} + +func TestFirstMarkedByteIndex(t *testing.T) { + testCases := []struct { + input uint64 + expected int + }{ + { + input: 0, + expected: 8, + }, + { + input: 0x8080808080808080, + expected: 0, + }, + { + input: 0x0000000000000080, + expected: 0, + }, + { + input: 0x0000000000008000, + expected: 1, + }, + { + input: 0x0000000000800000, + expected: 2, + }, + { + input: 0x0000000080000000, + expected: 3, + }, + { + input: 0x0000008000000000, + expected: 4, + }, + { + input: 0x0000800000000000, + expected: 5, + }, + { + input: 0x0080000000000000, + expected: 6, + }, + { + input: 0x8000000000000000, + expected: 7, + }, + } + + for _, tc := range testCases { + t.Run(strconv.Itoa(int(tc.input)), func(t *testing.T) { + if FirstMarkedByteIndex(tc.input) != tc.expected { + t.Errorf("unexpected result: %x", FirstMarkedByteIndex(tc.input)) + } + }) + } +} + +func TestMarkZeroBytes(t *testing.T) { + testCases := []struct { + input uint64 + expected uint64 + }{ + { + input: 0xffffffffffffffff, + expected: 0, + }, + { + input: 0, + expected: 0x8080808080808080, + }, + { + input: 1, + expected: 0x8080808080808000, + }, + { + input: 1 << 9, + expected: 0x8080808080800080, + }, + { + input: 1 << 17, + expected: 0x8080808080008080, + }, + { + input: 1 << 25, + expected: 0x8080808000808080, + }, + { + input: 1 << 33, + expected: 0x8080800080808080, + }, + { + input: 1 << 41, + expected: 0x8080008080808080, + }, + { + input: 1 << 49, + expected: 0x8000808080808080, + }, + { + input: 1 << 57, + expected: 0x0080808080808080, + }, + // false positive + { + input: 0x0100, + expected: 0x8080808080808080, + }, + } + + for _, tc := range testCases { + t.Run(strconv.Itoa(int(tc.input)), func(t *testing.T) { + if MarkZeroBytes(tc.input) != tc.expected { + t.Errorf("unexpected result: %x", MarkZeroBytes(tc.input)) + } + }) + } +} + +func TestSetByte(t *testing.T) { + testCases := []struct { + word uint64 + b uint8 + idx int + expected uint64 + }{ + { + word: 0xffffffffffffffff, + b: 0, + idx: 0, + expected: 0xffffffffffffff00, + }, + { + word: 0xffffffffffffffff, + b: 1, + idx: 1, + expected: 0xffffffffffff01ff, + }, + { + word: 0xffffffffffffffff, + b: 2, + idx: 2, + expected: 0xffffffffff02ffff, + }, + { + word: 0xffffffffffffffff, + b: 3, + idx: 3, + expected: 0xffffffff03ffffff, + }, + { + word: 0xffffffffffffffff, + b: 4, + idx: 4, + expected: 0xffffff04ffffffff, + }, + { + word: 0xffffffffffffffff, + b: 5, + idx: 5, + expected: 0xffff05ffffffffff, + }, + { + word: 0xffffffffffffffff, + b: 6, + idx: 6, + expected: 0xff06ffffffffffff, + }, + { + word: 0xffffffffffffffff, + b: 7, + idx: 7, + expected: 0x07ffffffffffffff, + }, + { + word: 0, + b: 0xff, + idx: 7, + expected: 0xff00000000000000, + }, + } + + for _, tc := range testCases { + t.Run(strconv.Itoa(int(tc.word)), func(t *testing.T) { + if SetByte(tc.word, tc.b, 
tc.idx) != tc.expected { + t.Errorf("unexpected result: %x", SetByte(tc.word, tc.b, tc.idx)) + } + }) + } +} + func BenchmarkFastrand(b *testing.B) { for i := 0; i < b.N; i++ { _ = Fastrand()
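
For readers new to the trick, the `MapOf` changes above replace the per-entry hash slots with a single 64-bit `meta` word per bucket that is probed with the SWAR helpers added in `xsync/util.go` (`broadcast`, `markZeroBytes`, `firstMarkedByteIndex`, `setByte`). Below is a minimal, self-contained sketch of that probe; it is not part of the patch. It redeclares local copies of the unexported helpers, and the slot indices and `h2` values (`0x2a`, `0x5e`, `0x11`) are invented purely for illustration. The real `MapOf` additionally masks candidates with `metaMask`, since only the low 5 meta bytes correspond to entry slots.

```go
package main

import (
	"fmt"
	"math/bits"
)

// Local, illustrative copies of the unexported xsync helpers shown above.
const emptyMetaSlot uint8 = 0x80 // high bit set marks an empty slot

func broadcast(b uint8) uint64 { return 0x0101010101010101 * uint64(b) }

// markZeroBytes sets the high bit of every byte of w that equals zero.
// It can report false positives (e.g. for 0x0100), so a marked slot must
// still be confirmed by comparing the stored key.
func markZeroBytes(w uint64) uint64 {
	return (w - 0x0101010101010101) & (^w) & 0x8080808080808080
}

func firstMarkedByteIndex(w uint64) int { return bits.TrailingZeros64(w) >> 3 }

func setByte(w uint64, b uint8, idx int) uint64 {
	shift := idx << 3
	return (w &^ (0xff << shift)) | (uint64(b) << shift)
}

func main() {
	// A fresh bucket: every meta slot holds emptyMetaSlot, like defaultMeta.
	meta := broadcast(emptyMetaSlot)

	// Pretend two entries were inserted: their 7-bit h2 values land in
	// slots 1 and 3, mirroring what doCompute does on an insert.
	meta = setByte(meta, 0x2a, 1)
	meta = setByte(meta, 0x5e, 3)

	// Lookup: XOR the meta word with the broadcast h2. Matching slots become
	// zero bytes, which markZeroBytes flags; candidates are then visited
	// lowest-index first, exactly like the loop in MapOf.Load.
	probe := func(h2 uint8) {
		marked := markZeroBytes(meta ^ broadcast(h2))
		if marked == 0 {
			fmt.Printf("h2=%#x: no candidate slots\n", h2)
			return
		}
		for marked != 0 {
			idx := firstMarkedByteIndex(marked)
			fmt.Printf("h2=%#x: candidate slot %d\n", h2, idx)
			marked &= marked - 1 // clear the lowest marked byte
		}
	}

	probe(0x2a) // candidate slot 1
	probe(0x5e) // candidate slot 3
	probe(0x11) // no candidate slots
}
```

Because `h2(h)` keeps only the low 7 bits of the hash while empty slots hold `0x80`, a stored `h2` can never be confused with the empty marker, so a single atomic 64-bit load plus a few ALU instructions narrows a bucket down to candidate slots before any entry pointer is dereferenced.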