dbmeta/collection.go

217 lines
5.4 KiB
Go

package dbmeta
import (
"context"
"git.bit5.ru/backend/db"
"git.bit5.ru/backend/errors"
"git.bit5.ru/backend/meta"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)
// DataCollection holds meta data items of a single class for one owner and
// saves them through Db.
type DataCollection struct {
// Db is the database handle the save methods execute their SQL statements on.
Db *db.DBC
// OwnerId is the owner value written into (saveByMask) or checked against
// (saveWithoutMask) the owner field of every saved item.
OwnerId uint32
// sqlFilter is stored by NewDataCollection; none of the methods visible in
// this part of the file read it.
sqlFilter string
// Items contains the checked-in items keyed by their id value.
Items map[uint64]meta.IMetaDataItem
// itemInstance is the prototype item that supplies the class id, table name,
// db field list and owner field name for the whole collection.
itemInstance meta.IMetaDataItem
// ownerFieldIndex is the position of the owner field inside the db field
// list; it is resolved by init.
ownerFieldIndex int
}
// Checkin registers item in the collection under its id value.
//
// It returns an error when the class id of item differs from the class id of
// the collection's prototype item. An item whose id value is 0 is not stored;
// that case is not an error and yields (false, nil). Otherwise the item is put
// into Items, replacing any entry with the same id, and (true, nil) is
// returned.
func (coll *DataCollection) Checkin(item meta.IMetaDataItem) (bool, error) {
	expectedClass := coll.itemInstance.CLASS_ID()
	itemClass := item.CLASS_ID()
	if expectedClass != itemClass {
		return false, errors.Errorf("Wrong class element %d for adding to collection %d", itemClass, expectedClass)
	}

	id := item.GetIdValue()
	// NOTE: it's considered to be OK
	if id == 0 {
		return false, nil
	}

	// Use the id obtained above as the key instead of querying the item again.
	coll.Items[id] = item
	return true, nil
}
// Save persists all items of the collection inside a "DataCollection.Save"
// span.
//
// It fails when the prototype item reports no db fields and does nothing when
// the collection holds no items. When at least one item implements
// meta.IBitmasked and reports a filled mask, the items are saved through
// saveByMask; otherwise they are all saved through saveWithoutMask. Any
// returned error is also recorded on the span.
func (coll *DataCollection) Save(ctx context.Context) error {
	ctx, span := tracer.Start(ctx, "DataCollection.Save")
	defer span.End()

	// fail marks the span as failed and passes the error through.
	fail := func(err error) error {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		return err
	}

	if len(coll.GetDbFields()) == 0 {
		return fail(errors.New("Fields list is empty"))
	}
	if len(coll.Items) == 0 {
		return nil
	}

	// NOTE: performance fix: if none of rows have mask we can save all data by one sql query
	persist := coll.saveWithoutMask
	for _, entry := range coll.Items {
		if masked, ok := entry.(meta.IBitmasked); ok && masked.IsMaskFilled() {
			persist = coll.saveByMask
			break
		}
	}

	if err := persist(ctx); err != nil {
		return fail(err)
	}
	return nil
}
// saveByMask saves the items one by one, each with its own statement built by
// createInsertSQLForFields for exactly the fields that item provides.
//
// For every item the exported owner field is overwritten with coll.OwnerId.
// When the item implements meta.IBitmasked and its mask is filled, fields for
// which HasValue reports false are left out of the statement. Saving stops at
// the first failing statement; the returned error carries the driver message
// and at most the first 200 bytes of the SQL, and is recorded on the span.
func (coll *DataCollection) saveByMask(ctx context.Context) error {
	ctx, span := tracer.Start(ctx, "DataCollection.saveByMask")
	defer span.End()

	// maxLoggedSQLLen caps how much of a failed statement is echoed in the error.
	const maxLoggedSQLLen = 200

	fields := coll.GetDbFields()
	countFields := len(fields)
	tableName := coll.GetTableName()

	span.SetAttributes(
		attribute.String("table", tableName),
		attribute.Int("field_amount", countFields),
	)

	for _, item := range coll.Items {
		data := make([]interface{}, countFields)
		item.Export(data)
		// enforcing ownerId
		data[coll.ownerFieldIndex] = coll.OwnerId

		// Whether the mask applies does not depend on the field, so decide it
		// once per item rather than once per field.
		bitmaskItem, bitmaskable := item.(meta.IBitmasked)
		masked := bitmaskable && bitmaskItem.IsMaskFilled()

		itemFields := make([]string, 0, countFields)
		itemData := make([]interface{}, 0, countFields)
		for fieldInd, fieldName := range fields {
			if masked && !bitmaskItem.HasValue(uint64(fieldInd)) {
				continue
			}
			itemFields = append(itemFields, fieldName)
			itemData = append(itemData, data[fieldInd])
		}

		sqlSmt := createInsertSQLForFields(ctx, tableName, itemFields, 1)
		if _, err := coll.Db.UpdateBySQL(sqlSmt, itemData...).Exec(); err != nil {
			// Keep the error message bounded: only a prefix of the SQL is included.
			loggedSQL := sqlSmt
			if len(loggedSQL) > maxLoggedSQLLen {
				loggedSQL = loggedSQL[:maxLoggedSQLLen]
			}

			resultErr := errors.Errorf("%s (%s)", err.Error(), loggedSQL)
			span.RecordError(resultErr)
			span.SetStatus(codes.Error, resultErr.Error())
			return resultErr
		}
	}

	return nil
}
// saveWithoutMask saves all items with a single statement built by
// createInsertSQLForFields for the full field list and every row.
//
// Each item is exported into its own window of one flat value slice. An owner
// field equal to uint32(0) is replaced by coll.OwnerId; any other value that
// differs from coll.OwnerId aborts the save with an error. Errors are recorded
// on the span before being returned.
func (coll *DataCollection) saveWithoutMask(ctx context.Context) error {
	ctx, span := tracer.Start(ctx, "DataCollection.saveWithoutMask")
	defer span.End()

	table := coll.GetTableName()
	columns := coll.GetDbFields()
	width := len(columns)
	total := len(coll.Items)

	span.SetAttributes(
		attribute.String("table", table),
		attribute.Int("field_amount", width),
		attribute.Int("item_amount", total),
	)

	values := make([]interface{}, total*width)
	base := 0 // index of the first value of the row being filled
	for _, entry := range coll.Items {
		entry.Export(values[base : base+width])

		ownerPos := base + coll.ownerFieldIndex
		owner := values[ownerPos]
		switch {
		case owner == uint32(0):
			values[ownerPos] = coll.OwnerId
		case owner != coll.OwnerId:
			err := errors.Errorf("Wrong owner_id in %s value %d (!= %d)", table, owner, coll.OwnerId)
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			return err
		}

		base += width
	}

	query := createInsertSQLForFields(ctx, table, columns, total)
	if _, execErr := coll.Db.UpdateBySQL(query, values...).Exec(); execErr != nil {
		wrapped := errors.Errorf("error: %s, sql: %s", execErr.Error(), query)
		span.RecordError(wrapped)
		span.SetStatus(codes.Error, wrapped.Error())
		return wrapped
	}

	return nil
}
// GetDbFields returns the db field list reported by the collection's
// prototype item.
func (coll *DataCollection) GetDbFields() []string {
	prototype := coll.itemInstance
	return prototype.GetDbFields()
}
// GetTableName returns the db table name reported by the collection's
// prototype item.
func (coll *DataCollection) GetTableName() string {
	prototype := coll.itemInstance
	return prototype.GetDbTableName()
}
// init resets Items to an empty map and resolves ownerFieldIndex by locating
// the prototype item's owner field name in its db field list.
//
// It returns an error, leaving ownerFieldIndex at -1, when the owner field is
// not part of that list.
func (coll *DataCollection) init() error {
	coll.Items = make(map[uint64]meta.IMetaDataItem)
	coll.ownerFieldIndex = -1

	ownerField := coll.itemInstance.GetOwnerFieldName()
	dbFields := coll.itemInstance.GetDbFields()
	for pos := range dbFields {
		if dbFields[pos] == ownerField {
			// The first match wins.
			coll.ownerFieldIndex = pos
			return nil
		}
	}

	return errors.New("Owner field not found in fields list")
}
// NewDataCollection creates an empty collection for items of the same class
// as itemInstance, bound to the given database handle, owner id and sql
// filter.
//
// It returns an error when itemInstance is a nil interface value or when the
// owner field named by itemInstance is missing from its db field list.
func NewDataCollection(db *db.DBC, ownerId uint32, sqlFilter string, itemInstance meta.IMetaDataItem) (*DataCollection, error) {
	// Without this guard a nil prototype would panic inside init on the first
	// method call. Note: a typed nil pointer stored in the interface is not
	// detected here.
	if itemInstance == nil {
		return nil, errors.New("Item instance is nil")
	}

	coll := &DataCollection{
		Db:           db,
		OwnerId:      ownerId,
		sqlFilter:    sqlFilter,
		itemInstance: itemInstance,
	}
	if err := coll.init(); err != nil {
		return nil, err
	}
	return coll, nil
}