-
Notifications
You must be signed in to change notification settings - Fork 82
/
simdb.hpp
2289 lines (1989 loc) · 87.3 KB
/
simdb.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2017 Simeon Bassett
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
SimDB
What it does:
| SimDB is a key value store that uses arbitrary byte data (of arbitrary length) as both the key and the value.
| It additionally uses shared memory, which allows processes to communicate with each other quickly.
| It is lock free and scales well with multiple threads writing, reading, and deleting concurrently.
How it works:
|-simdb:
| This contains the user facing interface. It contains the ConcurrentHash, ConcurentStore, and SharedMem classes as members.
| These data structures are made to be an interface over the contiguous memory given to them using a single address.
| They do not allocate any heap memory themselves, but do have a few class members that will be on the stack. At the time of this writing it is 176 bytes on the stack.
|-SharedMem:
| | Interface to OS specific shared memory functions. Also handles an initial alignment.
|-ConcurrentHash:
| | Hash map that uses atomic operations on an array of VerIdx structs.
| | It uses 64 bit atomic operations to compare-exchange one VerIdx at a time (VerIdx is two unsigned 32 bit integers, a version and an index).
| | This makes sure that reading, writing and deleting is lock free.
| | Writing is lock free since a VerIdx is already fully created and written to before putting it in the VerIdx array (m_vis) and the put operation here is a single 64 bit compare and swap.
| | Deletion is lock free since the index in VerIdx is only freed from the CncrLst after setting the VerIdx here to DELETED. Actually deletion means 1. setting the VerIdx to DELETED 2. decrementing the readers of the blocklist that idx points to 3. If the readers variable of that blocklist is decremented below its initial value then the thread that took it below its initial value is the one to free it.
| | Get is lock free since it can read an index from a VerIdx, increment readers, compare its key to the key in the list of blocks, read the value in the blocks to the output buffer and finally decrement the readers variable. Just like deletion, if a thread decrements readers below its initial value, it needs to free the block list. This means the last one out cleans up.
|-ConcurrentStore:
| | Keeps track of block lists.
| | This primarily uses an array of BlkLst structs (which are 24 bytes each).
| | The BlkLst lava_vec is used to make linked lists of block indices.
| | The idea of a block list ends up being a starting index (from the VerIdx struct in the concurrent hash). The BlkLst struct at the starting index contains an index of the next BlkLst struct and so on until reaching a BlkLst that has an index of LIST_END. This means that one array contains multiple linked lists (using indices and not pointers of course).
| | This exposes an alloc() function and a free() function.
| | alloc() gets the index of the next block from CncrLst (concurrent list).
| | The BlkLst struct keeps the total length and the key length / value offset since it does not have to be atomic and is only initialized and used when one thread allocates and only destroyed when one thread frees, just like the actual data blocks.
|-ConcurrentList:
| | The concurrent list is an array of integers.
| | The number of elements (like all the arrays) is the number of blocks.
| | There is one integer per block with the integer at a given index representing the next slot in the list.
| | The end of the list will have value LIST_END. On initialization the array's values would be |1|2|3|4| ... LIST_END, which makes a list from the start to the end. This means s_lv[0] would return 1.
Terms:
|-Block List:
| A sequence of block indices. The entry in ConcurrentHash gives the position in the block list array where the list starts.
| The value at each index in the array contains the index of the next block.
| The list end is known when a special value of LIST_END is found as the value in the array.
|-Block List Version:
| This is a version number given to each block list on allocation (not each block).
| It is used to link a ConcurrentHash value to the block list.
| If the versions are the same, it is known that the block list at the index read from ConcurrentHash has not changed.
| This change could happen if:
| | 1. Thread ONE reads the entry in ConcurrentHash but has not accessed the block list index in the entry yet. Pretend that thread one stalls and nothing more happens until further down.
| | 2. Thread TWO has already allocated a block list and swaps its new entry for the old entry which is still carried by thread one.
| | 3. Thread TWO now must free the block list given by the old entry, which it does, because no thread is reading it since thread one is still stalled.
| | 4. Thread TWO allocates another block list, which ends up using the blocks it just deallocated.
| | 5. Thread ONE wakes up and reads from the block index it found in the ConcurrentHash entry, which is no longer the same and may not even be the head of the list.
| | If the index is used purely for matching the binary key, this wouldn't be a problem.
| | When the index is used to find a binary value however, this is a problem, since the length of a different value could be the same, and there would be no data to be able to tell that they are different.
How it achieves lock free concurrency:
| ConcurrentHash is treated as the authority of what is stored in the database.
| It has an array of VerIdx structs that can also be treated as 64 bit integers. Each is dealt with atomically.
| Its individual bits are used as a bitfied struct containing an index into ConcurrentStore's block list as well as the version number of that list.
| The core is m_vis, which is an array of VerIdx structs. The memory ordering is swapped on every other index in preparation for robin hood hashing techniques, so the actual memory layout (separated into 128 bit chunks) is |Index Version Version Index|Index Version Version Index|
|-Finding a matching index:
| | 1. Use the hash of the key bytes to jump to an index.
| | 2. Load the integer atomically from that index and treat it as a VerIdx struct.
| | 3. Use the index from that struct to read the bytes from the list of blocks in BlkLst.
| | 4. Increment the readers variable atomically, so that it won't be deleted before this thread is done with it.
| | 5. If there is a match, keep reading the list of blocks to fill the output buffer with the value section of the block list.
| | 6. After, decrement the readers variable atomically. If readers goes below its initial value, this thread will be the one to free the block list.
Other notables:
| All of the main classes have a static sizeBytes() function that takes in the same arguments as a constructor and return the number of bytes that it will need in the shared memory
| Classes have member variables that are used as interfaces to the shared memory denoted with s_ (s for shared)
| Normal member variables that are just data on the stack are denoted with m_ (m for member)
_________________
| Memory Layout |
-----------------
______________________________________________________________________________________________________________________
|Flags|BlockSize|BlockCount|ConcurrentHash|ConcurrentStore|ConcurentList|...BlockCount*BlockSize bytes for blocks....|
_____________________________/ \_______ \______________________________________________________
______|____________________________________ ____________|_________________________________________________ ________|___________________________________________
|size(bytes)|...array of VerIdx structs...| |Block List Version|size(bytes)|...array of BlkLst structs...| |size(bytes)|...array of unsigned 32 bit ints (u32)|
First 24 bytes (in 8 byte / unsigned 64 bit chunks):
____________________________
|Flags|BlockSize|BlockCount|
Flags: Right now holds count of the number of processes that have the db open. When the count goes to 0, the last process will delete the shared memory file.
BlockSize: The size in bytes of a block. A good default would be to set this to the common page size of 4096 bytes.
BlockCount: The number of blocks. This hash table array, block list array and concurrent list array will all be the same length. This multiplied by the BlockSize will give the total amount of bytes available for key and value data. More blocks will also mean the hash table will have less collisions as well as less contention between threads.
*/
// -todo: make a list cut itself off at the end by inserting LIST_END as the last value
// -todo: look into readers and matching - should two threads with the same key ever be able to double insert into the db? - MATCH_REMOVED was not re-looping on the current index
// -todo: make MATCH_REMOVED restart the current index
// -todo: make runIfMatch return a pair that includes the return value of the function it runs
// -todo: make sure version setting on free sets the version to 0 on the whole list
// -todo: make sure incReaders and decReaders are using explicit sequential consistency - already done
// -todo: make sure that if there is a version mismatch when comparing a block list, the block list version is still used when trying to swap the version+idx - would only the index actually be needed since a block list with incremented readers won't give up its index, thus it should be unique?
// -todo: take version argument out of incReaders and decReaders
// -todo: make a temporary thread_local variable for each thread to count how many allocations it has made and how many allocations it has freed - worked very well to narrow down the problem
// -todo: make sure that the VerIdx being returned from putHashed is actually what was atomically swapped out
// -todo: try putting LIST_END at the end of the the concurrent lists - not needed for now
// -todo: debug why 2 threads inserting the same key seems to need all blocks instead of just 3 * 2 * 2 (three blocks per key * two threads * two block lists per thread) - delete flag in block lists was not always set
// -todo: assert that the block list is never already deleted when being deleted from putHashed - that wasn't the problem
// -todo: check what happens when the same key but different versions are inserted - do two different versions end up in the DB? does one version end up undeletable ? - this was fixed by only comparing the key without the version
// -todo: check path of thread that deletes a key, make sure it replaces the index in the hash map - how do two conflicting indices in the hash map resolve? the thread that replaces needs to delete the old allocation using the version - is the version / deleted flag being changed atomically in the block list
// -todo: change the Match enum to be an bit bitfield with flags - not needed for now
// -todo: make simdb len() and get() ignore version numbers for match and only match keys
// todo: make sure get() only increments and decrements the first/key block in the block list
// todo: make simdb give a proper error if running out of space
// todo: make simdb expand when either out of space or initialized with a larger amount of space
// todo: make a get function that takes a key version struct
// todo: make a get function that returns a tbl if tbl.hpp is included
#ifdef _MSC_VER
#pragma once
#pragma warning(push, 0)
#endif
#ifndef __SIMDB_HEADER_GUARD__
#define __SIMDB_HEADER_GUARD__
// turn asserts on an off - not sure of the best way to handle this with gcc and clang yet
#ifdef _MSC_VER
#if !defined(_DEBUG)
#define NDEBUG
#endif
#endif
#if !defined(SECTION)
#define SECTION(_msvc_only_collapses_macros_with_arguments, ...)
#endif
// platform specific includes - mostly for shared memory mapping and auxiliary functions like open, close and the windows equivalents
#if defined(_WIN32) // windows
#include <locale>
#include <codecvt>
#include <tchar.h>
//#ifdef UNICODE
// #undef UNICODE
//#endif
#define NOMINMAX
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <strsafe.h>
#ifdef MIN
#undef MIN
#endif
#ifdef MAX
#undef MAX
#endif
#ifdef _MSC_VER
#if !defined(_CRT_SECURE_NO_WARNINGS)
#define _CRT_SECURE_NO_WARNINGS
#endif
#if !defined(_SCL_SECURE_NO_WARNINGS)
#define _SCL_SECURE_NO_WARNINGS
#endif
#endif
#elif defined(__APPLE__) || defined(__MACH__) || defined(__unix__) || defined(__FreeBSD__) || defined(__linux__) // osx, linux and freebsd
// for mmap and munmap
// PROT_READ and PROT_WRITE to allow reading and writing but not executing of the mapped memory pages
// MAP_ANONYMOUS | MAP_SHARED for the anonymous shared memory we want
// mmap is system call 2 on osx, freebsd, and linux
// the apple docs for mmap say "BSD System Calls" so I guess they haven't changed them around
#include <sys/mman.h>
#include <sys/fcntl.h>
#include <sys/errno.h>
#include <sys/unistd.h>
#include <sys/file.h> // for flock (file lock)
#include <sys/stat.h>
#include <sys/param.h>
#include <unistd.h>
#include <dirent.h>
#include <errno.h>
#endif
#include <cstdint>
#include <cstring>
#include <atomic>
#include <mutex>
#include <memory>
#include <vector>
#include <string>
#include <unordered_set>
#include <set>
#include <algorithm>
#include <cassert>
// platform specific type definitions
#ifdef _WIN32 // these have to be outside the anonymous namespace
typedef void *HANDLE;
typedef HANDLE *PHANDLE;
typedef wchar_t WCHAR; // wc, 16-bit UNICODE character
typedef UCHAR BOOLEAN; // winnt
typedef unsigned long ULONG;
#endif
//#ifndef NDEBUG
thread_local int __simdb_allocs = 0;
thread_local int __simdb_deallocs = 0;
//#endif
namespace {
enum Match { MATCH_FALSE=0, MATCH_TRUE=1, MATCH_REMOVED = -1, MATCH_TRUE_WRONG_VERSION = -2 };
// no-op functor - a callable stand-in for places that require a callback but should do nothing
// fix: operator() was implicitly private (class members default to private), which made the
// functor impossible to invoke; it is now public so lava_noop<T>{}() compiles
template<class T>
class lava_noop
{
public:
  void operator()(){}
};
// FNV-1a 64 bit hash over an arbitrary byte buffer
// sbassett - I know basically nothing about hash functions and there is likely a better one out there
inline uint64_t fnv_64a_buf(void const *const buf, uint64_t len)
{
  uint64_t       hsh = 0xCBF29CE484222325;    // FNV-1a 64 bit offset basis
  uint8_t const* cur = (uint8_t const*)buf;
  uint8_t const* end = cur + len;
  for(; cur != end; ++cur){
    hsh ^= (uint64_t)*cur;                    // xor in the next octet ...
    hsh *= 0x100000001B3;                     // ... then multiply by the 64 bit FNV prime
  }                                           // (the prime equals the sum-of-shifts form 1+2^1+2^4+2^5+2^7+2^8+2^40, so this is bit-identical to the original)
  return hsh;
}
// best-effort hint to pull the cache line at p toward the core (T1 level on msvc); a silent no-op on unknown compilers
inline void prefetch1(char const* const p)
{
#ifdef _MSC_VER // if msvc or intel compilers
_mm_prefetch(p, _MM_HINT_T1);  // NOTE(review): _mm_prefetch normally needs <xmmintrin.h>/<intrin.h> - presumably pulled in transitively via windows.h here, confirm
#elif defined(__GNUC__) || defined(__clang__)
__builtin_prefetch(p);
#else
#endif
}
#ifdef _WIN32
typedef struct _UNICODE_STRING {
USHORT Length;
USHORT MaximumLength;
#ifdef MIDL_PASS
[size_is(MaximumLength / 2), length_is((Length) / 2) ] USHORT * Buffer;
#else // MIDL_PASS
_Field_size_bytes_part_(MaximumLength, Length) PWCH Buffer;
#endif // MIDL_PASS
} UNICODE_STRING;
typedef UNICODE_STRING *PUNICODE_STRING;
typedef struct _OBJECT_ATTRIBUTES {
ULONG Length;
HANDLE RootDirectory;
PUNICODE_STRING ObjectName;
ULONG Attributes;
PVOID SecurityDescriptor; // Points to type SECURITY_DESCRIPTOR
PVOID SecurityQualityOfService; // Points to type SECURITY_QUALITY_OF_SERVICE
} OBJECT_ATTRIBUTES;
typedef OBJECT_ATTRIBUTES *POBJECT_ATTRIBUTES;
typedef long LONG;
typedef LONG NTSTATUS;
// the following is api poison, but seems to be the only way to list the global anonymous memory maps in windows
#define DIRECTORY_QUERY 0x0001
#define STATUS_SUCCESS ((NTSTATUS)0x00000000L) // ntsubauth
#define OBJ_CASE_INSENSITIVE 0x00000040L
#define STATUS_NO_MORE_FILES ((NTSTATUS)0x80000006L)
#define STATUS_NO_MORE_ENTRIES ((NTSTATUS)0x8000001AL)
typedef struct _IO_STATUS_BLOCK {
union {
NTSTATUS Status;
PVOID Pointer;
};
ULONG_PTR Information;
} IO_STATUS_BLOCK, *PIO_STATUS_BLOCK;
using NTOPENDIRECTORYOBJECT = NTSTATUS (WINAPI*)(
_Out_ PHANDLE DirectoryHandle,
_In_ ACCESS_MASK DesiredAccess,
_In_ POBJECT_ATTRIBUTES ObjectAttributes
);
using NTOPENFILE = NTSTATUS (WINAPI*)(
_Out_ PHANDLE FileHandle,
_In_ ACCESS_MASK DesiredAccess,
_In_ POBJECT_ATTRIBUTES ObjectAttributes,
_Out_ PIO_STATUS_BLOCK IoStatusBlock,
_In_ ULONG ShareAccess,
_In_ ULONG OpenOptions
);
using NTQUERYDIRECTORYOBJECT = NTSTATUS(WINAPI*)(
_In_ HANDLE DirectoryHandle,
_Out_opt_ PVOID Buffer,
_In_ ULONG Length,
_In_ BOOLEAN ReturnSingleEntry,
_In_ BOOLEAN RestartScan,
_Inout_ PULONG Context,
_Out_opt_ PULONG ReturnLength
);
using RTLINITUNICODESTRING = VOID(*)(
_Out_ PUNICODE_STRING DestinationString,
_In_opt_ PCWSTR SourceString
);
struct OBJECT_DIRECTORY_INFORMATION { UNICODE_STRING name; UNICODE_STRING type; };
//auto GetLastErrorStdStr() -> std::string
//{
// DWORD error = GetLastError();
// if (error)
// {
// LPVOID lpMsgBuf;
// DWORD bufLen = FormatMessage(
// FORMAT_MESSAGE_ALLOCATE_BUFFER |
// FORMAT_MESSAGE_FROM_SYSTEM |
// FORMAT_MESSAGE_IGNORE_INSERTS,
// NULL,
// error,
// MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
// (LPTSTR) &lpMsgBuf,
// 0, NULL );
// if (bufLen)
// {
// LPCSTR lpMsgStr = (LPCSTR)lpMsgBuf;
// std::string result(lpMsgStr, lpMsgStr+bufLen);
//
// LocalFree(lpMsgBuf);
//
// return result;
// }
// }
// return std::string();
//}
// look up an exported symbol from a module that is ALREADY loaded in this process
// (GetModuleHandleA does not load the library; returns nullptr-ish failure if it isn't resident)
PVOID GetLibraryProcAddress(PSTR LibraryName, PSTR ProcName)
{
return GetProcAddress(GetModuleHandleA(LibraryName), ProcName);
}
// printf substitute that formats into a local buffer and writes it straight to the console handle
// NOTE(review): wvsprintfA caps output at 1024 bytes - long formatted strings will be truncated silently
int win_printf(const char * format, ...)
{
char szBuff[1024];          // fixed-size format buffer (see truncation note above)
int retValue;
DWORD cbWritten;
va_list argptr;
va_start( argptr, format );
retValue = wvsprintfA( szBuff, format, argptr );
va_end( argptr );
WriteFile( GetStdHandle(STD_OUTPUT_HANDLE), szBuff, retValue,
&cbWritten, 0 );
return retValue;
}
#endif // end #ifdef _WIN32
}
#ifdef _WIN32
#pragma warning(pop)
#endif
// error codes surfaced by the simdb public interface
// NOTE(review): NO_ERRORS starts at 2 - presumably so 0 and 1 can never be mistaken for false/true; confirm against callers
enum class simdb_error {
NO_ERRORS=2,
DIR_NOT_FOUND,                // the directory used to enumerate shared memory maps was not found
DIR_ENTRY_ERROR,              // an entry in that directory could not be read
COULD_NOT_OPEN_MAP_FILE,      // the shared memory / file-mapping handle could not be opened
COULD_NOT_MEMORY_MAP_FILE,    // mapping the file into this process's address space failed
SHARED_MEMORY_ERROR,          // generic shared memory failure
FTRUNCATE_FAILURE,            // ftruncate on the shm file failed (unix path)
FLOCK_FAILURE,                // flock on the shm file failed (unix path)
PATH_TOO_LONG                 // constructed shm path exceeded the allowed length
};
// minimal flat vector interface over a caller-provided memory span - allocates nothing itself
// layout: [ 8 byte size header ][ count * sizeof(T) elements ]; p points at the first element
template<class T>
class lava_vec
{
public:
  using u64 = uint64_t;
private:
  void* p;       // points just PAST the 8 byte header, i.e. at element 0

