Enable subqueries in CREATE / UPDATE / RELATE / DELETE queries
This commit is contained in:
parent
c0f9ec1f26
commit
2932bac73e
5 changed files with 98 additions and 13 deletions
10
db/create.go
10
db/create.go
|
@ -66,6 +66,16 @@ func (e *executor) executeCreate(ctx context.Context, stm *sql.CreateStatement)
|
||||||
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: what.TB, ID: nil}
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: what.TB, ID: nil}
|
||||||
i.processBatch(ctx, key, what)
|
i.processBatch(ctx, key, what)
|
||||||
|
|
||||||
|
// Result of subquery
|
||||||
|
case []interface{}:
|
||||||
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB}
|
||||||
|
i.processOther(ctx, key, what)
|
||||||
|
|
||||||
|
// Result of subquery with LIMIT 1
|
||||||
|
case map[string]interface{}:
|
||||||
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB}
|
||||||
|
i.processOther(ctx, key, []interface{}{what})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
10
db/delete.go
10
db/delete.go
|
@ -65,6 +65,16 @@ func (e *executor) executeDelete(ctx context.Context, stm *sql.DeleteStatement)
|
||||||
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: what.TB, ID: nil}
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: what.TB, ID: nil}
|
||||||
i.processBatch(ctx, key, what)
|
i.processBatch(ctx, key, what)
|
||||||
|
|
||||||
|
// Result of subquery
|
||||||
|
case []interface{}:
|
||||||
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB}
|
||||||
|
i.processOther(ctx, key, what)
|
||||||
|
|
||||||
|
// Result of subquery with LIMIT 1
|
||||||
|
case map[string]interface{}:
|
||||||
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB}
|
||||||
|
i.processOther(ctx, key, []interface{}{what})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -693,13 +693,13 @@ func (i *iterator) processModel(ctx context.Context, key *keys.Thing, qry *sql.M
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []interface{}) {
|
func (i *iterator) processOther(ctx context.Context, key *keys.Thing, val []interface{}) {
|
||||||
|
|
||||||
i.processPerms(ctx, key.NS, key.DB, key.TB)
|
i.processPerms(ctx, key.NS, key.DB, key.TB)
|
||||||
|
|
||||||
for _, val := range val {
|
for _, v := range val {
|
||||||
|
|
||||||
switch val := val.(type) {
|
switch v := v.(type) {
|
||||||
|
|
||||||
case *sql.Thing:
|
case *sql.Thing:
|
||||||
|
|
||||||
|
@ -710,7 +710,52 @@ func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []inte
|
||||||
|
|
||||||
if i.checkState(ctx) {
|
if i.checkState(ctx) {
|
||||||
key := key.Copy()
|
key := key.Copy()
|
||||||
key.TB, key.ID = val.TB, val.ID
|
key.TB, key.ID = v.TB, v.ID
|
||||||
|
i.submitTask(key, nil, nil)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
|
||||||
|
switch i.stm.(type) {
|
||||||
|
case *sql.CreateStatement:
|
||||||
|
i.fail <- fmt.Errorf("Can not execute CREATE query using value '%v'", val)
|
||||||
|
case *sql.UpdateStatement:
|
||||||
|
i.fail <- fmt.Errorf("Can not execute UPDATE query using value '%v'", val)
|
||||||
|
case *sql.DeleteStatement:
|
||||||
|
i.fail <- fmt.Errorf("Can not execute DELETE query using value '%v'", val)
|
||||||
|
case *sql.RelateStatement:
|
||||||
|
i.fail <- fmt.Errorf("Can not execute RELATE query using value '%v'", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(i.stop)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []interface{}) {
|
||||||
|
|
||||||
|
i.processPerms(ctx, key.NS, key.DB, key.TB)
|
||||||
|
|
||||||
|
for _, v := range val {
|
||||||
|
|
||||||
|
switch v := v.(type) {
|
||||||
|
|
||||||
|
case *sql.Thing:
|
||||||
|
|
||||||
|
// If the item is a *sql.Thing then
|
||||||
|
// this was a subquery which projected
|
||||||
|
// the ID only, and we can query the
|
||||||
|
// record further after loading it.
|
||||||
|
|
||||||
|
if i.checkState(ctx) {
|
||||||
|
key := key.Copy()
|
||||||
|
key.TB, key.ID = v.TB, v.ID
|
||||||
i.submitTask(key, nil, nil)
|
i.submitTask(key, nil, nil)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -721,7 +766,7 @@ func (i *iterator) processQuery(ctx context.Context, key *keys.Thing, val []inte
|
||||||
// of the data so we can process it.
|
// of the data so we can process it.
|
||||||
|
|
||||||
if i.checkState(ctx) {
|
if i.checkState(ctx) {
|
||||||
i.submitTask(nil, nil, data.Consume(val))
|
i.submitTask(nil, nil, data.Consume(v))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -737,9 +782,9 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
|
||||||
|
|
||||||
i.processPerms(ctx, key.NS, key.DB, key.TB)
|
i.processPerms(ctx, key.NS, key.DB, key.TB)
|
||||||
|
|
||||||
for _, val := range val {
|
for _, v := range val {
|
||||||
|
|
||||||
switch val := val.(type) {
|
switch v := v.(type) {
|
||||||
|
|
||||||
case *sql.Thing:
|
case *sql.Thing:
|
||||||
|
|
||||||
|
@ -748,7 +793,7 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
|
||||||
|
|
||||||
if i.checkState(ctx) {
|
if i.checkState(ctx) {
|
||||||
key := key.Copy()
|
key := key.Copy()
|
||||||
key.ID = val.ID
|
key.ID = v.ID
|
||||||
i.submitTask(key, nil, nil)
|
i.submitTask(key, nil, nil)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -758,9 +803,9 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
|
||||||
// If the data item has an ID field,
|
// If the data item has an ID field,
|
||||||
// then use this as the new record ID.
|
// then use this as the new record ID.
|
||||||
|
|
||||||
if fld, ok := val["id"]; ok {
|
if fld, ok := v["id"]; ok {
|
||||||
|
|
||||||
if thg, ok := val["id"].(*sql.Thing); ok {
|
if thg, ok := v["id"].(*sql.Thing); ok {
|
||||||
|
|
||||||
// If the ID is a *sql.Thing then this
|
// If the ID is a *sql.Thing then this
|
||||||
// was a subquery, so use the ID.
|
// was a subquery, so use the ID.
|
||||||
|
@ -768,7 +813,7 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
|
||||||
if i.checkState(ctx) {
|
if i.checkState(ctx) {
|
||||||
key := key.Copy()
|
key := key.Copy()
|
||||||
key.ID = thg.ID
|
key.ID = thg.ID
|
||||||
i.submitTask(key, nil, data.Consume(val))
|
i.submitTask(key, nil, data.Consume(v))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -780,7 +825,7 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
|
||||||
if i.checkState(ctx) {
|
if i.checkState(ctx) {
|
||||||
key := key.Copy()
|
key := key.Copy()
|
||||||
key.ID = fld
|
key.ID = fld
|
||||||
i.submitTask(key, nil, data.Consume(val))
|
i.submitTask(key, nil, data.Consume(v))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -794,7 +839,7 @@ func (i *iterator) processArray(ctx context.Context, key *keys.Thing, val []inte
|
||||||
if i.checkState(ctx) {
|
if i.checkState(ctx) {
|
||||||
key := key.Copy()
|
key := key.Copy()
|
||||||
key.ID = guid.New().String()
|
key.ID = guid.New().String()
|
||||||
i.submitTask(key, nil, data.Consume(val))
|
i.submitTask(key, nil, data.Consume(v))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
10
db/relate.go
10
db/relate.go
|
@ -60,6 +60,16 @@ func (e *executor) executeRelate(ctx context.Context, stm *sql.RelateStatement)
|
||||||
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: what.ID, ID: guid.New().String()}
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: what.ID, ID: guid.New().String()}
|
||||||
i.processThing(ctx, key)
|
i.processThing(ctx, key)
|
||||||
|
|
||||||
|
// Result of subquery
|
||||||
|
case []interface{}:
|
||||||
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB}
|
||||||
|
i.processOther(ctx, key, what)
|
||||||
|
|
||||||
|
// Result of subquery with LIMIT 1
|
||||||
|
case map[string]interface{}:
|
||||||
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB}
|
||||||
|
i.processOther(ctx, key, []interface{}{what})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return i.Yield(ctx)
|
return i.Yield(ctx)
|
||||||
|
|
10
db/update.go
10
db/update.go
|
@ -65,6 +65,16 @@ func (e *executor) executeUpdate(ctx context.Context, stm *sql.UpdateStatement)
|
||||||
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: what.TB, ID: nil}
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB, TB: what.TB, ID: nil}
|
||||||
i.processBatch(ctx, key, what)
|
i.processBatch(ctx, key, what)
|
||||||
|
|
||||||
|
// Result of subquery
|
||||||
|
case []interface{}:
|
||||||
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB}
|
||||||
|
i.processOther(ctx, key, what)
|
||||||
|
|
||||||
|
// Result of subquery with LIMIT 1
|
||||||
|
case map[string]interface{}:
|
||||||
|
key := &keys.Thing{KV: stm.KV, NS: stm.NS, DB: stm.DB}
|
||||||
|
i.processOther(ctx, key, []interface{}{what})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue