Commit ea432032 authored by Andrew Werner's avatar Andrew Werner

sql,sqlbase,client: bootstrap TableDescriptor timestamps from MVCC

Using the MVCC timestamp of the value for table descriptors has long been
theorized as the right mechanism to eliminate the need for transactions
which update a table descriptor version to observe their commit timestamp
(see https://github.com/cockroachdb/cockroach/issues/17698#issuecomment-452035873).

The challenge was presumed to be the need to expose MVCC timestamps in our
client library. It turns out we seem to do that already (how did nobody know
that?!).

This commit avoids using the CommitTimestamp by using the MVCC timestamp on
the read path. In order to make this setting of the timestamp less of a footgun
we add a `(*Descriptor).Table(hlc.Timestamp)` method which forces anybody who
extracts a `TableDescriptor` from a `Descriptor` to pass in a timestamp which
may be used to set `ModificationTime` and `CreateAsOfTime`. A linter in the
following commit enforces this proper usage.

The below SQL would always fail before this change and now passes:

```
CREATE TABLE foo ( k INT PRIMARY KEY );
BEGIN;
DROP TABLE foo;
<wait a while>
COMMIT;
```

Similarly, the TestImportData test seems to pass under stressrace with a 5s
`kv.closed_timestamp.target_duration`.

Release justification: fixes a release blocker and known customer issue.

References #37083.

Release note: None
parent d21dad80
......@@ -132,6 +132,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>19.1-9</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>19.1-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
......@@ -178,7 +178,7 @@ func getRelevantDescChanges(
// obviously interesting to our backup.
for _, i := range descs {
interestingIDs[i.GetID()] = struct{}{}
if t := i.GetTable(); t != nil {
if t := i.Table(hlc.Timestamp{}); t != nil {
for j := t.ReplacementOf.ID; j != sqlbase.InvalidID; j = priorIDs[j] {
interestingIDs[j] = struct{}{}
}
......@@ -200,7 +200,7 @@ func getRelevantDescChanges(
return nil, err
}
for _, i := range starting {
if table := i.GetTable(); table != nil {
if table := i.Table(hlc.Timestamp{}); table != nil {
// We need to add to interestingIDs so that if we later see a delete for
// this ID we still know it is interesting to us, even though we will not
// have a parentID at that point (since the delete is a nil desc).
......@@ -231,7 +231,7 @@ func getRelevantDescChanges(
if _, ok := interestingIDs[change.ID]; ok {
interestingChanges = append(interestingChanges, change)
} else if change.Desc != nil {
if table := change.Desc.GetTable(); table != nil {
if table := change.Desc.Table(hlc.Timestamp{}); table != nil {
if _, ok := interestingParents[table.ParentID]; ok {
interestingIDs[table.ID] = struct{}{}
interestingChanges = append(interestingChanges, change)
......@@ -279,7 +279,8 @@ func getAllDescChanges(
return nil, err
}
r.Desc = &desc
if t := desc.GetTable(); t != nil && t.ReplacementOf.ID != sqlbase.InvalidID {
t := desc.Table(rev.Timestamp)
if t != nil && t.ReplacementOf.ID != sqlbase.InvalidID {
priorIDs[t.ID] = t.ReplacementOf.ID
}
}
......@@ -303,6 +304,9 @@ func allSQLDescriptors(ctx context.Context, txn *client.Txn) ([]sqlbase.Descript
return nil, errors.NewAssertionErrorWithWrappedErrf(err,
"%s: unable to unmarshal SQL descriptor", row.Key)
}
if row.Value != nil {
sqlDescs[i].Table(row.Value.Timestamp)
}
}
return sqlDescs, nil
}
......@@ -379,7 +383,7 @@ func spansForAllTableIndexes(
// in them that we didn't already get above e.g. indexes or tables that are
// not in latest because they were dropped during the time window in question.
for _, rev := range revs {
if tbl := rev.Desc.GetTable(); tbl != nil {
if tbl := rev.Desc.Table(hlc.Timestamp{}); tbl != nil {
for _, idx := range tbl.AllNonDropIndexes() {
key := tableAndIndex{tableID: tbl.ID, indexID: idx.ID}
if !added[key] {
......@@ -1016,7 +1020,7 @@ func backupPlanHook(
return err
}
}
if tableDesc := desc.GetTable(); tableDesc != nil {
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
if err := p.CheckPrivilege(ctx, tableDesc, privilege.SELECT); err != nil {
return err
}
......@@ -1083,7 +1087,7 @@ func backupPlanHook(
tablesInPrev := make(map[sqlbase.ID]struct{})
dbsInPrev := make(map[sqlbase.ID]struct{})
for _, d := range prevBackups[len(prevBackups)-1].Descriptors {
if t := d.GetTable(); t != nil {
if t := d.Table(hlc.Timestamp{}); t != nil {
tablesInPrev[t.ID] = struct{}{}
}
}
......@@ -1092,7 +1096,7 @@ func backupPlanHook(
}
for _, d := range targetDescs {
if t := d.GetTable(); t != nil {
if t := d.Table(hlc.Timestamp{}); t != nil {
// If we're trying to use a previous backup for this table, ideally it
// actually contains this table.
if _, ok := tablesInPrev[t.ID]; ok {
......@@ -1504,7 +1508,7 @@ func maybeDowngradeTableDescsInBackupDescriptor(
// Copy Descriptors so we can return a shallow copy without mutating the slice.
copy(backupDescCopy.Descriptors, backupDesc.Descriptors)
for i := range backupDesc.Descriptors {
if tableDesc := backupDesc.Descriptors[i].GetTable(); tableDesc != nil {
if tableDesc := backupDesc.Descriptors[i].Table(hlc.Timestamp{}); tableDesc != nil {
downgraded, newDesc, err := tableDesc.MaybeDowngradeForeignKeyRepresentation(ctx, settings)
if err != nil {
return nil, err
......@@ -1534,7 +1538,7 @@ func maybeUpgradeTableDescsInBackupDescriptors(
// descriptors so that they can be looked up.
for _, backupDesc := range backupDescs {
for _, desc := range backupDesc.Descriptors {
if table := desc.GetTable(); table != nil {
if table := desc.Table(hlc.Timestamp{}); table != nil {
protoGetter.Protos[string(sqlbase.MakeDescMetadataKey(table.ID))] =
sqlbase.WrapDescriptor(protoutil.Clone(table).(*sqlbase.TableDescriptor))
}
......@@ -1544,7 +1548,7 @@ func maybeUpgradeTableDescsInBackupDescriptors(
for i := range backupDescs {
backupDesc := &backupDescs[i]
for j := range backupDesc.Descriptors {
if table := backupDesc.Descriptors[j].GetTable(); table != nil {
if table := backupDesc.Descriptors[j].Table(hlc.Timestamp{}); table != nil {
if _, err := table.MaybeUpgradeForeignKeyRepresentation(ctx, protoGetter, skipFKsWithNoMatchingTable); err != nil {
return err
}
......
......@@ -184,7 +184,7 @@ func loadSQLDescsFromBackupsAtTime(
allDescs := make([]sqlbase.Descriptor, 0, len(byID))
for _, desc := range byID {
if t := desc.GetTable(); t != nil {
if t := desc.Table(hlc.Timestamp{}); t != nil {
// A table revision may have been captured before it was in a DB that is
// backed up -- if the DB is missing, filter the table.
if byID[t.ParentID] == nil {
......@@ -212,7 +212,7 @@ func selectTargets(
seenTable := false
for _, desc := range matched.descs {
if desc.GetTable() != nil {
if desc.Table(hlc.Timestamp{}) != nil {
seenTable = true
break
}
......@@ -1534,7 +1534,7 @@ func doRestorePlan(
for _, desc := range sqlDescs {
if dbDesc := desc.GetDatabase(); dbDesc != nil {
databasesByID[dbDesc.ID] = dbDesc
} else if tableDesc := desc.GetTable(); tableDesc != nil {
} else if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
tablesByID[tableDesc.ID] = tableDesc
}
}
......@@ -1686,7 +1686,7 @@ func createImportingTables(
var tables []*sqlbase.TableDescriptor
var oldTableIDs []sqlbase.ID
for _, desc := range sqlDescs {
if tableDesc := desc.GetTable(); tableDesc != nil {
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
tables = append(tables, tableDesc)
oldTableIDs = append(oldTableIDs, tableDesc.ID)
}
......
......@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
......@@ -145,7 +146,7 @@ func backupShowerDefault(ctx context.Context, showSchemas bool) backupShower {
var rows []tree.Datums
var row tree.Datums
for _, descriptor := range desc.Descriptors {
if table := descriptor.GetTable(); table != nil {
if table := descriptor.Table(hlc.Timestamp{}); table != nil {
dbName := descs[table.ParentID]
row = tree.Datums{
tree.NewDString(dbName),
......
......@@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/pkg/errors"
)
......@@ -121,7 +122,7 @@ func newDescriptorResolver(descs []sqlbase.Descriptor) (*descriptorResolver, err
}
// Now on to the tables.
for _, desc := range descs {
if tbDesc := desc.GetTable(); tbDesc != nil {
if tbDesc := desc.Table(hlc.Timestamp{}); tbDesc != nil {
if tbDesc.Dropped() {
continue
}
......@@ -214,7 +215,7 @@ func descriptorsMatchingTargets(
desc := descI.(sqlbase.Descriptor)
// If the parent database is not requested already, request it now
parentID := desc.GetTable().GetParentID()
parentID := desc.Table(hlc.Timestamp{}).GetParentID()
if _, ok := alreadyRequestedDBs[parentID]; !ok {
parentDesc := resolver.descByID[parentID]
ret.descs = append(ret.descs, parentDesc)
......
......@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)
......@@ -34,6 +35,10 @@ func TestDescriptorsMatchingTargets(t *testing.T) {
*sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{ID: 3, Name: "data"}),
*sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{ID: 5, Name: "empty"}),
}
// Set the timestamp on the table descriptors.
for _, d := range descriptors {
d.Table(hlc.Timestamp{WallTime: 1})
}
tests := []struct {
sessionDatabase string
......
......@@ -200,7 +200,7 @@ func changefeedPlanHook(
}
targets := make(jobspb.ChangefeedTargets, len(targetDescs))
for _, desc := range targetDescs {
if tableDesc := desc.GetTable(); tableDesc != nil {
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
targets[tableDesc.ID] = jobspb.ChangefeedTarget{
StatementTimeName: tableDesc.Name,
}
......
......@@ -260,7 +260,8 @@ func fetchTableDescriptorVersions(
} else if !ok {
return nil
}
remaining, _, _, err := sqlbase.DecodeTableIDIndexID(it.UnsafeKey().Key)
k := it.UnsafeKey()
remaining, _, _, err := sqlbase.DecodeTableIDIndexID(k.Key)
if err != nil {
return err
}
......@@ -282,7 +283,7 @@ func fetchTableDescriptorVersions(
if err := value.GetProto(&desc); err != nil {
return err
}
if tableDesc := desc.GetTable(); tableDesc != nil {
if tableDesc := desc.Table(k.Timestamp); tableDesc != nil {
tableDescs = append(tableDescs, tableDesc)
}
}
......
......@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/cli"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
......@@ -96,7 +97,7 @@ func runLoadShow(cmd *cobra.Command, args []string) error {
// in case more fields need to be added to the output.
fmt.Printf("Descriptors:\n")
for _, d := range desc.Descriptors {
if desc := d.GetTable(); desc != nil {
if desc := d.Table(hlc.Timestamp{}); desc != nil {
fmt.Printf(" %d: %s (table)\n", d.GetID(), d.GetName())
}
if desc := d.GetDatabase(); desc != nil {
......
......@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/workload/bank"
)
......@@ -173,7 +174,7 @@ func BenchmarkImport(b *testing.B) {
{
// TODO(dan): The following should probably make it into
// dataccl.Backup somehow.
tableDesc := backup.Desc.Descriptors[len(backup.Desc.Descriptors)-1].GetTable()
tableDesc := backup.Desc.Descriptors[len(backup.Desc.Descriptors)-1].Table(hlc.Timestamp{})
if tableDesc == nil || tableDesc.ParentID == keys.SystemDatabaseID {
b.Fatalf("bad table descriptor: %+v", tableDesc)
}
......
......@@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/pkg/errors"
)
......@@ -63,7 +64,7 @@ func MakeKeyRewriterFromRekeys(rekeys []roachpb.ImportRequest_TableRekey) (*KeyR
if err := protoutil.Unmarshal(rekey.NewDesc, &desc); err != nil {
return nil, errors.Wrapf(err, "unmarshalling rekey descriptor for old table id %d", rekey.OldID)
}
table := desc.GetTable()
table := desc.Table(hlc.Timestamp{})
if table == nil {
return nil, errors.New("expected a table descriptor")
}
......
......@@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)
......@@ -140,8 +141,11 @@ func TestKeyRewriter(t *testing.T) {
})
}
func mustMarshalDesc(t *testing.T, desc *sqlbase.TableDescriptor) []byte {
bytes, err := protoutil.Marshal(sqlbase.WrapDescriptor(desc))
func mustMarshalDesc(t *testing.T, tableDesc *sqlbase.TableDescriptor) []byte {
desc := sqlbase.WrapDescriptor(tableDesc)
// Set the timestamp to a non-zero value.
desc.Table(hlc.Timestamp{WallTime: 1})
bytes, err := protoutil.Marshal(desc)
if err != nil {
t.Fatal(err)
}
......
......@@ -96,7 +96,7 @@ func (b *Backup) NextKeyValues(
) ([]engine.MVCCKeyValue, roachpb.Span, error) {
var userTables []*sqlbase.TableDescriptor
for _, d := range b.Desc.Descriptors {
if t := d.GetTable(); t != nil && t.ParentID != keys.SystemDatabaseID {
if t := d.Table(hlc.Timestamp{}); t != nil && t.ParentID != keys.SystemDatabaseID {
userTables = append(userTables, t)
}
}
......
......@@ -28,10 +28,15 @@ import (
)
// KeyValue represents a single key/value pair. This is similar to
// roachpb.KeyValue except that the value may be nil.
// roachpb.KeyValue except that the value may be nil. The timestamp
// in the value will be populated with the MVCC timestamp at which this
// value was read if this struct was produced by a GetRequest or
// ScanRequest which uses the KEY_VALUES ScanFormat. Values created from
// a ScanRequest which uses the BATCH_RESPONSE ScanFormat will contain a
// zero Timestamp.
type KeyValue struct {
Key roachpb.Key
Value *roachpb.Value // Timestamp will always be zero
Value *roachpb.Value
}
func (kv *KeyValue) String() string {
......@@ -319,11 +324,28 @@ func (db *DB) Get(ctx context.Context, key interface{}) (KeyValue, error) {
//
// key can be either a byte slice or a string.
func (db *DB) GetProto(ctx context.Context, key interface{}, msg protoutil.Message) error {
_, err := db.GetProtoTs(ctx, key, msg)
return err
}
// GetProtoTs retrieves the value for a key and decodes the result as a proto
// message. It additionally returns the timestamp at which the key was read.
// If the key doesn't exist, the proto will simply be reset and a zero timestamp
// will be returned. A zero timestamp will also be returned if unmarshaling
// fails.
//
// key can be either a byte slice or a string.
func (db *DB) GetProtoTs(
ctx context.Context, key interface{}, msg protoutil.Message,
) (hlc.Timestamp, error) {
r, err := db.Get(ctx, key)
if err != nil {
return err
return hlc.Timestamp{}, err
}
if err := r.ValueProto(msg); err != nil || r.Value == nil {
return hlc.Timestamp{}, err
}
return r.ValueProto(msg)
return r.Value.Timestamp, nil
}
// Put sets the value for a key.
......
......@@ -263,14 +263,6 @@ func (txn *Txn) CommitTimestamp() hlc.Timestamp {
return txn.mu.sender.CommitTimestamp()
}
// CommitTimestampFixed returns true if the commit timestamp has
// been fixed to the start timestamp and cannot be pushed forward.
func (txn *Txn) CommitTimestampFixed() bool {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.CommitTimestampFixed()
}
// SetSystemConfigTrigger sets the system db trigger to true on this transaction.
// This will impact the EndTransactionRequest.
func (txn *Txn) SetSystemConfigTrigger() error {
......@@ -319,11 +311,28 @@ func (txn *Txn) Get(ctx context.Context, key interface{}) (KeyValue, error) {
//
// key can be either a byte slice or a string.
func (txn *Txn) GetProto(ctx context.Context, key interface{}, msg protoutil.Message) error {
_, err := txn.GetProtoTs(ctx, key, msg)
return err
}
// GetProtoTs retrieves the value for a key and decodes the result as a proto
// message. It additionally returns the timestamp at which the key was read.
// If the key doesn't exist, the proto will simply be reset and a zero timestamp
// will be returned. A zero timestamp will also be returned if unmarshaling
// fails.
//
// key can be either a byte slice or a string.
func (txn *Txn) GetProtoTs(
ctx context.Context, key interface{}, msg protoutil.Message,
) (hlc.Timestamp, error) {
r, err := txn.Get(ctx, key)
if err != nil {
return err
return hlc.Timestamp{}, err
}
if err := r.ValueProto(msg); err != nil || r.Value == nil {
return hlc.Timestamp{}, err
}
return r.ValueProto(msg)
return r.Value.Timestamp, nil
}
// Put sets the value for a key
......
......@@ -497,7 +497,7 @@ func (s *Server) collectSchemaInfo(ctx context.Context) ([]sqlbase.TableDescript
if err := kv.ValueProto(&desc); err != nil {
return nil, errors.Wrapf(err, "%s: unable to unmarshal SQL descriptor", kv.Key)
}
if t := desc.GetTable(); t != nil && t.ID > keys.MaxReservedDescID {
if t := desc.Table(kv.Value.Timestamp); t != nil && t.ID > keys.MaxReservedDescID {
if err := reflectwalk.Walk(t, redactor); err != nil {
panic(err) // stringRedactor never returns a non-nil err
}
......
......@@ -44,6 +44,7 @@ const (
VersionTopLevelForeignKeys
VersionAtomicChangeReplicasTrigger
VersionAtomicChangeReplicas
VersionTableDescModificationTimeFromMVCC
// Add new versions here (step one of two).
......@@ -532,6 +533,18 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VersionAtomicChangeReplicas,
Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 9},
},
{
// VersionTableDescModificationTimeFromMVCC is https://github.com/cockroachdb/cockroach/pull/40581
//
// It represents an upgrade to the table descriptor format in which
// CreateAsOfTime and ModifiedTime are set to zero when new versions of
// table descriptors are written. This removes the need to fix the commit
// timestamp for transactions which update table descriptors. The value
// is then populated by the reading client with the MVCC timestamp of the
// row which contained the serialized table descriptor.
Key: VersionTableDescModificationTimeFromMVCC,
Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 10},
},
// Add new versions here (step two of two).
......
......@@ -21,11 +21,12 @@ func _() {
_ = x[VersionTopLevelForeignKeys-10]
_ = x[VersionAtomicChangeReplicasTrigger-11]
_ = x[VersionAtomicChangeReplicas-12]
_ = x[VersionTableDescModificationTimeFromMVCC-13]
}
const _VersionKey_name = "Version2_1VersionUnreplicatedRaftTruncatedStateVersionSideloadedStorageNoReplicaIDVersion19_1VersionStart19_2VersionQueryTxnTimestampVersionStickyBitVersionParallelCommitsVersionGenerationComparableVersionLearnerReplicasVersionTopLevelForeignKeysVersionAtomicChangeReplicasTriggerVersionAtomicChangeReplicas"
const _VersionKey_name = "Version2_1VersionUnreplicatedRaftTruncatedStateVersionSideloadedStorageNoReplicaIDVersion19_1VersionStart19_2VersionQueryTxnTimestampVersionStickyBitVersionParallelCommitsVersionGenerationComparableVersionLearnerReplicasVersionTopLevelForeignKeysVersionAtomicChangeReplicasTriggerVersionAtomicChangeReplicasVersionTableDescModificationTimeFromMVCC"
var _VersionKey_index = [...]uint16{0, 10, 47, 82, 93, 109, 133, 149, 171, 198, 220, 246, 280, 307}
var _VersionKey_index = [...]uint16{0, 10, 47, 82, 93, 109, 133, 149, 171, 198, 220, 246, 280, 307, 347}
func (i VersionKey) String() string {
if i < 0 || i >= VersionKey(len(_VersionKey_index)-1) {
......
......@@ -468,6 +468,10 @@ func (ex *connExecutor) execStmtInOpenState(
// leases will only go down over time: no new conflicting leases can be
// created as of the time of this call because v-2 can't be leased once
// v-1 exists.
//
// If this method succeeds it is the caller's responsibility to release the
// executor's table leases after the txn commits so that schema changes can
// proceed.
func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error {
tables := ex.extraTxnState.tables.getTablesWithNewVersion()
if tables == nil {
......@@ -477,40 +481,36 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error
if txn.IsCommitted() {
panic("transaction has already committed")
}
if !txn.CommitTimestampFixed() {
panic("commit timestamp was not fixed")
}
// Release leases here for two reasons:
// 1. If there are existing leases at version V-2 for a descriptor
// being modified to version V being held the wait loop below that
// waits on a cluster wide release of old version leases will hang
// until these leases expire.
// 2. Once this transaction commits, the schema changers run and
// increment the version of the modified descriptors. If one of the
// descriptors being modified has a lease being held the schema
// changers will stall until the leases expire.
//
// The above two cases can be satisfied by releasing leases for both
// cases explicitly, but we prefer to call it here and kill two birds
// with one stone.
// We potentially hold leases for tables which we've modified which
// we need to drop. Say we're updating tables at version V. All leases
// for version V-2 need to be dropped immediately, otherwise the check
// below that nobody holds leases for version V-2 will fail. Worse yet,
// the code below loops waiting for nobody to hold leases on V-2. We also
// may hold leases for version V-1 of modified tables that are good to drop
// but not as vital for correctness. It's good to drop them because as soon
// as this transaction commits jobs may start and will need to wait until
// the lease expires. It is safe because V-1 must remain valid until this
// transaction commits; if we commit then nobody else could have written
// a new V beneath us because we've already laid down an intent.
//
// It is safe to release leases even though the transaction hasn't yet
// committed only because the transaction timestamp has been fixed using
// CommitTimestamp().
//
// releaseLeases can fail to release a lease if the server is shutting
// down. This is okay because it will result in the two cases mentioned
// above simply hanging until the expiration time for the leases.
ex.extraTxnState.tables.releaseLeases(ctx)
count, err := CountLeases(ctx, ex.server.cfg.InternalExecutor, tables, txn.OrigTimestamp())
// All this being said, we must retain our leases on tables which we have
// not modified to ensure that our writes to those other tables in this
// transaction remain valid.
ex.extraTxnState.tables.releaseTableLeases(ctx, tables)
// We know that so long as there are no leases on the updated tables as of
// the current provisional commit timestamp for this transaction then if this
// transaction ends up committing then there won't have been any created
// in the meantime.
count, err := CountLeases(ctx, ex.server.cfg.InternalExecutor, tables, txn.Serialize().Timestamp)
if err != nil {
return err
}
if count == 0 {
return nil
}
// Restart the transaction so that it is able to replay itself at a newer timestamp
// with the hope that the next time around there will be leases only at the current
// version.
......@@ -533,6 +533,9 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error
// We cleanup the transaction and create a new transaction wait time
// might be extensive and so we'd better get rid of all the intents.
txn.CleanupOnError(ctx, retryErr)
// Release the rest of our leases on unmodified tables so we don't hold up
// schema changes there and potentially create a deadlock.
ex.extraTxnState.tables.releaseLeases(ctx)
// Wait until all older version leases have been released or expired.
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
......@@ -580,6 +583,13 @@ func (ex *connExecutor) commitSQLTransaction(
return ex.makeErrEvent(err, stmt)
}
// Now that we've committed, if we modified any table we need to make sure
// to release the leases for them so that the schema change can proceed and
// we don't block the client.
if tables := ex.extraTxnState.tables.getTablesWithNewVersion(); tables != nil {
ex.extraTxnState.tables.releaseLeases(ctx)
}
if !isRelease {
return eventTxnFinish{}, eventTxnFinishPayload{commit: true}
}
......
......@@ -77,7 +77,7 @@ func doCreateSequence(
privs := dbDesc.GetPrivileges()
desc, err := MakeSequenceTableDesc(name.Table(), opts,
dbDesc.ID, id, params.p.txn.CommitTimestamp(), privs, params.EvalContext().Settings)
dbDesc.ID, id, params.creationTimeForNewTableDescriptor(), privs, params.EvalContext().Settings)
if err != nil {
return err
}
......
......@@ -164,7 +164,7 @@ func (n *createTableNode) startExec(params runParams) error {
var asCols sqlbase.ResultColumns
var desc sqlbase.MutableTableDescriptor
var affected map[sqlbase.ID]*sqlbase.MutableTableDescriptor
creationTime := params.p.txn.CommitTimestamp()
creationTime := params.creationTimeForNewTableDescriptor()
if n.n.As() {
asCols = planColumns(n.sourcePlan)
if !n.run.fromHeuristicPlanner && !n.n.AsHasUserSpecifiedPrimaryKey() {
......@@ -1078,7 +1078,6 @@ func MakeTableDesc(
evalCtx *tree.EvalContext,
) (sqlbase.MutableTableDescriptor, error) {
desc := InitTableDescriptor(id, parentID, n.Table.Table(), creationTime, privileges)
for _, def := range n.Defs {
if d, ok := def.(*tree.ColumnTableDef); ok {
if !desc.IsVirtualTable() {
......
......@@ -135,7 +135,7 @@ func (n *createViewNode) startExec(params runParams) error {
n.dbDesc.ID,
id,
n.columns,
params.p.txn.CommitTimestamp(),
params.creationTimeForNewTableDescriptor(),
privs,
&params.p.semaCtx,
)
......
......@@ -165,13 +165,13 @@ func getDescriptorByID(
log.Eventf(ctx, "fetching descriptor with ID %d", id)
descKey := sqlbase.MakeDescMetadataKey(id)
desc := &sqlbase.Descriptor{}
if err := txn.GetProto(ctx, descKey, desc); err != nil {
ts, err := txn.GetProtoTs(ctx, descKey, desc)
if err != nil {
return err
}
switch t := descriptor.(type) {
case *sqlbase.TableDescriptor:
table := desc.GetTable()
table := desc.Table(ts)
if table == nil {
return pgerror.Newf(pgcode.WrongObjectType,
"%q is not a table", desc.String())
......@@ -216,7 +216,7 @@ func GetAllDescriptors(ctx context.Context, txn *client.Txn) ([]sqlbase.Descript
}
switch t := desc.Union.(type) {
case *sqlbase.Descriptor_Table:
table := desc.GetTable()
table := desc.Table(kv.Value.Timestamp)
if err := table.MaybeFillInDescriptor(ctx, txn); err != nil {
return nil, err
}
......
......@@ -157,10 +157,11 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd');
t.Fatalf(`table "kv" does not exist`)
}
tbDescKey := sqlbase.MakeDescMetadataKey(sqlbase.ID(gr.ValueInt()))
if err := kvDB.GetProto(ctx, tbDescKey, desc); err != nil {
ts, err := kvDB.GetProtoTs(ctx, tbDescKey, desc)
if err != nil {
t.Fatal(err)
}
tbDesc := desc.GetTable()
tbDesc := desc.Table(ts)
// Add a zone config for both the table and database.
cfg := config.DefaultZoneConfig()
......@@ -330,10 +331,11 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a');
t.Fatalf(`table "kv" does not exist`)
}
tbDescKey := sqlbase.MakeDescMetadataKey(sqlbase.ID(gr.ValueInt()))
if err := kvDB.GetProto(ctx, tbDescKey, desc); err != nil {
ts, err := kvDB.GetProtoTs(ctx, tbDescKey, desc)
if err != nil {
t.Fatal(err)
}
tbDesc := desc.GetTable()
tbDesc := desc.Table(ts)
tb2NameKey := sqlbase.MakeNameMetadataKey(dbDesc.ID, "kv2")
gr2, err := kvDB.Get(ctx, tb2NameKey)
......@@ -344,10 +346,11 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a');
t.Fatalf(`table "kv2" does not exist`)
}
tb2DescKey := sqlbase.MakeDescMetadataKey(sqlbase.ID(gr2.ValueInt()))
if err := kvDB.GetProto(ctx, tb2DescKey, desc); err != nil {
ts, err = kvDB.GetProtoTs(ctx, tb2DescKey, desc)
if err != nil {
t.Fatal(err)
}