  // the size header lives in the 8 bytes immediately before p
  void set_sizeBytes(u64 sb){ ((u64*)p)[-1] = sb; }
public:
  // sizeBytes is meant to take the same arguments as a constructor and return the total
  // number of bytes needed to hold the entire structure given those arguments
  static u64 sizeBytes(u64 count)
  {
    return sizeof(u64) + count*sizeof(T);
  }

  lava_vec(){}
  lava_vec(void* addr, u64 count, bool owner=true) :
    p( ((u64*)addr) + 1 )                           // +1 skips the 8 byte size header
  {
    if(owner){
      set_sizeBytes( lava_vec::sizeBytes(count) );
    }
  }
  // attach to an already-initialized span; fix: must use the same +1 offset as the owning
  // constructor (was +2, which skipped 8 bytes too many so every access was shifted by one u64)
  lava_vec(void* addr) : p( ((u64*)addr) + 1 ) {}
  lava_vec(lava_vec const&)       = delete;
  void operator=(lava_vec const&) = delete;
  lava_vec(lava_vec&& rval){ p=rval.p; rval.p=nullptr; }
  ~lava_vec(){}

  T& operator[](u64 i){ return data()[i]; }
  T* data(){ return (T*)p; }

  // total size of the span in bytes; fix: read the header that set_sizeBytes() wrote, which is
  // the 8 bytes immediately BEFORE p - the previous [0] read returned element 0 instead
  u64 sizeBytes() const { return ((u64*)p)[-1]; }

