217 lines
5.4 KiB
Go
217 lines
5.4 KiB
Go
package dbmeta
|
|
|
|
import (
|
|
"context"
|
|
|
|
"git.bit5.ru/backend/db"
|
|
"git.bit5.ru/backend/errors"
|
|
"git.bit5.ru/backend/meta"
|
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
)
|
|
|
|
type DataCollection struct {
|
|
Db *db.DBC
|
|
OwnerId uint32
|
|
sqlFilter string
|
|
Items map[uint64]meta.IMetaDataItem
|
|
itemInstance meta.IMetaDataItem
|
|
ownerFieldIndex int
|
|
}
|
|
|
|
func (coll *DataCollection) Checkin(item meta.IMetaDataItem) (bool, error) {
|
|
if coll.itemInstance.CLASS_ID() != item.CLASS_ID() {
|
|
return false, errors.Errorf("Wrong class element %d for adding to collection %d", item.CLASS_ID(), coll.itemInstance.CLASS_ID())
|
|
}
|
|
|
|
id := item.GetIdValue()
|
|
//NOTE: it's considered to be OK
|
|
if id == 0 {
|
|
return false, nil
|
|
}
|
|
coll.Items[item.GetIdValue()] = item
|
|
return true, nil
|
|
}
|
|
|
|
func (coll *DataCollection) Save(ctx context.Context) error {
|
|
|
|
ctx, span := tracer.Start(ctx, "DataCollection.Save")
|
|
defer span.End()
|
|
|
|
fields := coll.GetDbFields()
|
|
countFields := len(fields)
|
|
if countFields == 0 {
|
|
err := errors.New("Fields list is empty")
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return err
|
|
}
|
|
|
|
countRows := len(coll.Items)
|
|
if countRows == 0 {
|
|
return nil
|
|
}
|
|
|
|
allDataWithoutMask := true
|
|
for _, item := range coll.Items {
|
|
bitmaskItem, bitmaskable := item.(meta.IBitmasked)
|
|
if bitmaskable && bitmaskItem.IsMaskFilled() {
|
|
allDataWithoutMask = false
|
|
break
|
|
}
|
|
}
|
|
|
|
// NOTE: performance fix: if none of rows have mask we can save all data by one sql query
|
|
if allDataWithoutMask {
|
|
if err := coll.saveWithoutMask(ctx); err != nil {
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := coll.saveByMask(ctx); err != nil {
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (coll *DataCollection) saveByMask(ctx context.Context) error {
|
|
|
|
ctx, span := tracer.Start(ctx, "DataCollection.saveByMask")
|
|
defer span.End()
|
|
|
|
fields := coll.GetDbFields()
|
|
countFields := len(fields)
|
|
|
|
tableName := coll.GetTableName()
|
|
|
|
span.SetAttributes(
|
|
attribute.String("table", tableName),
|
|
attribute.Int("field_amount", countFields),
|
|
)
|
|
|
|
for _, item := range coll.Items {
|
|
data := make([]interface{}, countFields)
|
|
item.Export(data)
|
|
|
|
// enforcing ownerId
|
|
data[coll.ownerFieldIndex] = coll.OwnerId
|
|
|
|
itemFields := make([]string, 0, countFields)
|
|
itemData := make([]interface{}, 0, countFields)
|
|
|
|
bitmaskItem, bitmaskable := item.(meta.IBitmasked)
|
|
|
|
for fieldInd, fieldName := range fields {
|
|
skipAdding := bitmaskable && bitmaskItem.IsMaskFilled() && !bitmaskItem.HasValue(uint64(fieldInd))
|
|
if skipAdding {
|
|
continue
|
|
}
|
|
|
|
itemFields = append(itemFields, fieldName)
|
|
itemData = append(itemData, data[fieldInd])
|
|
}
|
|
|
|
sqlSmt := createInsertSQLForFields(ctx, tableName, itemFields, 1)
|
|
_, err := coll.Db.UpdateBySQL(sqlSmt, itemData...).Exec()
|
|
if err != nil {
|
|
log_len := len(sqlSmt)
|
|
if log_len > 200 {
|
|
log_len = 200
|
|
}
|
|
|
|
resultErr := errors.Errorf("%s (%s)", err.Error(), sqlSmt[0:log_len])
|
|
span.RecordError(resultErr)
|
|
span.SetStatus(codes.Error, resultErr.Error())
|
|
return resultErr
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (coll *DataCollection) saveWithoutMask(ctx context.Context) error {
|
|
|
|
ctx, span := tracer.Start(ctx, "DataCollection.saveWithoutMask")
|
|
defer span.End()
|
|
|
|
tableName := coll.GetTableName()
|
|
|
|
fields := coll.GetDbFields()
|
|
countFields := len(fields)
|
|
countRows := len(coll.Items)
|
|
|
|
span.SetAttributes(
|
|
attribute.String("table", tableName),
|
|
attribute.Int("field_amount", countFields),
|
|
attribute.Int("item_amount", countRows),
|
|
)
|
|
|
|
offset := 0
|
|
rows := make([]interface{}, countRows*countFields)
|
|
for _, item := range coll.Items {
|
|
start := offset * countFields
|
|
item.Export(rows[start:(start + countFields)])
|
|
|
|
ownerValue := rows[start+coll.ownerFieldIndex]
|
|
if ownerValue == uint32(0) {
|
|
rows[start+coll.ownerFieldIndex] = coll.OwnerId
|
|
} else if ownerValue != coll.OwnerId {
|
|
err := errors.Errorf("Wrong owner_id in %s value %d (!= %d)", tableName, ownerValue, coll.OwnerId)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return err
|
|
}
|
|
offset++
|
|
}
|
|
|
|
sqlSmt := createInsertSQLForFields(ctx, tableName, fields, countRows)
|
|
_, err := coll.Db.UpdateBySQL(sqlSmt, rows...).Exec()
|
|
if err != nil {
|
|
resultErr := errors.Errorf("error: %s, sql: %s", err.Error(), sqlSmt)
|
|
span.RecordError(resultErr)
|
|
span.SetStatus(codes.Error, resultErr.Error())
|
|
return resultErr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (coll *DataCollection) GetDbFields() []string {
|
|
return coll.itemInstance.GetDbFields()
|
|
}
|
|
|
|
func (coll *DataCollection) GetTableName() string {
|
|
return coll.itemInstance.GetDbTableName()
|
|
}
|
|
|
|
func (coll *DataCollection) init() error {
|
|
coll.Items = make(map[uint64]meta.IMetaDataItem)
|
|
|
|
coll.ownerFieldIndex = -1
|
|
ownerField := coll.itemInstance.GetOwnerFieldName()
|
|
for index, field := range coll.itemInstance.GetDbFields() {
|
|
if field == ownerField {
|
|
coll.ownerFieldIndex = index
|
|
break
|
|
}
|
|
}
|
|
if coll.ownerFieldIndex == -1 {
|
|
return errors.New("Owner field not found in fields list")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewDataCollection(db *db.DBC, ownerId uint32, sqlFilter string, itemInstance meta.IMetaDataItem) (*DataCollection, error) {
|
|
data := DataCollection{Db: db, OwnerId: ownerId, sqlFilter: sqlFilter, itemInstance: itemInstance}
|
|
if err := data.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
return &data, nil
|
|
}
|