diff --git a/entitybatch/db_access.go b/entitybatch/db_access.go
new file mode 100644
index 0000000..b8dc118
--- /dev/null
+++ b/entitybatch/db_access.go
@@ -0,0 +1,220 @@
+package db
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/scylladb/gocqlx/v2"
+	"github.com/scylladb/gocqlx/v2/entitybatch/sample"
+	"github.com/scylladb/gocqlx/v2/qb"
+	"github.com/scylladb/gocqlx/v2/table"
+)
+
+// operationName must be exactly the name of the public db operation method, such as "Create" or "Update".
+type operationName string
+
+type operation struct {
+	stmt  string
+	names []string
+}
+
+type ExecuteStatement interface {
+	ExecuteStatement(stmt string) error
+}
+
+// DbAccessBatch encapsulates batch operations (create, update, delete) over a single entity or multiple entities.
+type DbAccessBatch interface {
+	Tables() []*table.Table
+	Create(entities ...interface{}) error
+	Update(entities ...interface{}) error
+	Delete(entities ...interface{}) error
+	ExecuteStatement
+}
+
+// DbAccessSingleEntity encapsulates batch operations on a single entity plus read/find_by requests.
+// In order to enable find_by queries, we create additional find_by tables besides the main entity table.
+type DbAccessSingleEntity interface {
+	DbAccessBatch
+	UpdateContainsSubTablePrimaryKey(entityCre interface{}) error
+	Read(entity interface{}) (interface{}, error)
+	FindByPartKey(tbName string, entity interface{}) (interface{}, error)
+	FindByPartKeyAndSortKey(tbName string, numSortCols int, entity interface{}) (interface{}, error)
+	SelectAll() (interface{}, error)
+}
+
+// DbAccessBatchMultiEntities encapsulates batch operations over multiple entities.
+// Read/find_by requests have to be implemented manually.
+type DbAccessBatchMultiEntities interface {
+	DbAccessBatch
+}
+
+// GenerateBatch combines the entities into a batch struct; the entities and the returned batch are pointers.
+type GenerateBatch func(entities ...interface{}) (batch interface{})
+
+type dbAccessBatch struct {
+	tables             []*table.Table
+	prebuiltOperations map[operationName]operation
+	generateBatch      GenerateBatch
+}
+
+func (da *dbAccessBatch) setupTables(table_metadatas []table.Metadata) {
+	da.tables = make([]*table.Table, len(table_metadatas))
+	for i, tm := range table_metadatas {
+		da.tables[i] = table.New(tm)
+	}
+}
+
+// Prebuild CRUD operations.
+func (da *dbAccessBatch) preBuildOperations() {
+	da.prebuiltOperations = make(map[operationName]operation)
+
+	// Create entities
+	{
+		batchBuilder := qb.Batch()
+		for _, t := range da.tables {
+			stmt, names := t.Insert()
+			batchBuilder.AddStmtWithPrefix(t.Name(), stmt, names)
+		}
+		stmt, names := batchBuilder.ToCql()
+		da.prebuiltOperations["Create"] = operation{stmt, names}
+	}
+
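+	// For tables "t1" and "t2" the prebuilt "Create" statement has roughly this
+	// shape (a sketch; the exact text is rendered by qb):
+	//
+	//	BEGIN BATCH
+	//	INSERT INTO t1 (id,name) VALUES (?,?) ;
+	//	INSERT INTO t2 (id,name) VALUES (?,?) ;
+	//	APPLY BATCH
+	//
+	// AddStmtWithPrefix prefixes each bind name with the table name ("t1.id",
+	// "t2.name", ...), which is what lets BindStruct resolve the names against
+	// the nested fields of the batch struct returned by GenerateBatch.
+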
+	// Update entities
+	{
+		batchBuilder := qb.Batch()
+		for _, t := range da.tables {
+			stmt, names := t.Update(da.getFieldsForUpdateBatch(t)...)
+			batchBuilder.AddStmtWithPrefix(t.Name(), stmt, names)
+		}
+		stmt, names := batchBuilder.ToCql()
+		da.prebuiltOperations["Update"] = operation{stmt, names}
+	}
+
+	// Delete entities
+	{
+		batchBuilder := qb.Batch()
+		for _, t := range da.tables {
+			stmt, names := t.Delete()
+			batchBuilder.AddStmtWithPrefix(t.Name(), stmt, names)
+		}
+		stmt, names := batchBuilder.ToCql()
+		da.prebuiltOperations["Delete"] = operation{stmt, names}
+	}
+}
+
+func (da *dbAccessBatch) getFieldsForUpdateBatch(t *table.Table) []string {
+	columns := t.Metadata().Columns
+	// Primary key columns (partition columns and sort columns) are not updatable.
+	primaryKeys := make(map[string]struct{})
+	for _, partKey := range t.Metadata().PartKey {
+		primaryKeys[partKey] = struct{}{}
+	}
+	for _, sortKey := range t.Metadata().SortKey {
+		primaryKeys[sortKey] = struct{}{}
+	}
+
+	var result []string
+	for _, col := range columns {
+		if _, exist := primaryKeys[col]; !exist {
+			result = append(result, col)
+		}
+	}
+	return result
+}
+
+func getPrimaryKeyCmp(table *table.Table) []qb.Cmp {
+	if table == nil {
+		return nil
+	}
+	partKey := table.Metadata().PartKey
+	sortKey := table.Metadata().SortKey
+
+	primaryKeyCmp := make([]qb.Cmp, 0, len(partKey)+len(sortKey))
+	for _, k := range partKey {
+		primaryKeyCmp = append(primaryKeyCmp, qb.Eq(k))
+	}
+	for _, k := range sortKey {
+		primaryKeyCmp = append(primaryKeyCmp, qb.Eq(k))
+	}
+	return primaryKeyCmp
+}
+
+func (da *dbAccessBatch) Tables() []*table.Table {
+	return da.tables
+}
+
+// Each entity should be a pointer.
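+//
+// A minimal usage sketch (T1 and T2 are hypothetical entity types matching
+// the two tables passed to setupTables):
+//
+//	err := da.Create(&T1{Id: "1", Name: "a"}, &T2{Id: "1", Name: "b"})
+//
+// generateBatch combines the entities into a single batch struct, which is
+// then bound to the prebuilt batch statement.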
+func (da *dbAccessBatch) Create(entities ...interface{}) error {
+	operation, ok := da.prebuiltOperations["Create"]
+	if !ok {
+		return fmt.Errorf("the operation %q is not supported", "Create")
+	}
+	stmt, names := operation.stmt, operation.names
+
+	batch := da.generateBatch(entities...)
+	q := gocqlx.Query(sample.Cql_Session.Query(stmt), names).BindStruct(batch)
+	cql := q.String()
+
+	if err := q.ExecRelease(); err != nil {
+		return fmt.Errorf("query: %s\nerror: %w", cql, err)
+	}
+	return nil
+}
+
+// Each entity should be a pointer.
+// The find_by columns are filtered out because they are the partition keys of the find_by tables.
+// To update find_by columns, use UpdateContainsSubTablePrimaryKey instead.
+func (da *dbAccessBatch) Update(entities ...interface{}) error {
+	operation, ok := da.prebuiltOperations["Update"]
+	if !ok {
+		return fmt.Errorf("the operation %q is not supported", "Update")
+	}
+	stmt, names := operation.stmt, operation.names
+
+	batch := da.generateBatch(entities...)
+	q := gocqlx.Query(sample.Cql_Session.Query(stmt), names).BindStruct(batch)
+	cql := q.String()
+
+	if err := q.ExecRelease(); err != nil {
+		return fmt.Errorf("query: %s\nerror: %w", cql, err)
+	}
+	return nil
+}
+
+// Each entity should be a pointer.
+func (da *dbAccessBatch) Delete(entities ...interface{}) error {
+	operation, ok := da.prebuiltOperations["Delete"]
+	if !ok {
+		return fmt.Errorf("the operation %q is not supported", "Delete")
+	}
+	stmt, names := operation.stmt, operation.names
+
+	batch := da.generateBatch(entities...)
+	q := gocqlx.Query(sample.Cql_Session.Query(stmt), names).BindStruct(batch)
+	cql := q.String()
+
+	if err := q.ExecRelease(); err != nil {
+		return fmt.Errorf("query: %s\nerror: %w", cql, err)
+	}
+	return nil
+}
+
+func (da *dbAccessBatch) ExecuteStatement(stmt string) error {
+	if len(stmt) == 0 {
+		return errors.New("statement is required")
+	}
+
+	q := sample.Cql_Session.Query(stmt)
+	defer q.Release()
+	return q.Exec()
+}
diff --git a/entitybatch/db_access_multiple_entities.go b/entitybatch/db_access_multiple_entities.go
new file mode 100644
index 0000000..3623502
--- /dev/null
+++ b/entitybatch/db_access_multiple_entities.go
@@ -0,0 +1,20 @@
+package db
+
+import "github.com/scylladb/gocqlx/v2/table"
+
+type dbAccessMultipleEntities struct {
+	dbAccessBatch
+}
+
+func CreateDbAccessMultipleEntities(table_metadatas []table.Metadata, generateBatch GenerateBatch) DbAccessBatchMultiEntities {
+	if len(table_metadatas) == 0 {
+		panic("Empty table_metadatas!")
+	}
+	da := &dbAccessMultipleEntities{}
+	da.setupTables(table_metadatas)
+	da.preBuildOperations()
+
+	da.generateBatch = generateBatch
+
+	return da
+}
diff --git a/entitybatch/db_access_single_entity.go b/entitybatch/db_access_single_entity.go
new file mode 100644
index 0000000..49ec490
--- /dev/null
+++ b/entitybatch/db_access_single_entity.go
@@ -0,0 +1,250 @@
+package db
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/scylladb/gocqlx/v2"
+	"github.com/scylladb/gocqlx/v2/entitybatch/sample"
+	"github.com/scylladb/gocqlx/v2/qb"
+	"github.com/scylladb/gocqlx/v2/table"
+)
+
+type GenerateEntity func() (entity interface{})     // entity is a pointer
+type GenerateEntities func() (entities interface{}) // entities is a pointer to a slice; the elements may be pointers or values
+type GenerateBatch4CreateAndDelete func(entityDel, entityCre interface{}) (batch interface{}) // entityDel, entityCre and batch are pointers
+
+type dbAccessSingleEntity struct {
+	dbAccessBatch
+	generateEntity                GenerateEntity
+	generateEntities              GenerateEntities
+	generateBatch4CreateAndDelete GenerateBatch4CreateAndDelete
+}
+
+// tables[0] must be the base table. The other elements are the so-called "find_by" (sub) tables.
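+//
+// Wiring sketch (hypothetical names; see sample/singleEntity/operation.go for
+// a complete version):
+//
+//	metadatas := []table.Metadata{baseMeta, byNameMeta} // base table first
+//	da := CreateDbAccessSingleEntity(metadatas,
+//		func() interface{} { return &Entity{} },   // GenerateEntity
+//		func() interface{} { return &[]Entity{} }, // GenerateEntities
+//		genBatch, genBatch4CreateAndDelete)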
+func CreateDbAccessSingleEntity(table_metadatas []table.Metadata, generateEntity GenerateEntity, generateEntities GenerateEntities, generateBatch GenerateBatch, generateBatch4CreateAndDelete GenerateBatch4CreateAndDelete) DbAccessSingleEntity {
+	if len(table_metadatas) == 0 {
+		panic("Empty table_metadatas!")
+	}
+	da := &dbAccessSingleEntity{}
+	da.setupTables(table_metadatas)
+	da.preBuildOperations()
+
+	da.generateEntity = generateEntity
+	da.generateEntities = generateEntities
+	da.generateBatch = generateBatch
+	da.generateBatch4CreateAndDelete = generateBatch4CreateAndDelete
+
+	return da
+}
+
+// Prebuild CRUD operations.
+func (da *dbAccessSingleEntity) preBuildOperations() {
+	da.dbAccessBatch.preBuildOperations()
+
+	// Replace the prebuilt update operation with dbAccessSingleEntity's own implementation.
+	// The "find_by" columns are filtered out because they are the partition keys of the find_by tables.
+	// To update the "find_by" columns, use UpdateContainsSubTablePrimaryKey instead.
+	{
+		batchBuilder := qb.Batch()
+		for _, t := range da.tables {
+			stmt, names := t.Update(da.getFieldsForUpdateBatch(t.Metadata().Columns)...)
+			batchBuilder.AddStmtWithPrefix(t.Name(), stmt, names)
+		}
+		stmt, names := batchBuilder.ToCql()
+		da.prebuiltOperations["Update"] = operation{stmt, names}
+	}
+
+	// Read the entity by the primary key columns of the base table.
+	{
+		t := da.tables[0]
+		stmt, names := t.Get(t.Metadata().Columns...)
+		da.prebuiltOperations["Read"] = operation{stmt, names}
+	}
+
+	// Find entities in the find_by tables by partition key.
+	{
+		for _, t := range da.tables {
+			stmt, names := t.Select(t.Metadata().Columns...)
+			da.prebuiltOperations[generateFindByPartKeyOperationName(t.Metadata().Name)] = operation{stmt, names}
+		}
+	}
+
+	// Find entities in the find_by tables by partition key plus the first i sort key columns.
+	// The WHERE clause must always contain the full partition key, so the cmp slice is cut
+	// after len(PartKey)+i columns, e.g. for i=1: WHERE <all part cols> AND <first sort col>=?.
+	{
+		for _, t := range da.tables {
+			numPartCols := len(t.Metadata().PartKey)
+			for i := 1; i <= len(t.Metadata().SortKey); i++ {
+				stmt, names := qb.Select(t.Metadata().Name).
+					Columns(t.Metadata().Columns...).
+					Where(getPrimaryKeyCmp(t)[0 : numPartCols+i]...).
+					ToCql()
+				da.prebuiltOperations[generateFindByPartKeyAndSortKeyOperationName(t.Metadata().Name, i)] = operation{stmt, names}
+			}
+		}
+	}
+}
+
+// In order to update the partition key of a "find_by" table, we delete the existing entity
+// and create the new one in a single batch request.
+// The execution order of the statements in a batch does not depend on their textual order,
+// so we attach timestamps to the statements.
+// This operation cannot be pre-built because each call needs fresh timestamps.
+func (da *dbAccessSingleEntity) generateDeleteAndCreateOperation() operation {
+	batchBuilder := qb.Batch()
+
+	// Delete the existing entity. The timestamps order the statement execution within the batch.
+	for _, t := range da.tables {
+		primaryKeyCmp := getPrimaryKeyCmp(t)
+		stmt, names := qb.Delete(t.Metadata().Name).Where(primaryKeyCmp...).Timestamp(time.Now()).ToCql()
+		batchBuilder.AddStmtWithPrefix(t.Metadata().Name+"_old", stmt, names)
+	}
+
+	// Create the updated (new) entity.
+	for _, t := range da.tables {
+		stmt, names := qb.Insert(t.Metadata().Name).Columns(t.Metadata().Columns...).Timestamp(time.Now()).ToCql()
+		batchBuilder.AddStmtWithPrefix(t.Metadata().Name+"_new", stmt, names)
+	}
+	stmt, names := batchBuilder.ToCql()
+	return operation{stmt, names}
+}
+
+func (da *dbAccessSingleEntity) getFieldsForUpdateBatch(columns []string) []string {
+	// Primary key columns (partition columns and sort columns) of any of the tables are not updatable.
+	primaryKeys := make(map[string]struct{})
+	for _, t := range da.tables {
+		for _, partKey := range t.Metadata().PartKey {
+			primaryKeys[partKey] = struct{}{}
+		}
+		for _, sortKey := range t.Metadata().SortKey {
+			primaryKeys[sortKey] = struct{}{}
+		}
+	}
+
+	var result []string
+	for _, col := range columns {
+		if _, exist := primaryKeys[col]; !exist {
+			result = append(result, col)
+		}
+	}
+	return result
+}
+
+func generateFindByPartKeyOperationName(tableName string) operationName {
+	return operationName("FindByPartKey_" + tableName)
+}
+
+func generateFindByPartKeyAndSortKeyOperationName(tableName string, numSortCols int) operationName {
+	return operationName("FindByPartKeyAndSortKey_" + tableName + "_" + strconv.Itoa(numSortCols))
+}
+
+// entity should be a pointer carrying the full primary key of the base table.
+// The returned interface{} is a pointer.
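+//
+// Sketch, assuming a hypothetical Entity whose base table has the primary key (id, name):
+//
+//	got, err := da.Read(&Entity{Id: "1", Name: "n"})
+//	if err == nil {
+//		e := got.(*Entity) // same concrete type as produced by generateEntity
+//		_ = e
+//	}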
+func (da *dbAccessSingleEntity) Read(entity interface{}) (interface{}, error) {
+	operation, ok := da.prebuiltOperations["Read"]
+	if !ok {
+		return nil, fmt.Errorf("the operation %q is not supported", "Read")
+	}
+	stmt, names := operation.stmt, operation.names
+	q := gocqlx.Query(sample.Cql_Session.Query(stmt), names).BindStruct(entity)
+	cql := q.String()
+
+	result := da.generateEntity()
+	if err := q.GetRelease(result); err != nil {
+		return nil, fmt.Errorf("query: %s\nerror: %w", cql, err)
+	}
+	return result, nil
+}
+
+// entity should be a pointer.
+// tbName must be the name of the base table or of one of the find_by tables.
+// The returned value is a pointer to a slice of entities.
+func (da *dbAccessSingleEntity) FindByPartKey(tbName string, entity interface{}) (interface{}, error) {
+	operation, ok := da.prebuiltOperations[generateFindByPartKeyOperationName(tbName)]
+	if !ok {
+		return nil, fmt.Errorf("the operation %q is not supported", generateFindByPartKeyOperationName(tbName))
+	}
+
+	stmt, names := operation.stmt, operation.names
+	q := gocqlx.Query(sample.Cql_Session.Query(stmt), names).BindStruct(entity)
+	cql := q.String()
+
+	entities := da.generateEntities()
+	if err := q.SelectRelease(entities); err != nil {
+		return nil, fmt.Errorf("query: %s\nerror: %w", cql, err)
+	}
+	return entities, nil
+}
+
+// entity should be a pointer.
+// tbName must be the name of the base table or of one of the find_by tables.
+// numSortCols is the number of leading sort key columns bound in entity.
+// The returned value is a pointer to a slice of entities.
+func (da *dbAccessSingleEntity) FindByPartKeyAndSortKey(tbName string, numSortCols int, entity interface{}) (interface{}, error) {
+	operation, ok := da.prebuiltOperations[generateFindByPartKeyAndSortKeyOperationName(tbName, numSortCols)]
+	if !ok {
+		return nil, fmt.Errorf("the operation %q is not supported", generateFindByPartKeyAndSortKeyOperationName(tbName, numSortCols))
+	}
+
+	stmt, names := operation.stmt, operation.names
+	q := gocqlx.Query(sample.Cql_Session.Query(stmt), names).BindStruct(entity)
+	cql := q.String()
+
+	entities := da.generateEntities()
+	if err := q.SelectRelease(entities); err != nil {
+		return nil, fmt.Errorf("query: %s\nerror: %w", cql, err)
+	}
+	return entities, nil
+}
+
+// entityDel and entityCre should be pointers.
+func (da *dbAccessSingleEntity) deleteAndCreate(entityDel, entityCre interface{}) error {
+	operation := da.generateDeleteAndCreateOperation()
+	stmt, names := operation.stmt, operation.names
+
+	batch := da.generateBatch4CreateAndDelete(entityDel, entityCre)
+	q := gocqlx.Query(sample.Cql_Session.Query(stmt), names).BindStruct(batch)
+	cql := q.String()
+
+	if err := q.ExecRelease(); err != nil {
+		return fmt.Errorf("query: %s\nerror: %w", cql, err)
+	}
+	return nil
+}
+
+// entityCre should be a pointer.
+// The "find_by" columns are the partition keys of the "find_by" tables, so they cannot be updated in place.
+// We first read the existing entity, then delete the old one and create the new one in a single batch.
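+//
+// The generated batch has roughly this shape for a base table "t" with one
+// find_by table "t_by_x" (a sketch; qb renders the literal microsecond
+// timestamps into the statement text, which is why it cannot be prebuilt):
+//
+//	BEGIN BATCH
+//	DELETE FROM t USING TIMESTAMP <micros> WHERE id=? ;
+//	DELETE FROM t_by_x USING TIMESTAMP <micros> WHERE x=? AND id=? ;
+//	INSERT INTO t (id,x) VALUES (?,?) USING TIMESTAMP <micros> ;
+//	INSERT INTO t_by_x (id,x) VALUES (?,?) USING TIMESTAMP <micros> ;
+//	APPLY BATCH
+//
+// The bind names carry the "<table>_old"/"<table>_new" prefixes added by
+// AddStmtWithPrefix, so they must match the field names of the struct
+// returned by GenerateBatch4CreateAndDelete (see sample/singleEntity).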
+func (da *dbAccessSingleEntity) UpdateContainsSubTablePrimaryKey(entityCre interface{}) error {
+	entityDel, err := da.Read(entityCre)
+	if entityDel == nil {
+		entityCreBytes, _ := json.Marshal(entityCre)
+		msg := fmt.Sprintf("Failed to find the entity: %s.", entityCreBytes)
+		if err != nil {
+			msg += "\nerror: " + err.Error()
+		}
+		return errors.New(msg)
+	}
+
+	return da.deleteAndCreate(entityDel, entityCre)
+}
+
+func (da *dbAccessSingleEntity) SelectAll() (interface{}, error) {
+	stmt, names := qb.Select(da.tables[0].Name()).Columns(da.tables[0].Metadata().Columns...).ToCql()
+	q := gocqlx.Query(sample.Cql_Session.Query(stmt), names)
+
+	entities := da.generateEntities()
+	if err := q.SelectRelease(entities); err != nil {
+		return nil, err
+	}
+	return entities, nil
+}
diff --git a/entitybatch/sample/multiEntities/operation.go b/entitybatch/sample/multiEntities/operation.go
new file mode 100644
index 0000000..a7c6beb
--- /dev/null
+++ b/entitybatch/sample/multiEntities/operation.go
@@ -0,0 +1,85 @@
+package multiEntities
+
+import (
+	"github.com/scylladb/gocqlx/v2"
+	db "github.com/scylladb/gocqlx/v2/entitybatch"
+	"github.com/scylladb/gocqlx/v2/entitybatch/sample"
+	"github.com/scylladb/gocqlx/v2/entitybatch/structures"
+	"github.com/scylladb/gocqlx/v2/qb"
+	"github.com/scylladb/gocqlx/v2/table"
+)
+
+const (
+	TABLE1 = "table1"
+	TABLE2 = "table2"
+)
+
+type T_Table1 struct {
+	Id   string `json:"id"`
+	Name string `json:"name"`
+}
+
+type T_Table2 struct {
+	Id   string `json:"id"`
+	Name string `json:"name"`
+}
+
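+// The field names of the batch struct must map, via gocqlx's default
+// snake-case mapper, to the prefixes used in the batch statement, which are
+// the table names: Table1 -> "table1", Table2 -> "table2". The bind names
+// therefore look like "table1.id", "table2.name", and so on.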
+type batchTables struct {
+	Table1 *T_Table1
+	Table2 *T_Table2
+}
+
+var (
+	DataAccess db.DbAccessBatchMultiEntities
+)
+
+func init() {
+	var metadatas []table.Metadata
+	metadatas = append(metadatas, table.Metadata{
+		Name:    TABLE1,
+		Columns: structures.GetDbFieldNames(T_Table1{}),
+		PartKey: []string{"id"},
+		SortKey: []string{},
+	})
+
+	metadatas = append(metadatas, table.Metadata{
+		Name:    TABLE2,
+		Columns: structures.GetDbFieldNames(T_Table2{}),
+		PartKey: []string{"id"},
+		SortKey: []string{},
+	})
+
+	generateBatch := func(entities ...interface{}) interface{} {
+		table1, ok := entities[0].(*T_Table1)
+		if !ok {
+			panic("The type of entities[0] needs to be *T_Table1")
+		}
+		table2, ok := entities[1].(*T_Table2)
+		if !ok {
+			panic("The type of entities[1] needs to be *T_Table2")
+		}
+		return &batchTables{table1, table2}
+	}
+	DataAccess = db.CreateDbAccessMultipleEntities(metadatas, generateBatch)
+}
+
+func SelectAll(tableIndex int) (interface{}, error) {
+	table := DataAccess.Tables()[tableIndex]
+	stmt, names := qb.Select(table.Name()).Columns(table.Metadata().Columns...).ToCql()
+	q := gocqlx.Query(sample.Cql_Session.Query(stmt), names)
+
+	generateEntities := func() (entities interface{}) {
+		switch tableIndex {
+		case 0:
+			entities = &[]T_Table1{}
+		case 1:
+			entities = &[]T_Table2{}
+		}
+		return entities
+	}
+	entities := generateEntities()
+	if err := q.SelectRelease(entities); err != nil {
+		return nil, err
+	}
+	return entities, nil
+}
\ No newline at end of file
diff --git a/entitybatch/sample/multiEntities/operation_test.go b/entitybatch/sample/multiEntities/operation_test.go
new file mode 100644
index 0000000..a57be9a
--- /dev/null
+++ b/entitybatch/sample/multiEntities/operation_test.go
@@ -0,0 +1,154 @@
+package multiEntities
+
+import (
+	"os"
+	"testing"
+
+	"github.com/scylladb/gocqlx/v2/entitybatch/sample"
+)
+
+var (
+	dropTable1 = `drop table if exists table1;`
+	dropTable2 = `drop table if exists table2;`
+
+	createTable1 = `
+CREATE TABLE IF NOT EXISTS table1 (
+	id text,
+	name text,
+	PRIMARY KEY (id)
+);
+`
+	createTable2 = `
+CREATE TABLE IF NOT EXISTS table2 (
+	id text,
+	name text,
+	PRIMARY KEY (id)
+);
+`
+)
+
+func TestMain(m *testing.M) {
+	// Load config files, keys, etc.
+	if err := PrepareTestEnv(); err != nil {
+		panic(err.Error())
+	}
+
+	exit_val := m.Run()
+
+	// Drop the tested tables, etc.
+	TeardownTestEnv()
+
+	os.Exit(exit_val)
+}
+
+func PrepareTestEnv() error {
+	if err := sample.Setup(); err != nil {
+		return err
+	}
+
+	if err := dropTables(); err != nil {
+		return err
+	}
+
+	if err := createTables(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func dropTables() error {
+	if err := sample.Execute_Statement(dropTable1); err != nil {
+		return err
+	}
+
+	if err := sample.Execute_Statement(dropTable2); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func createTables() error {
+	if err := sample.Execute_Statement(createTable1); err != nil {
+		return err
+	}
+
+	if err := sample.Execute_Statement(createTable2); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func TeardownTestEnv() {
+	dropTables()
+}
+
+func Test_CUD(t *testing.T) {
+	table1 := &T_Table1{"1", "name1"}
+	table2 := &T_Table2{"2", "name2"}
+	if err := DataAccess.Create(table1, table2); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	table1.Name = "new name1"
+	table2.Name = "new name2"
+
+	if err := DataAccess.Update(table1, table2); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	{
+		// Check the update of table1.
+		tmp, err := SelectAll(0)
+		if err != nil {
+			t.Fatal(err.Error())
+		}
+		table1rows := tmp.(*[]T_Table1)
+		if table1rows == nil || len(*table1rows) != 1 {
+			t.Fatal("No table1 rows returned.")
+		}
+		if (*table1rows)[0].Name != "new name1" {
+			t.Errorf("Expected table1.Name to be %q, but got %q", "new name1", (*table1rows)[0].Name)
+		}
+
+		// Check the update of table2.
+		tmp, err = SelectAll(1)
+		if err != nil {
+			t.Fatal(err.Error())
+		}
+		table2rows := tmp.(*[]T_Table2)
+		if table2rows == nil || len(*table2rows) != 1 {
+			t.Fatal("No table2 rows returned.")
+		}
+		if (*table2rows)[0].Name != "new name2" {
+			t.Errorf("Expected table2.Name to be %q, but got %q", "new name2", (*table2rows)[0].Name)
+		}
+	}
+
+	if err := DataAccess.Delete(table1, table2); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	{
+		// Check the delete of table1.
+		tmp, err := SelectAll(0)
+		if err != nil {
+			t.Fatal(err.Error())
+		}
+		table1rows := tmp.(*[]T_Table1)
+		if table1rows != nil && len(*table1rows) != 0 {
+			t.Fatal("table1 is not deleted.")
+		}
+
+		// Check the delete of table2.
+		tmp, err = SelectAll(1)
+		if err != nil {
+			t.Fatal(err.Error())
+		}
+		table2rows := tmp.(*[]T_Table2)
+		if table2rows != nil && len(*table2rows) != 0 {
+			t.Fatal("table2 is not deleted.")
+		}
+	}
+}
diff --git a/entitybatch/sample/session_factory.go b/entitybatch/sample/session_factory.go
new file mode 100644
index 0000000..c90cf3c
--- /dev/null
+++ b/entitybatch/sample/session_factory.go
@@ -0,0 +1,25 @@
+package sample
+
+import (
+	"time"
+
+	"github.com/gocql/gocql"
+)
+
+var (
+	cluster_config *gocql.ClusterConfig
+	Cql_Session    *gocql.Session
+)
+
+func NewSession() (err error) {
+	cluster_config = gocql.NewCluster("127.0.0.1:9042")
+	cluster_config.Keyspace = "kaicloud_dev"
+	cluster_config.Consistency = gocql.Quorum
+	cluster_config.ProtoVersion = 4
+	cluster_config.ConnectTimeout = time.Second * 20
+	cluster_config.Timeout = time.Second * 20
+
+	Cql_Session, err = cluster_config.CreateSession()
+
+	return
+}
diff --git a/entitybatch/sample/singleEntity/operation.go b/entitybatch/sample/singleEntity/operation.go
new file mode 100644
index 0000000..9962abc
--- /dev/null
+++ b/entitybatch/sample/singleEntity/operation.go
@@ -0,0 +1,96 @@
+package singleEntity
+
+import (
+	db "github.com/scylladb/gocqlx/v2/entitybatch"
+	"github.com/scylladb/gocqlx/v2/entitybatch/structures"
+	"github.com/scylladb/gocqlx/v2/table"
+)
+
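+// This sample uses one base table and two find_by tables, matching the DDL in
+// operation_test.go:
+//
+//	single_entity          PRIMARY KEY ((id), name, alias)
+//	single_entity_by_name  PRIMARY KEY ((name), id, alias)
+//	single_entity_by_age   PRIMARY KEY ((age), id, alias)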
"github.com/scylladb/gocqlx/v2/entitybatch/structures" + "github.com/scylladb/gocqlx/v2/table" +) + +const ( + SINGLE_ENTITY = "single_entity" + SINGLE_ENTITY_BY_NAME = "single_entity_by_name" + SINGLE_ENTITY_BY_AGE = "single_entity_by_age" +) + +type T_SingleEntity struct { + Id string `json:"id"` + Name string `json:"name"` + Alias string `json:"alias"` + Age string `json:"age"` + Hobby string `json:"hobby"` +} + +type batchSingleEntity struct { + SingleEntity *T_SingleEntity + SingleEntity_by_name *T_SingleEntity + SingleEntity_by_age *T_SingleEntity +} + +type batchSingleEntity4DelAndCreate struct { + SingleEntity_old *T_SingleEntity + SingleEntity_by_name_old *T_SingleEntity + SingleEntity_by_age_old *T_SingleEntity + + SingleEntity_new *T_SingleEntity + SingleEntity_by_name_new *T_SingleEntity + SingleEntity_by_age_new *T_SingleEntity +} + +var ( + DataAccess db.DbAccessSingleEntity +) + +func init() { + var metadatas []table.Metadata + metadatas = append(metadatas, table.Metadata{ + Name: SINGLE_ENTITY, + Columns: structures.GetDbFieldNames(T_SingleEntity{}), + PartKey: []string{"id"}, + SortKey: []string{"name", "alias"}, + }) + + metadatas = append(metadatas, table.Metadata{ + Name: SINGLE_ENTITY_BY_NAME, + Columns: structures.GetDbFieldNames(T_SingleEntity{}), + PartKey: []string{"name"}, + SortKey: []string{"id", "alias"}, + }) + + metadatas = append(metadatas, table.Metadata{ + Name: SINGLE_ENTITY_BY_AGE, + Columns: structures.GetDbFieldNames(T_SingleEntity{}), + PartKey: []string{"age"}, + SortKey: []string{"id", "alias"}, + }) + + generateEntity := func() (entity interface{}) { + return &T_SingleEntity{} + } + generateEntities := func() (entities interface{}) { + return &[]T_SingleEntity{} + } + generateBatch := func(entities ...interface{}) interface{} { + singleEntity, ok := entities[0].(*T_SingleEntity) + if !ok { + panic("The type of the entities[0] needs to be *T_SingleEntity") + } + return &batchSingleEntity{singleEntity, singleEntity, singleEntity} + } + generateBatch4CreateAndDelete := func(entityDel, entityCre interface{}) (batch interface{}) { + producttypeDel, ok := entityDel.(*T_SingleEntity) + if !ok { + panic("The type of the entityDel needs to be *T_SingleEntity") + } + + producttypeCre, ok := entityCre.(*T_SingleEntity) + if !ok { + panic("The type of the entityCre needs to be *T_SingleEntity") + } + + return &batchSingleEntity4DelAndCreate{ + producttypeDel, producttypeDel, producttypeDel, + producttypeCre, producttypeCre, producttypeCre, + } + } + DataAccess = db.CreateDbAccessSingleEntity(metadatas, generateEntity, generateEntities, generateBatch, generateBatch4CreateAndDelete) +} diff --git a/entitybatch/sample/singleEntity/operation_test.go b/entitybatch/sample/singleEntity/operation_test.go new file mode 100644 index 0000000..ea30930 --- /dev/null +++ b/entitybatch/sample/singleEntity/operation_test.go @@ -0,0 +1,325 @@ +package singleEntity + +import ( + "encoding/json" + "github.com/scylladb/gocqlx/v2/entitybatch/sample" + "os" + "strings" + "testing" +) + +var ( + dropTable = `drop table if exists single_entity;` + crateTable = ` +CREATE TABLE single_entity ( + id text, + name text, + alias text, + age text, + hobby text, + PRIMARY KEY ((id), name, alias) +); +` + + dropTableByName = `drop table if exists single_entity_by_name;` + crateTableByName = ` +CREATE TABLE single_entity_by_name ( + id text, + name text, + alias text, + age text, + hobby text, + PRIMARY KEY ((name), id, alias) +); +` + dropTableByAge = `drop table if exists 
+var (
+	DataAccess db.DbAccessSingleEntity
+)
+
+func init() {
+	var metadatas []table.Metadata
+	metadatas = append(metadatas, table.Metadata{
+		Name:    SINGLE_ENTITY,
+		Columns: structures.GetDbFieldNames(T_SingleEntity{}),
+		PartKey: []string{"id"},
+		SortKey: []string{"name", "alias"},
+	})
+
+	metadatas = append(metadatas, table.Metadata{
+		Name:    SINGLE_ENTITY_BY_NAME,
+		Columns: structures.GetDbFieldNames(T_SingleEntity{}),
+		PartKey: []string{"name"},
+		SortKey: []string{"id", "alias"},
+	})
+
+	metadatas = append(metadatas, table.Metadata{
+		Name:    SINGLE_ENTITY_BY_AGE,
+		Columns: structures.GetDbFieldNames(T_SingleEntity{}),
+		PartKey: []string{"age"},
+		SortKey: []string{"id", "alias"},
+	})
+
+	generateEntity := func() (entity interface{}) {
+		return &T_SingleEntity{}
+	}
+	generateEntities := func() (entities interface{}) {
+		return &[]T_SingleEntity{}
+	}
+	generateBatch := func(entities ...interface{}) interface{} {
+		singleEntity, ok := entities[0].(*T_SingleEntity)
+		if !ok {
+			panic("The type of entities[0] needs to be *T_SingleEntity")
+		}
+		return &batchSingleEntity{singleEntity, singleEntity, singleEntity}
+	}
+	generateBatch4CreateAndDelete := func(entityDel, entityCre interface{}) (batch interface{}) {
+		singleEntityDel, ok := entityDel.(*T_SingleEntity)
+		if !ok {
+			panic("The type of entityDel needs to be *T_SingleEntity")
+		}
+
+		singleEntityCre, ok := entityCre.(*T_SingleEntity)
+		if !ok {
+			panic("The type of entityCre needs to be *T_SingleEntity")
+		}
+
+		return &batchSingleEntity4DelAndCreate{
+			singleEntityDel, singleEntityDel, singleEntityDel,
+			singleEntityCre, singleEntityCre, singleEntityCre,
+		}
+	}
+	DataAccess = db.CreateDbAccessSingleEntity(metadatas, generateEntity, generateEntities, generateBatch, generateBatch4CreateAndDelete)
+}
diff --git a/entitybatch/sample/singleEntity/operation_test.go b/entitybatch/sample/singleEntity/operation_test.go
new file mode 100644
index 0000000..ea30930
--- /dev/null
+++ b/entitybatch/sample/singleEntity/operation_test.go
@@ -0,0 +1,325 @@
+package singleEntity
+
+import (
+	"encoding/json"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/scylladb/gocqlx/v2/entitybatch/sample"
+)
+
+var (
+	dropTable   = `drop table if exists single_entity;`
+	createTable = `
+CREATE TABLE single_entity (
+	id text,
+	name text,
+	alias text,
+	age text,
+	hobby text,
+	PRIMARY KEY ((id), name, alias)
+);
+`
+
+	dropTableByName   = `drop table if exists single_entity_by_name;`
+	createTableByName = `
+CREATE TABLE single_entity_by_name (
+	id text,
+	name text,
+	alias text,
+	age text,
+	hobby text,
+	PRIMARY KEY ((name), id, alias)
+);
+`
+	dropTableByAge   = `drop table if exists single_entity_by_age;`
+	createTableByAge = `
+CREATE TABLE single_entity_by_age (
+	id text,
+	name text,
+	alias text,
+	age text,
+	hobby text,
+	PRIMARY KEY ((age), id, alias)
+);
+`
+)
+
+func TestMain(m *testing.M) {
+	// Load config files, keys, etc.
+	if err := PrepareTestEnv(); err != nil {
+		panic(err.Error())
+	}
+
+	exit_val := m.Run()
+
+	// Drop the tested tables, etc.
+	TeardownTestEnv()
+
+	os.Exit(exit_val)
+}
+
+func PrepareTestEnv() error {
+	if err := sample.Setup(); err != nil {
+		return err
+	}
+
+	if err := dropTables(); err != nil {
+		return err
+	}
+
+	if err := createTables(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func dropTables() error {
+	if err := sample.Execute_Statement(dropTable); err != nil {
+		return err
+	}
+	if err := sample.Execute_Statement(dropTableByName); err != nil {
+		return err
+	}
+	if err := sample.Execute_Statement(dropTableByAge); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func createTables() error {
+	if err := sample.Execute_Statement(createTable); err != nil {
+		return err
+	}
+	if err := sample.Execute_Statement(createTableByName); err != nil {
+		return err
+	}
+	if err := sample.Execute_Statement(createTableByAge); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func TeardownTestEnv() {
+	dropTables()
+}
+
+func Test_CRUD(t *testing.T) {
+	singleEntityCreate := T_SingleEntity{
+		Id:    "id1",
+		Name:  "name1",
+		Alias: "alias1",
+		Age:   "age1",
+		Hobby: "hobby1",
+	}
+	if err := DataAccess.Create(&singleEntityCreate); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	singleEntityUpdate := singleEntityCreate
+	singleEntityUpdate.Hobby += "new"
+	err := DataAccess.Update(&singleEntityUpdate)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+	singleEntity_update_json, _ := json.Marshal(singleEntityUpdate)
+
+	tmp := T_SingleEntity{
+		Id:    "id1",
+		Name:  "name1",
+		Alias: "alias1",
+		Age:   "",
+	}
+	singleEntity_read, err := DataAccess.Read(&tmp)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+	singleEntity_read_json, _ := json.Marshal(singleEntity_read)
+
+	if string(singleEntity_update_json) != string(singleEntity_read_json) {
+		t.Fatal("singleEntity_read_json: ", string(singleEntity_read_json), "\n", "singleEntity_update_json: ", string(singleEntity_update_json))
+	}
+
+	if err := DataAccess.Delete(singleEntity_read); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	tmp = T_SingleEntity{
+		Id:    "id1",
+		Name:  "name1",
+		Alias: "alias1",
+		Age:   "",
+	}
+	singleEntity_read_after_delete, err := DataAccess.Read(&tmp)
+	if err != nil {
+		if !strings.Contains(err.Error(), "not found") {
+			t.Fatal(err.Error())
+		}
+	}
+	if singleEntity_read_after_delete != nil {
+		t.Fatal("The entity is still readable after Delete.")
+	}
+}
+
+/* The incorrect Update usage: updating a field which is the partition key of a search/sub table.
+For example, age is the partition key of single_entity_by_age. If you use Update to change it,
+the existing single_entity row does not change, and a new row is created in the
+single_entity_by_age table. This is because the field age is filtered out when the statement
+is constructed by
+func (da *dbAccessBatch) getFieldsForUpdateBatch(t *table.Table) []string
+The statement looks like this:
+
+BEGIN BATCH
+UPDATE single_entity SET hobby=? WHERE id=? AND name=? AND alias=? ;
+UPDATE single_entity_by_name SET hobby=? WHERE name=? AND id=? AND alias=? ;
+UPDATE single_entity_by_age SET hobby=? WHERE age=? AND id=? AND alias=? ;
+APPLY BATCH
+
+After the data is bound, it is:
+
+BEGIN BATCH
+UPDATE single_entity SET hobby='hobby1' WHERE id='id1' AND name='name1' AND alias='alias1' ;
+UPDATE single_entity_by_name SET hobby='hobby1' WHERE name='name1' AND id='id1' AND alias='alias1' ;
+UPDATE single_entity_by_age SET hobby='hobby1' WHERE age='age1new' AND id='id1' AND alias='alias1' ;
+APPLY BATCH
+
+So the first two statements change nothing, while the third one
+(UPDATE single_entity_by_age SET ... WHERE age='age1new' ...) creates a new row.
+*/
+func Test_IncorrectUpdate(t *testing.T) {
+	singleEntityCreate := T_SingleEntity{
+		Id:    "id1",
+		Name:  "name1",
+		Alias: "alias1",
+		Age:   "age1",
+		Hobby: "hobby1",
+	}
+	if err := DataAccess.Create(&singleEntityCreate); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	singleEntityUpdate := singleEntityCreate
+	singleEntityUpdate.Age += "new"
+	err := DataAccess.Update(&singleEntityUpdate)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+
+	tmp := T_SingleEntity{
+		Id:    "id1",
+		Name:  "name1",
+		Alias: "alias1",
+		Age:   "",
+	}
+	singleEntity_read, err := DataAccess.Read(&tmp)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+
+	if singleEntity_read.(*T_SingleEntity).Age != singleEntityCreate.Age {
+		t.Errorf("Update should not be able to change the search table's partition key!")
+	}
+
+	/* TODO: Add a check for the unexpectedly created row.
+	This incorrect usage of Update creates another new row in single_entity_by_age:
+	 age     | id  | alias  | hobby  | name
+	---------+-----+--------+--------+-------
+	 age1    | id1 | alias1 | hobby1 | name1
+	 age1new | id1 | alias1 | hobby1 | null
+	*/
+}
+
+func Test_FindByPartKey(t *testing.T) {
+	singleEntityCreate := T_SingleEntity{
+		Id:    "id1",
+		Name:  "name1",
+		Alias: "alias1",
+		Age:   "age1",
+	}
+	if err := DataAccess.Create(&singleEntityCreate); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	singleEntityCreate_read_json, _ := json.Marshal(singleEntityCreate)
+
+	searchInput := T_SingleEntity{
+		Name: "name1",
+	}
+	singleEntity_read, err := DataAccess.FindByPartKey(SINGLE_ENTITY_BY_NAME, &searchInput)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+	singleEntity_read_json, _ := json.Marshal((*(singleEntity_read.(*[]T_SingleEntity)))[0])
+
+	if string(singleEntityCreate_read_json) != string(singleEntity_read_json) {
+		t.Fatal("singleEntity_read_json: ", string(singleEntity_read_json), "\n", "singleEntityCreate_read_json: ", string(singleEntityCreate_read_json))
+	}
+}
+
+func Test_FindByPartKeyAndSortKey(t *testing.T) {
+	singleEntityCreate := T_SingleEntity{
+		Id:    "id1",
+		Name:  "name1",
+		Alias: "alias1",
+		Age:   "age1",
+	}
+	if err := DataAccess.Create(&singleEntityCreate); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	singleEntityCreate_read_json, _ := json.Marshal(singleEntityCreate)
+
+	searchInput := T_SingleEntity{
+		Name: "name1",
+		Id:   "id1",
+	}
+	// There are 2 sort columns in all. As a demo, we use only the first one.
+	singleEntity_read, err := DataAccess.FindByPartKeyAndSortKey(SINGLE_ENTITY_BY_NAME, 1, &searchInput)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+	singleEntity_read_json, _ := json.Marshal((*(singleEntity_read.(*[]T_SingleEntity)))[0])
+
+	if string(singleEntityCreate_read_json) != string(singleEntity_read_json) {
+		t.Fatal("singleEntity_read_json: ", string(singleEntity_read_json), "\n", "singleEntityCreate_read_json: ", string(singleEntityCreate_read_json))
+	}
+}
+
+// In order to update the partition key of a search/sub/find_by table, we need to use
+// UpdateContainsSubTablePrimaryKey.
+// It deletes the existing entity and then creates the new one.
+func Test_UpdateContainsSubTablePrimaryKey(t *testing.T) {
+	singleEntityCreate := T_SingleEntity{
+		Id:    "id1",
+		Name:  "name1",
+		Alias: "alias1",
+		Age:   "age1",
+	}
+	if err := DataAccess.Create(&singleEntityCreate); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	singleEntityUpdate := singleEntityCreate
+	singleEntityUpdate.Age += "new"
+	err := DataAccess.UpdateContainsSubTablePrimaryKey(&singleEntityUpdate)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+	singleEntity_update_json, _ := json.Marshal(singleEntityUpdate)
+
+	searchInput := &T_SingleEntity{
+		Name:  "name1",
+		Id:    "id1",
+		Alias: "alias1",
+	}
+
+	singleEntity_read, err := DataAccess.Read(searchInput)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+	singleEntity_read_json, _ := json.Marshal(singleEntity_read.(*T_SingleEntity))
+
+	if string(singleEntity_update_json) != string(singleEntity_read_json) {
+		t.Fatal("singleEntity_read_json: ", string(singleEntity_read_json), "\n", "singleEntity_update_json: ", string(singleEntity_update_json))
+	}
+}
\ No newline at end of file
diff --git a/entitybatch/sample/test_setup.go b/entitybatch/sample/test_setup.go
new file mode 100644
index 0000000..ba4ddbd
--- /dev/null
+++ b/entitybatch/sample/test_setup.go
@@ -0,0 +1,24 @@
+// test_setup.go is intended to be used by all the tests inside entitybatch/sample.
+// This way we can easily set up and tear down stuff inside a func TestMain(...).
+// DO NOT USE THE EXPORTED FUNCTIONS OUTSIDE OF A TEST.
+package sample
+
+import (
+	"errors"
+)
+
+func Setup() error {
+	return NewSession()
+}
+
+func Execute_Statement(stmt string) error {
+	if len(stmt) == 0 {
+		return errors.New("statement is required")
+	}
+
+	q := Cql_Session.Query(stmt)
+	defer q.Release()
+	return q.Exec()
+}
diff --git a/entitybatch/structures/db_fields.go b/entitybatch/structures/db_fields.go
new file mode 100644
index 0000000..5222c01
--- /dev/null
+++ b/entitybatch/structures/db_fields.go
@@ -0,0 +1,45 @@
+package structures
+
+import (
+	"reflect"
+	"strings"
+)
+
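+// GetDbFieldNames returns the column names for the fields of the given struct,
+// preferring the `db` tag, then the `json` tag, then the lowercased field name.
+// Nested struct fields are flattened recursively.
+//
+// Example (a sketch with a hypothetical type):
+//
+//	type User struct {
+//		Id    string `json:"id"`
+//		Name  string `db:"user_name" json:"name"`
+//		Email string
+//	}
+//
+//	GetDbFieldNames(User{}) // -> []string{"id", "user_name", "email"}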
+func GetDbFieldNames(input interface{}) (db_fields []string) {
+	dest_value := reflect.ValueOf(input)
+	dest_type := reflect.TypeOf(input)
+
+	for index := 0; index < dest_type.NumField(); index++ {
+		if v := dest_value.Field(index); v.Kind() == reflect.Struct {
+			db_fields = append(db_fields, GetDbFieldNames(v.Interface())...)
+			continue
+		}
+
+		dest_field := dest_type.Field(index)
+		db_tag := dest_field.Tag.Get("db")
+		if strings.Contains(db_tag, ",") {
+			db_tag = strings.Split(db_tag, ",")[0]
+		}
+
+		if len(db_tag) > 0 {
+			db_fields = append(db_fields, db_tag)
+			continue
+		}
+
+		json_tag := dest_field.Tag.Get("json")
+		if strings.Contains(json_tag, ",") {
+			json_tag = strings.Split(json_tag, ",")[0]
+		}
+
+		if len(json_tag) > 0 {
+			db_fields = append(db_fields, json_tag)
+			continue
+		}
+
+		db_fields = append(db_fields, strings.ToLower(dest_field.Name))
+	}
+
+	return
+}
\ No newline at end of file