  auto addr() const -> void*   // address of element 0 (not of the header)
  {
    return p;
  }
};
class CncrLst
{
  // Lock-free free-list of block indices.
  // Internally this is an array of indices that makes a linked list.
  // Externally indices can be gotten atomically and given back atomically.
  // | This is used to get free indices one at a time, and give back in-use indices one at a time.
  // Uses the first 8 bytes that would normally store sizeBytes as the 64 bits of memory for the Head structure.
  // Aligns the head on a 64 byte boundary with the rest of the memory on a separate 64 byte boundary.
  // This puts them on separate cache lines which should eliminate false sharing between cores when
  // atomically accessing the Head union (which will happen quite a bit).
public:
  using     u32 = uint32_t;
  using     u64 = uint64_t;
  using    au32 = volatile std::atomic<u32>;
  using    au64 = volatile std::atomic<u64>;
  using ListVec = lava_vec<u32>;

  union Head
  {
    struct { u32 ver; u32 idx; };   // ver is version, idx is index - the version counter guards the CAS loops below against ABA
    u64 asInt;
  };

  static const u32 LIST_END        = 0xFFFFFFFF;
  static const u32 NXT_VER_SPECIAL = 0xFFFFFFFF;

  //private:
  ListVec        s_lv;    // shared: one u32 per block holding the index of the next free block
  volatile au64* s_h;     // shared: the list head (version + index) packed into one atomic u64

public:
  // an extra 128 bytes so that Head can be placed on its own cache line to avoid false sharing,
  // since it is a potential bottleneck
  static u64 sizeBytes(u32 size) { return ListVec::sizeBytes(size) + 128; }
  static u32 incVersion(u32 v)   { return v==NXT_VER_SPECIAL? 1 : v+1; }

