提交 ab9408d9 authored 作者: Andrei Tokar's avatar Andrei Tokar

fix KillRestartMulti in progress

上级 6c004246
......@@ -784,12 +784,12 @@ public class Database implements DataHandler {
data.session = systemSession;
meta = mainSchema.createTable(data);
IndexColumn[] pkCols = IndexColumn.wrap(new Column[] { columnId });
starting = true;
metaIdIndex = meta.addIndex(systemSession, "SYS_ID",
0, pkCols, IndexType.createPrimaryKey(
false, false), true, null);
systemSession.commit(true);
objectIds.set(0);
starting = true;
Cursor cursor = metaIdIndex.find(systemSession, null, null);
ArrayList<MetaRecord> records = new ArrayList<>((int) metaIdIndex.getRowCountApproximation());
while (cursor.next()) {
......@@ -1727,10 +1727,18 @@ public class Database implements DataHandler {
lockMeta(session);
synchronized (this) {
int id = obj.getId();
removeMeta(session, id);
addMeta(session, obj);
// for temporary objects
if (id > 0) {
if(id > 0) {
if(!starting && !obj.isTemporary()) {
Row newRow = meta.getTemplateRow();
MetaRecord.populateRowFromDBObject(obj, newRow);
Row oldRow = metaIdIndex.getRow(session, id);
if (oldRow != null) {
meta.updateRow(session, oldRow, newRow);
}
// removeMeta(session, id);
// addMeta(session, obj);
}
// for temporary objects
objectIds.set(id);
}
}
......
......@@ -24,6 +24,13 @@ public class MetaRecord implements Comparable<MetaRecord> {
private final int objectType;
private final String sql;
public static void populateRowFromDBObject(DbObject obj, SearchRow r) {
r.setValue(0, ValueInt.get(obj.getId()));
r.setValue(1, ValueInt.get(0));
r.setValue(2, ValueInt.get(obj.getType()));
r.setValue(3, ValueString.get(obj.getCreateSQL()));
}
public MetaRecord(SearchRow r) {
id = r.getValue(0).getInt();
objectType = r.getValue(2).getInt();
......
......@@ -481,4 +481,10 @@ public abstract class BaseIndex extends SchemaObjectBase implements Index {
// Lookup batching is not supported.
return null;
}
@Override
public void update(Session session, Row oldRow, Row newRow) {
remove(session, oldRow);
add(session, newRow);
}
}
......@@ -51,6 +51,15 @@ public interface Index extends SchemaObject {
*/
void remove(Session session, Row row);
/**
* Update index after row change.
*
* @param session the session
* @param oldRow row before the update
* @param newRow row after the update
*/
void update(Session session, Row oldRow, Row newRow);
/**
* Returns {@code true} if {@code find()} implementation performs scan over all
* index, {@code false} if {@code find()} performs the fast lookup.
......
......@@ -119,6 +119,11 @@ public class PageDelegateIndex extends PageIndex {
// nothing to do
}
@Override
public void update(Session session, Row oldRow, Row newRow) {
// nothing to do
}
@Override
public void remove(Session session) {
mainIndex.setMainIndexColumn(-1);
......
......@@ -54,6 +54,11 @@ public class MVDelegateIndex extends BaseIndex implements MVIndex {
// nothing to do
}
@Override
public Row getRow(Session session, long key) {
return mainIndex.getRow(session, key);
}
@Override
public boolean canGetFirstOrLast() {
return true;
......@@ -109,6 +114,11 @@ public class MVDelegateIndex extends BaseIndex implements MVIndex {
// nothing to do
}
@Override
public void update(Session session, Row oldRow, Row newRow) {
// nothing to do
}
@Override
public void remove(Session session) {
mainIndex.setMainIndexColumn(SearchRow.ROWID_INDEX);
......
......@@ -162,6 +162,53 @@ public class MVPrimaryIndex extends BaseIndex {
}
}
@Override
public void update(Session session, Row oldRow, Row newRow) {
if (mainIndexColumn != SearchRow.ROWID_INDEX) {
long c = newRow.getValue(mainIndexColumn).getLong();
newRow.setKey(c);
}
long key = oldRow.getKey();
assert mainIndexColumn != SearchRow.ROWID_INDEX || key != 0;
assert key == newRow.getKey() : key + " != " + newRow.getKey();
if (mvTable.getContainsLargeObject()) {
for (int i = 0, len = oldRow.getColumnCount(); i < len; i++) {
Value oldValue = oldRow.getValue(i);
Value newValue = newRow.getValue(i);
if(oldValue != newValue) {
if (oldValue.isLinkedToTable()) {
session.removeAtCommit(oldValue);
}
Value v2 = newValue.copy(database, getId());
if (v2.isLinkedToTable()) {
session.removeAtCommitStop(v2);
}
if (newValue != v2) {
newRow.setValue(i, v2);
}
}
}
}
TransactionMap<Value,Value> map = getMap(session);
try {
Value existing = map.put(ValueLong.get(key), ValueArray.get(newRow.getValueList()));
if (existing == null) {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1,
getSQL() + ": " + key);
}
} catch (IllegalStateException e) {
throw mvTable.convertException(e);
}
// because it's possible to directly update the key using the _rowid_
// syntax
if (newRow.getKey() > lastKey.get()) {
lastKey.set(newRow.getKey());
}
}
public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
TransactionMap<Value, Value> map = getMap(session);
for (Row row : rowsForUpdate) {
......@@ -216,7 +263,7 @@ public class MVPrimaryIndex extends BaseIndex {
Value v = map.get(ValueLong.get(key));
if (v == null) {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_IN_PRIMARY_INDEX,
getSQL() + ": " + key);
getSQL(), String.valueOf(key));
}
ValueArray array = (ValueArray) v;
Row row = session.createRow(array.getList(), 0);
......
......@@ -252,6 +252,13 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex {
}
}
@Override
public void update(Session session, Row oldRow, Row newRow) {
if (compareRows(oldRow, newRow ) != 0) {
super.update(session, oldRow, newRow);
}
}
@Override
public Cursor find(Session session, SearchRow first, SearchRow last) {
return find(session, first, false, last);
......
......@@ -739,6 +739,27 @@ public class MVTable extends TableBase {
analyzeIfRequired(session);
}
@Override
public void updateRow(Session session, Row oldRow, Row newRow) {
newRow.setKey(oldRow.getKey());
lastModificationId = database.getNextModificationDataId();
Transaction t = session.getTransaction();
long savepoint = t.setSavepoint();
try {
for (Index index : indexes) {
index.update(session, oldRow, newRow);
}
} catch (Throwable e) {
try {
t.rollbackToSavepoint(savepoint);
} catch (Throwable nested) {
e.addSuppressed(nested);
}
throw DbException.convert(e);
}
analyzeIfRequired(session);
}
@Override
public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
primaryIndex.lockRows(session, rowsForUpdate);
......
......@@ -18,6 +18,7 @@ public abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue
final Object value;
private final Transaction transaction;
long undoKey;
private long lastOperationId;
private Transaction blockingTransaction;
private MVMap.Decision decision;
......@@ -48,24 +49,36 @@ public abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue
// because a tree root has definitely been changed.
logIt(existingValue.value == null ? null : VersionedValue.getInstance(existingValue.value));
decision = MVMap.Decision.PUT;
} else if(fetchTransaction(blockingId) == null) {
// condition above means transaction has been committed/rolled back and closed by now
decision = MVMap.Decision.REPEAT;
} else {
// this entry comes from a different transaction, and this
// transaction is not committed yet
} else if(fetchTransaction(blockingId) != null) {
// this entry comes from a different transaction, and this transaction is not committed yet
// should wait on blockingTransaction that was determined earlier
decision = MVMap.Decision.ABORT;
} else if(id == lastOperationId) {
// There is no transaction with that id, so we've retried it just before,
// but map root has not changed (which must be the case if we just missed a closed transaction),
// therefore we came back here again.
// Now we assume it's a leftover after unclean shutdown (map update was written but not undo log),
// and will effectively roll it back (just overwrite).
Object committedValue = existingValue.getCommittedValue();
logIt(committedValue == null ? null : VersionedValue.getInstance(committedValue));
decision = MVMap.Decision.PUT;
} else {
// condition above means transaction has been committed/rolled back and closed by now
decision = MVMap.Decision.REPEAT;
lastOperationId = id;
}
return decision;
}
@Override
public final void reset() {
if (decision != null && decision != MVMap.Decision.ABORT && decision != MVMap.Decision.REPEAT) {
// positive decision has been made already and undo record created,
// but map was updated afterwards and undo record deletion required
transaction.logUndo();
if (decision != MVMap.Decision.REPEAT) {
lastOperationId = 0;
if (decision == MVMap.Decision.PUT) {
// positive decision has been made already and undo record created,
// but map was updated afterwards and undo record deletion required
transaction.logUndo();
}
}
blockingTransaction = null;
decision = null;
......
......@@ -210,6 +210,20 @@ public abstract class Table extends SchemaObjectBase {
*/
public abstract void addRow(Session session, Row row);
/**
* Update a row to the table and all indexes.
*
* @param session the session
* @param oldRow the row to update
* @param newRow the row with updated values (_rowid_ suppose to be the same)
* @throws DbException if a constraint was violated
*/
public void updateRow(Session session, Row oldRow, Row newRow) {
newRow.setKey(oldRow.getKey());
removeRow(session, oldRow);
addRow(session, newRow);
}
/**
* Commit an operation (when using multi-version concurrency).
*
......
......@@ -83,6 +83,7 @@ public class TestKillRestartMulti extends TestBase {
// show up in our log.
ProcessBuilder pb = new ProcessBuilder().redirectError(Redirect.INHERIT)
.command("java", selfDestruct, "-cp", getClassPath(),
"-ea",
getClass().getName(), "-url", url, "-user", user,
"-password", password);
deleteDb("killRestartMulti");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论