@@ -714,23 +714,17 @@ func (ex *connExecutor) execStmtInParallel(
714714
715715 planner .statsCollector .PhaseTimes ()[plannerStartExecStmt ] = timeutil .Now ()
716716
717- shouldUseDistSQL := shouldUseDistSQL (distributePlan , ex .sessionData .DistSQLMode )
718717 samplePlanDescription := ex .sampleLogicalPlanDescription (
719- stmt , shouldUseDistSQL , false /* optimizerUsed */ , planner )
718+ stmt , false /* optimizerUsed */ , planner )
720719
721720 var flags planFlags
722- if shouldUseDistSQL {
723- if distributePlan {
724- flags .Set (planFlagDistributed )
725- } else {
726- flags .Set (planFlagDistSQLLocal )
727- }
728- ex .sessionTracing .TraceExecStart (ctx , "distributed-parallel" )
729- err = ex .execWithDistSQLEngine (ctx , planner , stmt .AST .StatementType (), res , distributePlan )
721+ if distributePlan {
722+ flags .Set (planFlagDistributed )
730723 } else {
731- ex .sessionTracing .TraceExecStart (ctx , "local-parallel" )
732- err = ex .execWithLocalEngine (ctx , planner , stmt .AST .StatementType (), res )
724+ flags .Set (planFlagDistSQLLocal )
733725 }
726+ ex .sessionTracing .TraceExecStart (ctx , "parallel" )
727+ err = ex .execWithDistSQLEngine (ctx , planner , stmt .AST .StatementType (), res , distributePlan )
734728 ex .sessionTracing .TraceExecEnd (ctx , res .Err (), res .RowsAffected ())
735729 planner .statsCollector .PhaseTimes ()[plannerEndExecStmt ] = timeutil .Now ()
736730
@@ -851,21 +845,15 @@ func (ex *connExecutor) dispatchToExecutionEngine(
851845 queryMeta .isDistributed = distributePlan
852846 ex .mu .Unlock ()
853847
854- shouldUseDistSQL := shouldUseDistSQL (distributePlan , ex .sessionData .DistSQLMode )
855- samplePlanDescription := ex .sampleLogicalPlanDescription (stmt , shouldUseDistSQL , flags .IsSet (planFlagOptUsed ), planner )
848+ samplePlanDescription := ex .sampleLogicalPlanDescription (stmt , flags .IsSet (planFlagOptUsed ), planner )
856849
857- if shouldUseDistSQL {
858- if distributePlan {
859- flags .Set (planFlagDistributed )
860- } else {
861- flags .Set (planFlagDistSQLLocal )
862- }
863- ex .sessionTracing .TraceExecStart (ctx , "distributed" )
864- err = ex .execWithDistSQLEngine (ctx , planner , stmt .AST .StatementType (), res , distributePlan )
850+ if distributePlan {
851+ flags .Set (planFlagDistributed )
865852 } else {
866- ex .sessionTracing .TraceExecStart (ctx , "local" )
867- err = ex .execWithLocalEngine (ctx , planner , stmt .AST .StatementType (), res )
853+ flags .Set (planFlagDistSQLLocal )
868854 }
855+ ex .sessionTracing .TraceExecStart (ctx , "distributed" )
856+ err = ex .execWithDistSQLEngine (ctx , planner , stmt .AST .StatementType (), res , distributePlan )
869857 ex .sessionTracing .TraceExecEnd (ctx , res .Err (), res .RowsAffected ())
870858 planner .statsCollector .PhaseTimes ()[plannerEndExecStmt ] = timeutil .Now ()
871859 if err != nil {
@@ -920,13 +908,13 @@ func (ex *connExecutor) makeExecPlan(
920908// sampleLogicalPlanDescription returns a serialized representation of a statement's logical plan.
921909// The returned ExplainTreePlanNode will be nil if plan should not be sampled.
922910func (ex * connExecutor ) sampleLogicalPlanDescription (
923- stmt Statement , useDistSQL bool , optimizerUsed bool , planner * planner ,
911+ stmt Statement , optimizerUsed bool , planner * planner ,
924912) * roachpb.ExplainTreePlanNode {
925913 if ! sampleLogicalPlans .Get (& ex .appStats .st .SV ) {
926914 return nil
927915 }
928916
929- if ex .saveLogicalPlanDescription (stmt , useDistSQL , optimizerUsed ) {
917+ if ex .saveLogicalPlanDescription (stmt , optimizerUsed ) {
930918 return planToTree (context .Background (), planner .curPlan )
931919 }
932920 return nil
@@ -935,10 +923,8 @@ func (ex *connExecutor) sampleLogicalPlanDescription(
935923// saveLogicalPlanDescription returns if we should save this as a sample logical plan
936924// for its corresponding fingerprint. We use `saveFingerprintPlanOnceEvery`
937925// to assess how frequently to sample logical plans.
938- func (ex * connExecutor ) saveLogicalPlanDescription (
939- stmt Statement , useDistSQL bool , optimizerUsed bool ,
940- ) bool {
941- stats := ex .appStats .getStatsForStmt (stmt , useDistSQL , optimizerUsed , nil , false /* createIfNonexistent */ )
926+ func (ex * connExecutor ) saveLogicalPlanDescription (stmt Statement , optimizerUsed bool ) bool {
927+ stats := ex .appStats .getStatsForStmt (stmt , true /* distSQLUsed */ , optimizerUsed , nil , false /* createIfNonexistent */ )
942928 if stats == nil {
943929 // Save logical plan the first time we see new statement fingerprint.
944930 return true
@@ -977,68 +963,6 @@ func canFallbackFromOpt(err error, optMode sessiondata.OptimizerMode, stmt State
977963 return true
978964}
979965
980- // execWithLocalEngine runs a plan using the local (non-distributed) SQL
981- // engine.
982- // If an error is returned, the connection needs to stop processing queries.
983- // Such errors are also written to res.
984- // Query execution errors are written to res; they are not returned.
985- func (ex * connExecutor ) execWithLocalEngine (
986- ctx context.Context , planner * planner , stmtType tree.StatementType , res RestrictedCommandResult ,
987- ) error {
988- // Create a BoundAccount to track the memory usage of each row.
989- rowAcc := planner .extendedEvalCtx .Mon .MakeBoundAccount ()
990- planner .extendedEvalCtx .ActiveMemAcc = & rowAcc
991- defer rowAcc .Close (ctx )
992-
993- params := runParams {
994- ctx : ctx ,
995- extendedEvalCtx : & planner .extendedEvalCtx ,
996- p : planner ,
997- }
998-
999- if err := planner .curPlan .start (params ); err != nil {
1000- res .SetError (err )
1001- return nil
1002- }
1003-
1004- switch stmtType {
1005- case tree .RowsAffected :
1006- count , err := countRowsAffected (params , planner .curPlan .plan )
1007- if err != nil {
1008- res .SetError (err )
1009- return nil
1010- }
1011- res .IncrementRowsAffected (count )
1012- return nil
1013- case tree .Rows :
1014- consumeCtx , cleanup := ex .sessionTracing .TraceExecConsume (ctx )
1015- defer cleanup ()
1016-
1017- var commErr error
1018- queryErr := ex .forEachRow (params , planner .curPlan .plan , func (values tree.Datums ) error {
1019- for _ , val := range values {
1020- if err := checkResultType (val .ResolvedType ()); err != nil {
1021- return err
1022- }
1023- }
1024- ex .sessionTracing .TraceExecRowsResult (consumeCtx , values )
1025- commErr = res .AddRow (consumeCtx , values )
1026- return commErr
1027- })
1028- if commErr != nil {
1029- res .SetError (commErr )
1030- return commErr
1031- }
1032- if queryErr != nil {
1033- res .SetError (queryErr )
1034- }
1035- return nil
1036- default :
1037- // Calling StartPlan is sufficient for other statement types.
1038- return nil
1039- }
1040- }
1041-
1042966// execWithDistSQLEngine converts a plan to a distributed SQL physical plan and
1043967// runs it.
1044968// If an error is returned, the connection needs to stop processing queries.
@@ -1090,31 +1014,6 @@ func (ex *connExecutor) execWithDistSQLEngine(
10901014 return recv .commErr
10911015}
10921016
1093- // forEachRow calls the provided closure for each successful call to
1094- // planNode.Next with planNode.Values, making sure to properly track memory
1095- // usage.
1096- //
1097- // f is not allowed to hold on to the row slice. It needs to make a copy if it
1098- // want to use the memory later.
1099- //
1100- // Errors returned by this method are to be considered query errors. If the
1101- // caller wants to handle some errors within the callback differently, it has to
1102- // capture those itself.
1103- func (ex * connExecutor ) forEachRow (params runParams , p planNode , f func (tree.Datums ) error ) error {
1104- next , err := p .Next (params )
1105- for ; next ; next , err = p .Next (params ) {
1106- // If we're tracking memory, clear the previous row's memory account.
1107- if params .extendedEvalCtx .ActiveMemAcc != nil {
1108- params .extendedEvalCtx .ActiveMemAcc .Clear (params .ctx )
1109- }
1110-
1111- if err := f (p .Values ()); err != nil {
1112- return err
1113- }
1114- }
1115- return err
1116- }
1117-
11181017// execStmtInNoTxnState "executes" a statement when no transaction is in scope.
11191018// For anything but BEGIN, this method doesn't actually execute the statement;
11201019// it just returns an Event that will generate a transaction. The statement will
0 commit comments