  CncrLst(){}
  CncrLst(void* addr, u32 size, bool owner=true)   // this constructor is for when the memory is owned and needs to be initialized
  {                                                // separate out initialization and let it be done explicitly in the simdb constructor?
    u64 addrRem   = (u64)addr % 64;
    u64 alignAddr = (u64)addr + (64-addrRem);      // NOTE: layout rule shared by every attaching process - must not change
    assert( alignAddr % 64 == 0 );
    s_h = (au64*)alignAddr;

    u32* listAddr = (u32*)((u64)alignAddr+64);     // list storage starts one cache line after the head
    new (&s_lv) ListVec(listAddr, size, owner);

    if(owner){
      for(u32 i=0; i<(size-1); ++i) s_lv[i] = i+1; // each slot links to the next, forming the initial free list 0->1->...->LIST_END
      s_lv[size-1] = LIST_END;
      ((Head*)s_h)->idx = 0;
      ((Head*)s_h)->ver = 0;
    }
  }

  // single CAS on the packed head; on failure *expected is refreshed with the current head value.
  // desired is a plain u64 - the previous signature took the atomic alias by value, which
  // constructed a temporary std::atomic and cost an extra atomic load on every attempt
  bool headCmpEx(u64* expected, u64 desired)
  {
    using namespace std;
    return atomic_compare_exchange_strong_explicit(
      s_h, expected, desired,
      memory_order_seq_cst, memory_order_seq_cst
    );
  }

