提交 a82dc556 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStoreTableEngine

上级 df51f374
......@@ -87,7 +87,7 @@ public class Analyze extends DefineCommand {
if (type == Value.BLOB || type == Value.CLOB) {
// can not index LOB columns, so calculating
// the selectivity is not required
buff.append("100");
buff.append("MAX(100)");
} else {
buff.append("SELECTIVITY(").append(col.getSQL()).append(')');
}
......
......@@ -172,6 +172,7 @@ public class CreateTable extends SchemaCommand {
boolean old = session.isUndoLogEnabled();
try {
session.setUndoLogEnabled(false);
session.startStatementWithinTransaction();
Insert insert = null;
insert = new Insert(session);
insert.setSortedInsertMode(sortedInsertMode);
......
......@@ -636,6 +636,7 @@ public class ConstraintReferential extends Constraint {
// don't check at startup
return;
}
session.startStatementWithinTransaction();
StatementBuilder buff = new StatementBuilder("SELECT 1 FROM (SELECT ");
for (IndexColumn c : columns) {
buff.appendExceptFirst(", ");
......
......@@ -1742,6 +1742,9 @@ public class Database implements DataHandler {
// TODO check if MIN_WRITE_DELAY is a good value
flushOnEachCommit = writeDelay < Constants.MIN_WRITE_DELAY;
}
if (mvStore != null) {
mvStore.setWriteDelay(value);
}
}
/**
......
......@@ -1287,7 +1287,16 @@ public class Session extends SessionWithState {
}
public Value getTransactionId() {
if (undoLog.size() == 0 || !database.isPersistent()) {
if (!database.isPersistent()) {
return ValueNull.INSTANCE;
}
if (database.getMvStore() != null) {
if (transaction == null) {
return ValueNull.INSTANCE;
}
return ValueString.get(Long.toString(getTransaction().getId()));
}
if (undoLog.size() == 0) {
return ValueNull.INSTANCE;
}
return ValueString.get(firstUncommittedLog + "-" + firstUncommittedPos + "-" + id);
......@@ -1325,6 +1334,13 @@ public class Session extends SessionWithState {
}
return startStatement;
}
/**
* Start a new statement within a transaction.
*/
public void startStatementWithinTransaction() {
startStatement = -1;
}
/**
* Mark the statement as completed. This also close all temporary result
......
......@@ -129,9 +129,11 @@ public class MVStore {
private static final int FORMAT_READ = 1;
/**
* Whether the store is closed.
* The background thread, if any.
*/
volatile boolean closed;
volatile Thread backgroundThread;
private boolean closed;
private final String fileName;
private final char[] filePassword;
......@@ -219,8 +221,6 @@ public class MVStore {
*/
private Chunk retainChunk;
private Thread backgroundThread;
/**
* The version of the current store operation (if any).
*/
......@@ -469,15 +469,7 @@ public class MVStore {
rollbackTo(rollback);
}
this.lastCommittedVersion = currentVersion;
// start the background thread if needed
if (writeDelay > 0) {
int sleep = Math.max(1, writeDelay / 10);
Writer w = new Writer(this, sleep);
Thread t = new Thread(w, "MVStore writer " + fileName);
t.setDaemon(true);
t.start();
backgroundThread = t;
}
setWriteDelay(writeDelay);
}
/**
......@@ -696,18 +688,7 @@ public class MVStore {
// can not synchronize on this yet, because
// the thread also synchronized on this, which
// could result in a deadlock
if (backgroundThread != null) {
Thread t = backgroundThread;
backgroundThread = null;
synchronized (this) {
notify();
}
try {
t.join();
} catch (Exception e) {
// ignore
}
}
stopBackgroundThread();
synchronized (this) {
try {
if (shrinkIfPossible) {
......@@ -1757,6 +1738,40 @@ public class MVStore {
public boolean isClosed() {
return closed;
}
private void stopBackgroundThread() {
if (backgroundThread == null) {
return;
}
Thread t = backgroundThread;
backgroundThread = null;
synchronized (this) {
notify();
}
try {
t.join();
} catch (Exception e) {
// ignore
}
}
public void setWriteDelay(int value) {
writeDelay = value;
stopBackgroundThread();
// start the background thread if needed
if (value > 0) {
int sleep = Math.max(1, value / 10);
Writer w = new Writer(this, sleep);
Thread t = new Thread(w, "MVStore writer " + fileName);
t.setDaemon(true);
t.start();
backgroundThread = t;
}
}
public int getWriteDelay() {
return writeDelay;
}
/**
* A background writer to automatically store changes from time to time.
......@@ -1773,7 +1788,7 @@ public class MVStore {
@Override
public void run() {
while (!store.closed) {
while (store.backgroundThread != null) {
synchronized (store) {
try {
store.wait(sleep);
......
......@@ -137,10 +137,16 @@ public class MVTableEngine implements TableEngine {
* Close the store. Pending changes are persisted.
*/
public void close() {
if (!store.isReadOnly()) {
store.store();
if (!store.isClosed()) {
if (!store.isReadOnly()) {
store.store();
}
store.close();
}
store.close();
}
public void setWriteDelay(int value) {
store.setWriteDelay(value);
}
}
......
......@@ -223,9 +223,8 @@ public class TransactionStore {
return;
}
for (long logId = 0; logId < maxLogId; logId++) {
long[] undoKey = new long[] {
t.getId(), logId };
commitIfNeeded();
long[] undoKey = new long[] { t.getId(), logId };
Object[] op = undoLog.get(undoKey);
int opType = (Integer) op[0];
if (opType == Transaction.OP_REMOVE) {
......@@ -287,6 +286,9 @@ public class TransactionStore {
if (t.getId() == firstOpenTransaction) {
firstOpenTransaction = -1;
}
if (store.getWriteDelay() == 0) {
store.commit();
}
}
/**
......@@ -299,8 +301,8 @@ public class TransactionStore {
void rollbackTo(Transaction t, long maxLogId, long toLogId) {
for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
commitIfNeeded();
Object[] op = undoLog.get(new long[] {
t.getId(), logId });
long[] undoKey = new long[] { t.getId(), logId };
Object[] op = undoLog.get(undoKey);
int mapId = ((Integer) op[1]).intValue();
// TODO open map by id if possible
Map<String, String> meta = store.getMetaMap();
......@@ -316,7 +318,7 @@ public class TransactionStore {
// this transaction updated the value
map.put(key, oldValue);
}
undoLog.remove(op);
undoLog.remove(undoKey);
}
}
......
......@@ -42,6 +42,8 @@ public class TestMVTableEngine extends TestBase {
@Override
public void test() throws Exception {
// testSpeed();
testReferentialIntegrity();
testWriteDelay();
testAutoCommit();
testReopen();
testBlob();
......@@ -107,6 +109,78 @@ int test;
//System.out.println(prof.getTop(10));
System.out.println((System.currentTimeMillis() - time) + " " + dbName + " after");
}
private void testReferentialIntegrity() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn;
Statement stat;
conn = getConnection("mvstore;default_table_engine=org.h2.mvstore.db.MVTableEngine");
stat = conn.createStatement();
stat.execute("create table parent(id int)");
stat.execute("create table child(pid int)");
stat.execute("insert into parent values(1)");
stat.execute("insert into child values(2)");
try {
stat.execute("alter table child add constraint cp " +
"foreign key(pid) references parent(id)");
fail();
} catch (SQLException e) {
// expected
}
int todo;
// stat.execute("update child set pid=1");
stat.execute("drop table child, parent");
stat.execute("create table parent(id int)");
stat.execute("create table child(pid int)");
stat.execute("insert into parent values(1)");
stat.execute("insert into child values(2)");
try {
stat.execute("alter table child add constraint cp " +
"foreign key(pid) references parent(id)");
fail();
} catch (SQLException e) {
// expected
}
stat.execute("drop table child, parent");
// currently not supported, as previous rows are not visible
// stat.execute("create table test(id identity, parent bigint, foreign key(parent) references(id))");
// stat.execute("insert into test values(0, 0), (1, NULL), (2, 1), (3, 3), (4, 3)");
// stat.execute("drop table test");
stat.execute("create table parent(id int, x int)");
stat.execute("insert into parent values(1, 2)");
stat.execute("create table child(id int references parent(id)) as select 1");
conn.close();
}
private void testWriteDelay() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn;
Statement stat;
ResultSet rs;
conn = getConnection("mvstore");
stat = conn.createStatement();
stat.execute("create table test(id int) " +
"engine \"org.h2.mvstore.db.MVTableEngine\"");
stat.execute("set write_delay 0");
stat.execute("insert into test values(1)");
stat.execute("shutdown immediately");
try {
conn.close();
} catch (Exception e) {
// ignore
}
conn = getConnection("mvstore");
stat = conn.createStatement();
rs = stat.executeQuery("select * from test");
assertTrue(rs.next());
conn.close();
}
private void testAutoCommit() throws SQLException {
FileUtils.deleteRecursive(getBaseDir(), true);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论