提交 afbd0f53 authored 作者: Andrei Tokar's avatar Andrei Tokar

MVStore: lock rows before update/delete

上级 eb3d258e
...@@ -83,9 +83,14 @@ public class Delete extends Prepared { ...@@ -83,9 +83,14 @@ public class Delete extends Prepared {
if (table.fireRow()) { if (table.fireRow()) {
done = table.fireBeforeRow(session, row, null); done = table.fireBeforeRow(session, row, null);
} }
if (!done) {
if (table.isMVStore()) {
done = table.lockRow(session, row) == null;
}
if (!done) { if (!done) {
rows.add(row); rows.add(row);
} }
}
count++; count++;
if (limitRows >= 0 && count >= limitRows) { if (limitRows >= 0 && count >= limitRows) {
break; break;
......
...@@ -163,10 +163,15 @@ public class Update extends Prepared { ...@@ -163,10 +163,15 @@ public class Update extends Prepared {
if (table.fireRow()) { if (table.fireRow()) {
done = table.fireBeforeRow(session, oldRow, newRow); done = table.fireBeforeRow(session, oldRow, newRow);
} }
if (!done) {
if (table.isMVStore()) {
done = table.lockRow(session, oldRow) == null;
}
if (!done) { if (!done) {
rows.add(oldRow); rows.add(oldRow);
rows.add(newRow); rows.add(newRow);
} }
}
count++; count++;
} }
} }
......
...@@ -209,17 +209,28 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -209,17 +209,28 @@ public class MVPrimaryIndex extends BaseIndex {
} }
} }
public void lockRows(Session session, Iterable<Row> rowsForUpdate) { void lockRows(Session session, Iterable<Row> rowsForUpdate) {
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, Value> map = getMap(session);
for (Row row : rowsForUpdate) { for (Row row : rowsForUpdate) {
long key = row.getKey(); long key = row.getKey();
lockRow(map, key);
}
}
Row lockRow(Session session, Row row) {
TransactionMap<Value, Value> map = getMap(session);
long key = row.getKey();
ValueArray array = (ValueArray) lockRow(map, key);
return array == null ? null : getRow(session, key, array);
}
private Value lockRow(TransactionMap<Value, Value> map, long key) {
try { try {
map.lock(ValueLong.get(key)); return map.lock(ValueLong.get(key));
} catch (IllegalStateException ex) { } catch (IllegalStateException ex) {
throw mvTable.convertException(ex); throw mvTable.convertException(ex);
} }
} }
}
@Override @Override
public Cursor find(Session session, SearchRow first, SearchRow last) { public Cursor find(Session session, SearchRow first, SearchRow last) {
...@@ -259,7 +270,10 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -259,7 +270,10 @@ public class MVPrimaryIndex extends BaseIndex {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_IN_PRIMARY_INDEX, throw DbException.get(ErrorCode.ROW_NOT_FOUND_IN_PRIMARY_INDEX,
getSQL(), String.valueOf(key)); getSQL(), String.valueOf(key));
} }
ValueArray array = (ValueArray) v; return getRow(session, key, (ValueArray) v);
}
public Row getRow(Session session, long key, ValueArray array) {
Row row = session.createRow(array.getList(), 0); Row row = session.createRow(array.getList(), 0);
row.setKey(key); row.setKey(key);
return row; return row;
......
...@@ -738,6 +738,11 @@ public class MVTable extends TableBase { ...@@ -738,6 +738,11 @@ public class MVTable extends TableBase {
primaryIndex.lockRows(session, rowsForUpdate); primaryIndex.lockRows(session, rowsForUpdate);
} }
@Override
public Row lockRow(Session session, Row row) {
return primaryIndex.lockRow(session, row);
}
private void analyzeIfRequired(Session session) { private void analyzeIfRequired(Session session) {
if (changesUntilAnalyze != null) { if (changesUntilAnalyze != null) {
if (changesUntilAnalyze.decrementAndGet() == 0) { if (changesUntilAnalyze.decrementAndGet() == 0) {
......
...@@ -206,11 +206,22 @@ abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue> { ...@@ -206,11 +206,22 @@ abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue> {
super(mapId, key, null, transaction); super(mapId, key, null, transaction);
} }
@Override
public MVMap.Decision decide(VersionedValue existingValue, VersionedValue providedValue) {
MVMap.Decision decision = super.decide(existingValue, providedValue);
if (existingValue == null) {
assert decision == MVMap.Decision.PUT;
decision = setDecision(MVMap.Decision.REMOVE);
}
return decision;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public VersionedValue selectValue(VersionedValue existingValue, VersionedValue providedValue) { public VersionedValue selectValue(VersionedValue existingValue, VersionedValue providedValue) {
assert existingValue != null; // otherwise, what's there to lock? return VersionedValue.getInstance(undoKey,
return VersionedValue.getInstance(undoKey, existingValue.value, existingValue.getCommittedValue()); existingValue == null ? null : existingValue.value,
existingValue == null ? null : existingValue.getCommittedValue());
} }
} }
} }
...@@ -184,12 +184,24 @@ public abstract class Table extends SchemaObjectBase { ...@@ -184,12 +184,24 @@ public abstract class Table extends SchemaObjectBase {
*/ */
public void lockRows(Session session, Iterable<Row> rowsForUpdate) { public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
for (Row row : rowsForUpdate) { for (Row row : rowsForUpdate) {
lockRow(session, row);
}
}
/**
* Locks row, preventing any updated to it, except from the session specified.
*
* @param session the session
* @param row to lock
* @return locked row, or null if row does not exist anymore
*/
public Row lockRow(Session session, Row row) {
Row newRow = row.getCopy(); Row newRow = row.getCopy();
removeRow(session, row); removeRow(session, row);
session.log(this, UndoLogRecord.DELETE, row); session.log(this, UndoLogRecord.DELETE, row);
addRow(session, newRow); addRow(session, newRow);
session.log(this, UndoLogRecord.INSERT, newRow); session.log(this, UndoLogRecord.INSERT, newRow);
} return row;
} }
/** /**
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论