提交 5af45034 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVTableEngine: 2-phase commit

上级 1f494333
......@@ -438,9 +438,10 @@ to the database URL. In general, functionality and performance should be
similar than the current default storage engine (the page store).
There are a few features that have not been implemented yet or are not complete:
</p>
<ul><li>Changing the cache size.
</li><li>Two-phase commit.
</li><li>The database metadata is still stored in a <code>.h2.db</code> file.
<ul><li>Changing the cache size is currently not supported.
</li><li>The database metadata is still stored in a <code>.h2.db</code> file,
and the <code>.lock.db</code> file is still used to lock a database
(long term, the plan is to no longer use those files).
</li><li>The database file(s) sometimes do not shrink as expected.
</li></ul>
......
......@@ -15,6 +15,7 @@ import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import org.h2.api.DatabaseEventListener;
import org.h2.command.ddl.CreateTableData;
import org.h2.command.dml.SetTypes;
......@@ -1764,6 +1765,9 @@ public class Database implements DataHandler {
* @return the list
*/
public ArrayList<InDoubtTransaction> getInDoubtTransactions() {
if (mvStore != null) {
return mvStore.getInDoubtTransactions();
}
return pageStore == null ? null : pageStore.getInDoubtTransactions();
}
......@@ -1777,6 +1781,11 @@ public class Database implements DataHandler {
if (readOnly) {
return;
}
if (mvStore != null) {
mvStore.prepareCommit(session, transaction);
pageStore.flushLog();
return;
}
pageStore.prepareCommit(session, transaction);
}
......
......@@ -878,6 +878,9 @@ public class Session extends SessionWithState {
* @param transactionName the name of the transaction
*/
public void prepareCommit(String transactionName) {
if (transaction != null) {
database.prepareCommit(this, transactionName);
}
if (containsUncommitted()) {
// need to commit even if rollback is not possible (create/drop
// table and so on)
......@@ -905,7 +908,7 @@ public class Session extends SessionWithState {
boolean found = false;
if (list != null) {
for (InDoubtTransaction p: list) {
if (p.getTransaction().equals(transactionName)) {
if (p.getTransactionName().equals(transactionName)) {
p.setState(state);
found = true;
break;
......
......@@ -1600,7 +1600,8 @@ public class JdbcConnection extends TraceObject implements Connection {
}
/**
* Set a client property.
* Set a client property.
* This method always throws a SQLClientInfoException.
*/
@Override
public void setClientInfo(String name, String value)
......@@ -1612,6 +1613,7 @@ public class JdbcConnection extends TraceObject implements Connection {
/**
* Set the client properties.
* This method always throws a SQLClientInfoException.
*/
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
......@@ -1622,6 +1624,7 @@ public class JdbcConnection extends TraceObject implements Connection {
/**
* Get the client properties.
* This method always returns null.
*/
@Override
public Properties getClientInfo() throws SQLClientInfoException {
......@@ -1632,6 +1635,7 @@ public class JdbcConnection extends TraceObject implements Connection {
/**
* Set a client property.
* This method always throws a SQLClientInfoException.
*/
@Override
public String getClientInfo(String name) throws SQLException {
......
......@@ -13,8 +13,11 @@ import org.h2.api.TableEngine;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Constants;
import org.h2.engine.Database;
import org.h2.engine.Session;
import org.h2.message.DbException;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.store.InDoubtTransaction;
import org.h2.table.RegularTable;
import org.h2.table.TableBase;
import org.h2.util.New;
......@@ -155,10 +158,76 @@ public class MVTableEngine implements TableEngine {
public void rollback() {
List<Transaction> list = transactionStore.getOpenTransactions();
for (Transaction t : list) {
t.rollback();
if (t.getStatus() != Transaction.STATUS_PREPARED) {
t.rollback();
}
}
}
public void prepareCommit(Session session, String transactionName) {
Transaction t = session.getTransaction();
t.setName(transactionName);
t.prepare();
store.store();
}
public ArrayList<InDoubtTransaction> getInDoubtTransactions() {
List<Transaction> list = transactionStore.getOpenTransactions();
ArrayList<InDoubtTransaction> result = New.arrayList();
for (Transaction t : list) {
if (t.getStatus() == Transaction.STATUS_PREPARED) {
result.add(new MVInDoubtTransaction(store, t));
}
}
return result;
}
}
/**
* An in-doubt transaction.
*/
private static class MVInDoubtTransaction implements InDoubtTransaction {
private final MVStore store;
private final Transaction transaction;
private int state = InDoubtTransaction.IN_DOUBT;
MVInDoubtTransaction(MVStore store, Transaction transaction) {
this.store = store;
this.transaction = transaction;
}
@Override
public void setState(int state) {
if (state == InDoubtTransaction.COMMIT) {
transaction.commit();
} else {
transaction.rollback();
}
store.store();
this.state = state;
}
@Override
public String getState() {
switch(state) {
case IN_DOUBT:
return "IN_DOUBT";
case COMMIT:
return "COMMIT";
case ROLLBACK:
return "ROLLBACK";
default:
throw DbException.throwInternalError("state="+state);
}
}
@Override
public String getTransactionName() {
return transaction.getName();
}
}
}
......@@ -492,7 +492,7 @@ public class TransactionStore {
}
public void setName(String name) {
checkOpen();
checkNotClosed();
this.name = name;
store.storeTransaction(this);
}
......@@ -507,7 +507,7 @@ public class TransactionStore {
* @return the savepoint id
*/
public long setSavepoint() {
checkOpen();
checkNotClosed();
return logId;
}
......@@ -532,7 +532,7 @@ public class TransactionStore {
* @return the transaction map
*/
public <K, V> TransactionMap<K, V> openMap(String name) {
checkOpen();
checkNotClosed();
return new TransactionMap<K, V>(this, name, new ObjectDataType(),
new ObjectDataType());
}
......@@ -547,7 +547,7 @@ public class TransactionStore {
* @return the transaction map
*/
public <K, V> TransactionMap<K, V> openMap(String name, Builder<K, V> builder) {
checkOpen();
checkNotClosed();
DataType keyType = builder.getKeyType();
if (keyType == null) {
keyType = new ObjectDataType();
......@@ -564,7 +564,7 @@ public class TransactionStore {
* committed or rolled back.
*/
public void prepare() {
checkOpen();
checkNotClosed();
status = STATUS_PREPARED;
store.storeTransaction(this);
}
......@@ -584,7 +584,7 @@ public class TransactionStore {
* @param savepointId the savepoint id
*/
public void rollbackToSavepoint(long savepointId) {
checkOpen();
checkNotClosed();
store.rollbackTo(this, logId, savepointId);
logId = savepointId;
}
......@@ -611,15 +611,6 @@ public class TransactionStore {
return store.getChanges(this, logId, savepointId);
}
/**
* Check whether this transaction is still open.
*/
void checkOpen() {
if (status != STATUS_OPEN) {
throw DataUtils.newIllegalStateException("Transaction is closed");
}
}
/**
* Check whether this transaction is open or prepared.
*/
......@@ -746,7 +737,7 @@ public class TransactionStore {
}
private V set(K key, V value) {
transaction.checkOpen();
transaction.checkNotClosed();
long start = 0;
while (true) {
V old = get(key);
......@@ -933,7 +924,7 @@ public class TransactionStore {
*/
@SuppressWarnings("unchecked")
public V get(K key, long maxLogId) {
transaction.checkOpen();
transaction.checkNotClosed();
VersionedValue data = getValue(key, maxLogId);
return data == null ? null : (V) data.value;
}
......
......@@ -6,52 +6,26 @@
*/
package org.h2.store;
import org.h2.message.DbException;
/**
* Represents an in-doubt transaction (a transaction in the prepare phase).
*/
public class InDoubtTransaction {
public interface InDoubtTransaction {
/**
* The transaction state meaning this transaction is not committed yet, but
* also not rolled back (in-doubt).
*/
public static final int IN_DOUBT = 0;
int IN_DOUBT = 0;
/**
* The transaction state meaning this transaction is committed.
*/
public static final int COMMIT = 1;
int COMMIT = 1;
/**
* The transaction state meaning this transaction is rolled back.
*/
public static final int ROLLBACK = 2;
// TODO 2-phase-commit: document sql statements and metadata table
private final PageStore store;
private final int sessionId;
private final int pos;
private final String transaction;
private int state;
/**
* Create a new in-doubt transaction info object.
*
* @param store the page store
* @param sessionId the session id
* @param pos the position
* @param transaction the transaction name
*/
public InDoubtTransaction(PageStore store, int sessionId, int pos, String transaction) {
this.store = store;
this.sessionId = sessionId;
this.pos = pos;
this.transaction = transaction;
this.state = IN_DOUBT;
}
int ROLLBACK = 2;
/**
* Change the state of this transaction.
......@@ -59,45 +33,20 @@ public class InDoubtTransaction {
*
* @param state the new state
*/
public void setState(int state) {
switch(state) {
case COMMIT:
store.setInDoubtTransactionState(sessionId, pos, true);
break;
case ROLLBACK:
store.setInDoubtTransactionState(sessionId, pos, false);
break;
default:
DbException.throwInternalError("state="+state);
}
this.state = state;
}
void setState(int state);
/**
* Get the state of this transaction as a text.
*
* @return the transaction state text
*/
public String getState() {
switch(state) {
case IN_DOUBT:
return "IN_DOUBT";
case COMMIT:
return "COMMIT";
case ROLLBACK:
return "ROLLBACK";
default:
throw DbException.throwInternalError("state="+state);
}
}
String getState();
/**
* Get the name of the transaction.
*
* @return the transaction name
*/
public String getTransaction() {
return transaction;
}
String getTransactionName();
}
......@@ -424,11 +424,11 @@ public class PageLog {
*/
private void setPrepareCommit(int sessionId, int pageId, String transaction) {
SessionState state = getOrAddSessionState(sessionId);
InDoubtTransaction doubt;
PageStoreInDoubtTransaction doubt;
if (transaction == null) {
doubt = null;
} else {
doubt = new InDoubtTransaction(store, sessionId, pageId, transaction);
doubt = new PageStoreInDoubtTransaction(store, sessionId, pageId, transaction);
}
state.inDoubtTransaction = doubt;
}
......@@ -816,7 +816,7 @@ public class PageLog {
ArrayList<InDoubtTransaction> getInDoubtTransactions() {
ArrayList<InDoubtTransaction> list = New.arrayList();
for (SessionState state : sessionStates.values()) {
InDoubtTransaction in = state.inDoubtTransaction;
PageStoreInDoubtTransaction in = state.inDoubtTransaction;
if (in != null) {
list.add(in);
}
......
/*
* Copyright 2004-2013 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.store;
import org.h2.message.DbException;
/**
* Represents an in-doubt transaction (a transaction in the prepare phase).
*/
public class PageStoreInDoubtTransaction implements InDoubtTransaction {
private final PageStore store;
private final int sessionId;
private final int pos;
private final String transactionName;
private int state;
/**
* Create a new in-doubt transaction info object.
*
* @param store the page store
* @param sessionId the session id
* @param pos the position
* @param transaction the transaction name
*/
public PageStoreInDoubtTransaction(PageStore store, int sessionId, int pos, String transaction) {
this.store = store;
this.sessionId = sessionId;
this.pos = pos;
this.transactionName = transaction;
this.state = IN_DOUBT;
}
@Override
public void setState(int state) {
switch(state) {
case COMMIT:
store.setInDoubtTransactionState(sessionId, pos, true);
break;
case ROLLBACK:
store.setInDoubtTransactionState(sessionId, pos, false);
break;
default:
DbException.throwInternalError("state="+state);
}
this.state = state;
}
@Override
public String getState() {
switch(state) {
case IN_DOUBT:
return "IN_DOUBT";
case COMMIT:
return "COMMIT";
case ROLLBACK:
return "ROLLBACK";
default:
throw DbException.throwInternalError("state="+state);
}
}
@Override
public String getTransactionName() {
return transactionName;
}
}
......@@ -31,7 +31,7 @@ class SessionState {
/**
* The in-doubt transaction if there is one.
*/
public InDoubtTransaction inDoubtTransaction;
public PageStoreInDoubtTransaction inDoubtTransaction;
/**
* Check if this session state is already committed at this point.
......
......@@ -17,6 +17,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import org.h2.command.Command;
import org.h2.constraint.Constraint;
import org.h2.constraint.ConstraintCheck;
......@@ -1305,7 +1306,7 @@ public class MetaTable extends Table {
for (InDoubtTransaction prep : prepared) {
add(rows,
// TRANSACTION
prep.getTransaction(),
prep.getTransactionName(),
// STATE
prep.getState()
);
......
......@@ -620,10 +620,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
new TestTempTables().runTest(this);
new TestTransaction().runTest(this);
new TestTriggersConstraints().runTest(this);
if (!mvStore) {
// Two-Phase commit is not yet implemented
new TestTwoPhaseCommit().runTest(this);
}
new TestTwoPhaseCommit().runTest(this);
new TestView().runTest(this);
new TestViewAlterTable().runTest(this);
new TestViewDropView().runTest(this);
......@@ -659,10 +656,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
new TestConnectionPool().runTest(this);
new TestDataSource().runTest(this);
new TestXA().runTest(this);
if (!mvStore) {
// Two-Phase commit is not yet implemented
new TestXASimple().runTest(this);
}
new TestXASimple().runTest(this);
// server
new TestAutoServer().runTest(this);
......
......@@ -139,7 +139,7 @@ public class TestIndex extends TestBase {
String m = ex.getMessage();
int start = m.indexOf('\"'), end = m.indexOf('\"', start + 1);
String s = m.substring(start + 1, end);
assertEquals("IDX_TEST_NAME ON PUBLIC.TEST(NAME) VALUES ( /* 2 */ 'Hello' )", s);
assertEquals("IDX_TEST_NAME ON PUBLIC.TEST(NAME) VALUES ('Hello', 1)", s);
}
stat.execute("drop table test");
}
......
......@@ -45,7 +45,9 @@ public class TestTwoPhaseCommit extends TestBase {
openWith(false);
test(false);
testLargeTransactionName();
if (!config.mvStore) {
testLargeTransactionName();
}
deleteDb("twoPhaseCommit");
}
......
......@@ -45,6 +45,7 @@ public class TestXASimple extends TestBase {
// testTwoPhase(false, false);
testTwoPhase("xaSimple2a", true, true);
testTwoPhase("xaSimple2b", true, false);
}
private void testTwoPhase(String db, boolean shutdown, boolean commit) throws Exception {
......@@ -81,6 +82,8 @@ public class TestXASimple extends TestBase {
} else {
xa.getXAResource().rollback(list[0]);
}
conn = xa.getConnection();
conn.createStatement().executeQuery("select * from test");
if (shutdown) {
shutdown(ds);
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论