  u32 nxt()                                        // moves forward in the list and returns the previous index (i.e. pops the head)
  {
    Head curHead, nxtHead;
    curHead.asInt = s_h->load();
    do{
      if(curHead.idx==LIST_END){
        return LIST_END;                           // nothing free
      }
      nxtHead.idx = s_lv[curHead.idx];             // successor of the current head becomes the new head
      nxtHead.ver = curHead.ver==NXT_VER_SPECIAL? 1 : curHead.ver+1;
    }while( !headCmpEx( &curHead.asInt, nxtHead.asInt) );
    return curHead.idx;
  }

  u32 nxt(u32 prev)                                // pops the head as above, then links slot prev to the popped index (used to chain an allocation together)
  {
    using namespace std;
    Head curHead, nxtHead;
    curHead.asInt = s_h->load();
    do{
      if(curHead.idx==LIST_END){
        return LIST_END;
      }
      nxtHead.idx = s_lv[curHead.idx];
      nxtHead.ver = curHead.ver==NXT_VER_SPECIAL? 1 : curHead.ver+1;
    }while( !headCmpEx( &curHead.asInt, nxtHead.asInt) );
    atomic_store( (au32*)&s_lv[prev], curHead.idx );   // publish the link only after the pop succeeded
    return curHead.idx;
  }

  // push one index back; not thread safe when reading from the list, but it doesn't matter because you
  // shouldn't be reading while freeing anyway, since the CncrHsh will already have the index taken out
  // and the free will only be triggered after the last reader has read from it
  u32 free(u32 idx)
  {
    Head curHead, nxtHead; u32 retIdx;
    curHead.asInt = s_h->load();
    do{
      retIdx      = s_lv[idx] = curHead.idx;       // freed slot points at the old head
      nxtHead.idx = idx;
      nxtHead.ver = curHead.ver + 1;
    }while( !headCmpEx( &curHead.asInt, nxtHead.asInt) );
    return retIdx;
  }

  // push a whole chain [st..en] back with one successful CAS; same thread safety note as free(u32)
  u32 free(u32 st, u32 en)
  {
    using namespace std;
    Head curHead, nxtHead; u32 retIdx;
    curHead.asInt = s_h->load();
    do{
      retIdx = curHead.idx;
      atomic_store_explicit( (au32*)&(s_lv[en]), curHead.idx, memory_order_seq_cst);  // tail of the chain points at the old head
      nxtHead.idx = st;
      nxtHead.ver = curHead.ver + 1;
    }while( !headCmpEx( &curHead.asInt, nxtHead.asInt) );
    return retIdx;
  }

  // pop count indices chained together; returns the first index, or LIST_END after
  // giving back any partial chain if the list runs out part way through
  u32 alloc(u32 count)
  {
    u32 st  = nxt();
    u32 cur = st;
    if(st == LIST_END) return LIST_END;
    else --count;
    while( count > 0 ){
      u32 nxtIdx = nxt(cur);
      if(nxtIdx == LIST_END){
        free(st,cur);                              // out of blocks - return what was taken so far
        return LIST_END;
      }
      cur = nxtIdx;
      --count;
    }
    return st;
  }

  auto count() const -> u32 { return ((Head*)s_h)->ver; }   // version counter doubles as an operation count - non-atomic read, approximate under contention
  auto idx() const -> u32
  {
    Head h;
    h.asInt = s_h->load();
    return h.idx;
  }
  auto list() -> ListVec const* { return &s_lv; }  // not thread safe
  u32 lnkCnt()                                     // number of free slots reachable from the head - not thread safe
  {
    u32 cnt    = 0;
    u32 curIdx = idx();
    while( curIdx != LIST_END ){
      curIdx = s_lv[curIdx];
      ++cnt;
    }
    return cnt;
  }
  auto head() -> Head* { return (Head*)s_h; }
};
class CncrStr // CncrStr is Concurrent Store - owns the block lists that hold the raw key and value bytes
{
public:
using u8 = uint8_t;
using u32 = uint32_t;
using i32 = int32_t;
using u64 = uint64_t;
using i64 = int64_t;
using au32 = std::atomic<u32>;
using au64 = std::atomic<u64>;
// a block list's starting index paired with its version, packed so the pair can be swapped as one 64 bit atom
union VerIdx
{
struct { u32 idx; u32 version; };
u64 asInt;
VerIdx(){}
VerIdx(u32 _idx, u32 _version) : idx(_idx), version(_version) {}
};
// per-block-list flags and reader count packed into 32 bits so they can be compare-exchanged together
union KeyReaders
{
struct{ u32 isKey : 1; u32 isDeleted : 1; i32 readers : 30; };
u32 asInt;
};
struct BlkLst // 24 bytes total - one node in a linked list of blocks; idx links to the next node
{
union{
KeyReaders kr;
struct{ u32 isKey : 1; u32 isDeleted : 1; i32 readers : 30; };
}; // 4 bytes - kr is key readers
u32 idx, version, len, klen, hash; // 20 bytes - len is total length, klen is key length / value offset, hash caches the key's hash
BlkLst() : isKey(0), isDeleted(0), readers(0), idx(0), version(0), len(0), klen(0), hash(0) {}
BlkLst(bool _isKey, i32 _readers, u32 _idx, u32 _version, u32 _len=0, u32 _klen=0, u32 _hash=0) :
isKey(_isKey),
isDeleted(0),
readers(_readers),
idx(_idx),
version(_version),
hash(_hash)
{
len = _len;
klen = _klen;
}
};
struct BlkCnt { u32 end : 1; u32 cnt : 31; }; // this is returned from alloc() and may not be neccesary - it is the number of blocks allocated and if the end was reached
using ai32 = std::atomic<i32>;
using BlockLists = lava_vec<BlkLst>; // only the indices returned from the concurrent list are altered, and only one thread will deal with any single index at a time
static const u32 LIST_END = CncrLst::LIST_END;  // shared end-of-list sentinel
// the sentinel VerIdx that marks the end of a block list: index LIST_END, version 0
static VerIdx List_End()
{
  return VerIdx(CncrLst::LIST_END, 0);
}
// true when vi is the List_End() sentinel, compared as a single 64 bit integer
static bool IsListEnd(VerIdx vi)
{
  return List_End().asInt == vi.asInt;
}
// single 32 bit compare-exchange wrapper used by the reader-count loops below;
// on failure *expected is refreshed with the current value of *val
bool cmpEx(au32* val, u32* expected, u32 desired) const
{
  return std::atomic_compare_exchange_strong_explicit(
    val, expected, desired,
    std::memory_order_seq_cst, std::memory_order_seq_cst
  );
}
// increment the readers count on block list blkIdx and return a snapshot of its BlkLst entry.
// Returns a default (zeroed) BlkLst when the entry is deleted or being torn down (readers<0),
// which tells the caller the list must not be touched.
BlkLst incReaders(u32 blkIdx) const // BI is Block Index - increment the readers by one and return the previous kv from the successful swap
{
using namespace std;
KeyReaders cur, nxt;
BlkLst* bl = &s_bls[blkIdx];
au32* areaders = (au32*)&(bl->kr);   // treat the isKey/isDeleted/readers bitfield as one atomic u32
cur.asInt = atomic_load_explicit(areaders, memory_order_seq_cst);
do{
if(cur.readers<0 || cur.isDeleted){ return BlkLst(); }   // already freed / mid-free - refuse the read
nxt = cur;
nxt.readers += 1;
}while( !cmpEx(areaders, &cur.asInt, nxt.asInt) );
return *bl; // after readers has been incremented this block list entry is not going away. The only thing that would change would be the readers and that doesn't matter to the calling function.
}
// decrement the readers count of block list blkIdx, or mark the list deleted when del is true.
// Whichever thread observes "deleted with no remaining readers" performs the actual free (last one
// out cleans up). Returns false when THIS call freed the list (index no longer valid), true otherwise.
bool decReadersOrDel(u32 blkIdx, bool del=false) const
{
using namespace std;
KeyReaders cur, nxt; bool doDelete=false;
BlkLst* bl = &s_bls[blkIdx];
au32* areaders = (au32*)&(bl->kr);   // same packed atomic bitfield as incReaders
cur.asInt = atomic_load_explicit(areaders, memory_order_seq_cst);
do{
doDelete = false;                    // recomputed every retry from the freshly observed value
nxt = cur;
if(del){
if(cur.isDeleted){ return true; }    // another thread already marked it - nothing to do
if(cur.readers==0){
doDelete = true;                     // no readers left, this thread gets to free it
}
nxt.isDeleted = true;
}else{
if(cur.readers==1 && cur.isDeleted){ doDelete=true; }   // last reader leaving a deleted list cleans up
nxt.readers -= 1;
}
}while( !cmpEx(areaders, &cur.asInt, nxt.asInt) );
if(doDelete){ doFree(blkIdx); return false; }   // doFree is defined further down in the class
return true;
}
//private:
// s_ prefixed members point into the shared memory mapping (only pointers
// live on the stack; nothing is owned on the heap).  m_ members are plain
// per-process values.
// Layout of the mapped region, per the *Ofst() functions below:
//   Version (u64) | BlockLists | CncrLst | Blocks
mutable CncrLst    s_cl;        // concurrent free list of block indices
mutable BlockLists s_bls;       // flat array of per-block metadata (BlkLst)
void*              s_blksAddr;  // start of the raw block storage
au64*              s_version;   // shared, monotonically increasing version counter
u32                m_blockSize; // bytes per block
u64                m_szBytes;   // u64 read from the start of the mapping at construction
// Follow the chain: return the VerIdx (index + version) of the block after
// blkIdx, prefetching the next block's memory so the caller's copy loop
// finds it warm in cache.
VerIdx nxtBlock(u32 blkIdx) const
{
  BlkLst const link = s_bls[blkIdx];
  prefetch1( (char const* const)blockFreePtr(link.idx) );
  return VerIdx(link.idx, link.version);
}
// Usable bytes per block.
u32 blockFreeSize() const { return m_blockSize; }
// Address of block blkIdx inside the mapped block region.  The index is
// widened to u64 before multiplying: blkIdx*m_blockSize as u32*u32 would
// wrap for stores larger than 4GB (sizeBytes() already returns u64).
u8* blockFreePtr(u32 blkIdx) const { return ((u8*)s_blksAddr) + (u64)blkIdx*m_blockSize; }
u8* blkPtr(u32 blkIdx)       const { return ((u8*)s_blksAddr) + (u64)blkIdx*m_blockSize; }
// Number of whole blocks required to hold len bytes; optionally reports
// the byte count that spills into the final, partially filled block.
u32 blocksNeeded(u32 len, u32* out_rem=nullptr)
{
  u32 capacity = blockFreeSize();
  u32 whole    = len / capacity;
  u32 spill    = len % capacity;     // bytes left over after the whole blocks
  if(out_rem){ *out_rem = spill; }
  return spill? whole+1 : whole;
}
// Walk the chain of blocks starting at blkIdx, stamping every BlkLst with
// the given version, and return the index of the last block in the chain.
// The starting block is visited like any other, so on an immediate
// LIST_END the start index itself is returned.
u32 findEndSetVersion(u32 blkIdx, u32 version) const
{
  u32 last = blkIdx;
  for(u32 idx = blkIdx; idx != LIST_END; idx = s_bls[idx].idx){
    s_bls[idx].version = version;
    last = idx;
  }
  return last;
}
// Release an entire chain of blocks back to the concurrent free list.
// Block memory and BlkLst fields are deliberately left as-is: they are
// fully re-initialized by the next alloc().  findEndSetVersion(..,0) both
// locates the tail of the chain and stamps every link with version 0, so
// concurrent readers fail their version checks on the freed blocks.
void doFree(u32 blkIdx) const
{
  using namespace std;
  u32 listEnd = findEndSetVersion(blkIdx, 0);
  s_cl.free(blkIdx, listEnd);   // splice the whole chain back in one operation
  __simdb_deallocs += 1;        // global counter - presumably debug/statistics; verify against definition
}
// Copy bytes into block blkIdx starting at byte offset ofst.  len==0 means
// "fill the whole block".  Returns the number of bytes written.  No reader
// accounting is needed: blocks are written before being published to other
// threads.
u32 writeBlock(u32 blkIdx, void const* const bytes, u32 len=0, u32 ofst=0)
{
  u32 n = (len==0)? blockFreeSize() : len;
  memcpy(blockFreePtr(blkIdx) + ofst, bytes, n);
  return n;
}
// Copy up to len bytes (len==0 means "rest of the block") out of block
// blkIdx starting at ofst into bytes.  The reader count is held around
// the memcpy so a concurrent delete cannot free the block mid-copy.
// Returns the number of bytes copied, or 0 if the entry was deleted
// (incReaders yields a version==0 BlkLst).
// NOTE(review): the version parameter is currently unused - the
// commented-out calls show it was once checked inside inc/decReaders.
u32 readBlock(u32 blkIdx, u32 version, void *const bytes, u32 ofst=0, u32 len=0) const
{
  BlkLst bl = incReaders(blkIdx);
  if(bl.version==0){ return 0; }                 // deleted or unusable entry
  u32 blkFree = blockFreeSize();
  u8* p = blockFreePtr(blkIdx);
  u32 cpyLen = len==0? blkFree-ofst : len;
  memcpy(bytes, p+ofst, cpyLen);
  decReadersOrDel(blkIdx);                       // may free the chain if we were the last reader of a deleted entry
  return cpyLen;
}
public:
// Byte offsets of each region inside the shared memory mapping:
//   [ u64 header | BlockLists | CncrLst | Blocks ]
static u64 BlockListsOfst(){ return sizeof(u64); }
static u64 CListOfst(u32 blockCount){ return BlockListsOfst() + BlockLists::sizeBytes(blockCount); } // BlockLists::sizeBytes ends up being sizeof(BlkLst)*blockCount + 2 u64 variables
static u64 BlksOfst(u32 blockCount){ return CListOfst(blockCount) + CncrLst::sizeBytes(blockCount); }
// Total mapping size.  blockSize*blockCount is widened to u64 first - the
// u32*u32 product would wrap for block regions of 4GB or more.
static u64 sizeBytes(u32 blockSize, u32 blockCount){ return BlksOfst(blockCount) + (u64)blockSize*blockCount; }
CncrStr(){}   // default: members stay unset until constructed over shared memory
// Attach to (or, when owner==true, initialize) the concurrent store laid
// out at addr, which must be at least sizeBytes(blockSize, blockCount)
// bytes.  Only the owner clears the BlkLst table and seeds the version
// counter; non-owners simply map the existing structures.
CncrStr(void* addr, u32 blockSize, u32 blockCount, bool owner=true) :
  s_cl( (u8*)addr + CListOfst(blockCount), blockCount, owner),
  s_bls( (u8*)addr + BlockListsOfst(), blockCount, owner),
  s_blksAddr( (u8*)addr + BlksOfst(blockCount) ),
  s_version( (au64*)addr ),
  m_blockSize(blockSize),
  m_szBytes( *((u64*)addr) )   // NOTE(review): reads the same u64 that s_version points at - confirm intent
{
  if(owner){
    for(u32 i=0; i<blockCount; ++i){ s_bls[i] = BlkLst(); }
    s_version->store(1);       // start at 1 so version 0 can mean "freed/invalid"
  }
  assert(blockSize > sizeof(i32));   // a block must at least hold a next-index
}
// Allocate enough blocks to hold size bytes (key of klen bytes followed by
// the value), link them into a chain, and stamp the head BlkLst with the
// key metadata (hash, lengths, isKey).  Returns the head index paired with
// a fresh version, or List_End() on failure.  out_blocks (optional)
// reports the count of extra blocks linked and whether the chain's tail
// reaches the end of the free list.
auto alloc(u32 size, u32 klen, u32 hash, BlkCnt* out_blocks=nullptr) -> VerIdx
{
  u32 byteRem = 0;
  u32 blocks  = blocksNeeded(size, &byteRem);
  if(blocks==0){                                  // size==0: nothing to allocate - without this guard
    if(out_blocks){ *out_blocks = {true, 0}; }    // the blocks-1 loop bound below would underflow
    return List_End();
  }
  u32 st = s_cl.alloc(blocks);                    // grab a chain of blocks from the free list
  SECTION(handle allocation errors from the concurrent list){
    if(st==LIST_END){
      if(out_blocks){ *out_blocks = {true, 0} ; }
      return List_End();
    }
  }
  u32 ver = (u32)s_version->fetch_add(1);         // unique version for this allocation
  u32 cur=st, cnt=0;
  SECTION(loop for the number of blocks needed and get new block and link it to the list)
  {
    for(u32 i=0; i<blocks-1; ++i, ++cnt){
      u32 nxt = s_cl.s_lv[cur];                   // the free list already chains block to block
      s_bls[cur] = BlkLst(false, 0, nxt, ver, size);
      cur = nxt;
    }
  }
  SECTION(add the last index into the list, set out_blocks and return the start index with its version)
  {
    if(out_blocks){
      out_blocks->end = s_cl.s_lv[cur] == LIST_END;
      out_blocks->cnt = cnt;
    }
    s_bls[cur] = BlkLst(false,0,LIST_END,ver,size,0,0); // cur may equal st when only one block is needed
    s_bls[st].isKey     = true;                         // the head block carries the key metadata
    s_bls[st].hash      = hash;
    s_bls[st].len       = size;
    s_bls[st].klen      = klen;
    s_bls[st].isDeleted = false;
    __simdb_allocs += 1;
    return VerIdx(st, ver);
  }
}
// Logically delete the chain starting at blkIdx.  The physical free is
// deferred until the last concurrent reader releases the entry (see
// decReadersOrDel with del==true).  Returns false if this call freed the
// chain outright, true otherwise.
// NOTE(review): version is currently unused - the commented-out call
// shows it was once forwarded for a version check.
bool free(u32 blkIdx, u32 version)
{
  return decReadersOrDel(blkIdx, true);
}
// Write the key bytes followed immediately by the value bytes into the
// pre-allocated chain starting at blkIdx.  When the key ends mid-block
// (kjagged) the start of the value back-fills that block so no space is
// wasted - get() reads the value back from offset klen%blockFreeSize() of
// the first post-key block, which fixes this layout.
// No reader accounting is needed: the chain has not yet been published to
// the ConcurrentHash, so no other thread can see it.
// Fixes: previously, when klen was an exact multiple of blockFreeSize()
// (kjagged==false), b was never switched from kbytes to vbytes and the
// first fillvlen value bytes were never written, so the value blocks were
// filled from the key buffer.  Also guards the fillvlen==0 case, where
// writeBlock's len==0 convention would have written a whole block.
void put(u32 blkIdx, void const *const kbytes, u32 klen, void const *const vbytes, u32 vlen)
{
  using namespace std;
  u8*  b        = (u8*)kbytes;
  bool kjagged  = (klen % blockFreeSize()) != 0;                        // key does not end on a block boundary
  u32  kblocks  = kjagged? blocksNeeded(klen)-1 : blocksNeeded(klen);   // blocks holding key bytes only
  u32  remklen  = klen - (kblocks*blockFreeSize());                     // key bytes in the jagged block
  u32  fillvlen = kjagged? min(vlen, blockFreeSize()-remklen) : 0;      // value bytes sharing the jagged block
  u32  tailvlen = vlen - fillvlen;                                      // value bytes after the shared block
  bool vjagged  = (tailvlen % blockFreeSize()) != 0;
  u32  vblocks  = vjagged? blocksNeeded(tailvlen)-1 : blocksNeeded(tailvlen);
  u32  remvlen  = tailvlen - (vblocks*blockFreeSize());                 // value bytes in the final partial block
  u32  cur      = blkIdx;
  for(u32 i=0; i<kblocks; ++i){                                         // full blocks of key bytes
    b   += writeBlock(cur, b);
    cur  = nxtBlock(cur).idx;
  }
  if(kjagged){                                                          // shared block: key tail + value head
    writeBlock(cur, b, remklen);
    b = (u8*)vbytes;
    if(fillvlen>0){ b += writeBlock(cur, b, fillvlen, remklen); }
    cur = nxtBlock(cur).idx;
  }else{
    b = (u8*)vbytes;                                                    // key ended on a boundary; value starts a fresh block
  }
  for(u32 i=0; i<vblocks; ++i){                                         // full blocks of value bytes
    b   += writeBlock(cur, b);
    cur  = nxtBlock(cur).idx;
  }
  if(vjagged && remvlen>0){                                             // trailing partial block of value bytes
    b += writeBlock(cur, b, remvlen);
  }
}
u32 get(u32 blkIdx, u32 version, void *const bytes, u32 maxlen, u32* out_readlen=nullptr) const
{
using namespace std;
if(blkIdx == LIST_END){ return 0; }
//BlkLst bl = incReaders(blkIdx, version);
BlkLst bl = incReaders(blkIdx);
u32 vlen = bl.len-bl.klen;
if(bl.len==0 || vlen>maxlen ) return 0;
auto kdiv = div((i64)bl.klen, (i64)blockFreeSize());
auto kblks = kdiv.quot;
u32 krem = (u32)kdiv.rem;
auto vrdLen = 0;
u32 len = 0;
u32 rdLen = 0;
u8* b = (u8*)bytes;
i32 cur = blkIdx;
VerIdx nxt;
for(int i=0; i<kblks; ++i){
nxt = nxtBlock(cur); if(nxt.version!=version){ goto read_failure; }
cur = nxt.idx;
}
vrdLen = min<u32>(blockFreeSize()-krem, vlen);
rdLen = (u32)readBlock(cur, version, b, krem, vrdLen);
b += rdLen;
len += rdLen;
nxt = nxtBlock(cur); if(nxt.version!=version){ goto read_failure; }
while(len<maxlen && nxt.idx!=LIST_END && nxt.version==version)
{
vrdLen = min<u32>(blockFreeSize(), maxlen-len);
cur = nxt.idx;
rdLen = readBlock(cur, version, b, 0, vrdLen); if(rdLen==0) break; // rdLen is read length
b += rdLen;
len += rdLen;
nxt = nxtBlock(cur);
}
if(out_readlen){ *out_readlen = len; }
read_failure: