提交 3fcb3c2a authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: multiple issues were fixed

上级 667a4733
......@@ -23,6 +23,7 @@ Change Log
in a strange exception of the type "column x must be included in the group by list".
</li><li>Issue 454: Use Charset for type-safety.
</li><li>Queries with both LIMIT and OFFSET could throw an IllegalArgumentException.
</li><li>MVStore: multiple issues were fixed: 460, 461, 462.
</li><li>MVStore: larger stores (multiple GB) are now much faster.
</li><li>When using local temporary tables and not dropping them manually before closing the session,
and then killing the process could result in a database that couldn't be opened (except when using
......@@ -38,7 +39,7 @@ Change Log
</li><li>Issue 274: Sybase/MSSQLServer compatibility - Add GETDATE and CHARINDEX system functions
</li><li>Issue 274: Sybase/MSSQLServer compatibility - swap parameters of CONVERT function.
</li><li>Issue 274: Sybase/MSSQLServer compatibility - support index clause e.g. "select * from test (index table1_index)"
</li><li>Fix bug in optimising SELECT * FROM A WHERE X=1 OR X=2 OR X=3 into SELECT * FROM A WHERE X IN (1,2,3)
</li><li>Fix bug in optimizing SELECT * FROM A WHERE X=1 OR X=2 OR X=3 into SELECT * FROM A WHERE X IN (1,2,3)
</li><li>Issue 442: groovy patch for SourceCompiler (function ALIAS)
</li><li>Issue 459: Improve LOB documentation
</li></ul>
......
......@@ -143,6 +143,7 @@ public abstract class Command implements CommandInterface {
}
private void stop() {
session.endStatement();
session.closeTemporaryResults();
session.setCurrentCommand(null);
if (!isTransactional()) {
......@@ -226,7 +227,7 @@ public abstract class Command implements CommandInterface {
}
}
synchronized (sync) {
int rollback = session.getUndoLogPos();
Session.Savepoint rollback = session.setSavepoint();
session.setCurrentCommand(this);
try {
while (true) {
......
......@@ -21,7 +21,7 @@ import org.h2.engine.Database;
import org.h2.engine.Session;
import org.h2.expression.Expression;
import org.h2.message.DbException;
import org.h2.mvstore.db.MVTableEngine;
import org.h2.mvstore.db.MVTableEngine.Store;
import org.h2.result.ResultInterface;
import org.h2.store.FileLister;
import org.h2.store.PageStore;
......@@ -58,7 +58,10 @@ public class BackupCommand extends Prepared {
throw DbException.get(ErrorCode.DATABASE_IS_NOT_PERSISTENT);
}
try {
MVTableEngine.flush(db);
Store store = db.getMvStore();
if (store != null) {
store.store();
}
String name = db.getName();
name = FileUtils.getName(name);
OutputStream zip = FileUtils.newOutputStream(fileName, false);
......
......@@ -30,6 +30,7 @@ import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.mvstore.db.MVTableEngine;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.schema.Schema;
......@@ -173,6 +174,7 @@ public class Database implements DataHandler {
private final DbSettings dbSettings;
private final int reconnectCheckDelay;
private int logMode;
private MVTableEngine.Store mvStore;
public Database(ConnectionInfo ci, String cipher) {
String name = ci.getName();
......@@ -262,6 +264,14 @@ public class Database implements DataHandler {
}
powerOffCount = count;
}
public MVTableEngine.Store getMvStore() {
return mvStore;
}
public void setMvStore(MVTableEngine.Store mvStore) {
this.mvStore = mvStore;
}
/**
* Check if two values are equal with the current comparison mode.
......@@ -406,6 +416,9 @@ public class Database implements DataHandler {
try {
powerOffCount = -1;
stopWriter();
if (mvStore != null) {
mvStore.closeImmediately();
}
if (pageStore != null) {
try {
pageStore.close();
......@@ -1218,6 +1231,9 @@ public class Database implements DataHandler {
}
}
reconnectModified(false);
if (mvStore != null) {
mvStore.close();
}
closeFiles();
if (persistent && lock == null && fileLockMethod != FileLock.LOCK_NO && fileLockMethod != FileLock.LOCK_FS) {
// everything already closed (maybe in checkPowerOff)
......@@ -1251,6 +1267,9 @@ public class Database implements DataHandler {
private synchronized void closeFiles() {
try {
if (mvStore != null) {
mvStore.closeImmediately();
}
if (pageStore != null) {
pageStore.close();
pageStore = null;
......
......@@ -10,6 +10,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.h2.command.Command;
import org.h2.command.CommandInterface;
import org.h2.command.Parser;
......@@ -23,7 +25,7 @@ import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.mvstore.db.TransactionStore;
import org.h2.mvstore.db.MVTable;
import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.result.ResultInterface;
import org.h2.result.Row;
......@@ -72,7 +74,7 @@ public class Session extends SessionWithState {
private Value lastScopeIdentity = ValueLong.get(0);
private int firstUncommittedLog = Session.LOG_WRITTEN;
private int firstUncommittedPos = Session.LOG_WRITTEN;
private HashMap<String, Integer> savepoints;
private HashMap<String, Savepoint> savepoints;
private HashMap<String, Table> localTempTables;
private HashMap<String, Index> localTempTableIndexes;
private HashMap<String, Constraint> localTempTableConstraints;
......@@ -104,6 +106,7 @@ public class Session extends SessionWithState {
private int objectId;
private final int queryCacheSize;
private SmallLRUCache<String, Command> queryCache;
private Transaction transaction;
private long startStatement = -1;
......@@ -514,13 +517,19 @@ public class Session extends SessionWithState {
public void rollback() {
checkCommitRollback();
if (transaction != null) {
Set<String> changed = transaction.getChangedMaps(0);
for (MVTable t : database.getMvStore().getTables()) {
if (changed.contains(t.getMapName())) {
t.setModified();
}
}
transaction.rollback();
transaction = null;
}
currentTransactionName = null;
boolean needCommit = false;
if (undoLog.size() > 0) {
rollbackTo(0, false);
rollbackTo(null, false);
needCommit = true;
}
if (locks.size() > 0 || needCommit) {
......@@ -537,30 +546,49 @@ public class Session extends SessionWithState {
/**
* Partially roll back the current transaction.
*
* @param index the position to which should be rolled back
* @param savepoint the savepoint to which should be rolled back
* @param trimToSize if the list should be trimmed
*/
public void rollbackTo(int index, boolean trimToSize) {
public void rollbackTo(Savepoint savepoint, boolean trimToSize) {
int index = savepoint == null ? 0 : savepoint.logIndex;
while (undoLog.size() > index) {
UndoLogRecord entry = undoLog.getLast();
entry.undo(this);
undoLog.removeLast(trimToSize);
}
if (transaction != null) {
Set<String> changed = transaction.getChangedMaps(savepoint.transactionSavepoint);
for (MVTable t : database.getMvStore().getTables()) {
if (changed.contains(t.getMapName())) {
t.setModified();
}
}
transaction.rollbackToSavepoint(savepoint.transactionSavepoint);
}
if (savepoints != null) {
String[] names = new String[savepoints.size()];
savepoints.keySet().toArray(names);
for (String name : names) {
Integer savepointIndex = savepoints.get(name);
if (savepointIndex.intValue() > index) {
Savepoint sp = savepoints.get(name);
int savepointIndex = sp.logIndex;
if (savepointIndex > index) {
savepoints.remove(name);
}
}
}
}
@Override
public int getUndoLogPos() {
return undoLog.size();
public boolean hasPendingTransaction() {
return undoLog.size() > 0;
}
public Savepoint setSavepoint() {
Savepoint sp = new Savepoint();
sp.logIndex = undoLog.size();
if (database.getMvStore() != null) {
sp.transactionSavepoint = getStatementSavepoint();
}
return sp;
}
public int getId() {
......@@ -791,7 +819,12 @@ public class Session extends SessionWithState {
if (savepoints == null) {
savepoints = database.newStringMap();
}
savepoints.put(name, getUndoLogPos());
Savepoint sp = new Savepoint();
sp.logIndex = undoLog.size();
if (database.getMvStore() != null) {
sp.transactionSavepoint = getStatementSavepoint();
}
savepoints.put(name, sp);
}
/**
......@@ -804,12 +837,11 @@ public class Session extends SessionWithState {
if (savepoints == null) {
throw DbException.get(ErrorCode.SAVEPOINT_IS_INVALID_1, name);
}
Integer savepointIndex = savepoints.get(name);
if (savepointIndex == null) {
Savepoint savepoint = savepoints.get(name);
if (savepoint == null) {
throw DbException.get(ErrorCode.SAVEPOINT_IS_INVALID_1, name);
}
int i = savepointIndex.intValue();
rollbackTo(i, false);
rollbackTo(savepoint, false);
}
/**
......@@ -1274,12 +1306,11 @@ public class Session extends SessionWithState {
/**
* Get the transaction to use for this session.
*
* @param store the store
* @return the transaction
*/
public Transaction getTransaction(TransactionStore store) {
public Transaction getTransaction() {
if (transaction == null) {
transaction = store.begin();
transaction = database.getMvStore().getTransactionStore().begin();
startStatement = -1;
}
return transaction;
......@@ -1287,9 +1318,22 @@ public class Session extends SessionWithState {
public long getStatementSavepoint() {
if (startStatement == -1) {
startStatement = transaction.setSavepoint();
startStatement = getTransaction().setSavepoint();
}
return startStatement;
}
public void endStatement() {
startStatement = -1;
}
/**
* Represents a savepoint (a position in a transaction to where one can roll
* back to).
*/
public static class Savepoint {
int logIndex;
long transactionSavepoint;
}
}
......@@ -67,13 +67,13 @@ public interface SessionInterface extends Closeable {
* @return the data handler
*/
DataHandler getDataHandler();
/**
* Get the undo log position.
*
* @return the position (0 means no pending transaction)
* Check whether this session has a pending transaction.
*
* @return true if it has
*/
int getUndoLogPos();
boolean hasPendingTransaction();
/**
* Cancel the current or next command (called when closing a connection).
......
......@@ -56,7 +56,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
public static final int SESSION_CANCEL_STATEMENT = 13;
public static final int SESSION_CHECK_KEY = 14;
public static final int SESSION_SET_AUTOCOMMIT = 15;
public static final int SESSION_UNDO_LOG_POS = 16;
public static final int SESSION_HAS_PENDING_TRANSACTION = 16;
public static final int LOB_READ = 17;
public static final int STATUS_ERROR = 0;
......@@ -123,23 +123,22 @@ public class SessionRemote extends SessionWithState implements DataHandler {
return trans;
}
@Override
public int getUndoLogPos() {
public boolean hasPendingTransaction() {
if (clientVersion < Constants.TCP_PROTOCOL_VERSION_10) {
return 1;
return true;
}
for (int i = 0, count = 0; i < transferList.size(); i++) {
Transfer transfer = transferList.get(i);
try {
traceOperation("SESSION_UNDO_LOG_POS", 0);
transfer.writeInt(SessionRemote.SESSION_UNDO_LOG_POS);
traceOperation("SESSION_HAS_PENDING_TRANSACTION", 0);
transfer.writeInt(SessionRemote.SESSION_HAS_PENDING_TRANSACTION);
done(transfer);
return transfer.readInt();
return transfer.readInt() != 0;
} catch (IOException e) {
removeServer(e, i--, ++count);
}
}
return 1;
return true;
}
@Override
......
......@@ -346,7 +346,7 @@ public class JdbcConnection extends TraceObject implements Connection {
try {
if (!session.isClosed()) {
try {
if (session.getUndoLogPos() != 0) {
if (session.hasPendingTransaction()) {
// roll back unless that would require to re-connect
// (the transaction can't be rolled back after re-connecting)
if (!session.isReconnectNeeded(true)) {
......
......@@ -64,6 +64,7 @@ public class Cursor<K> implements Iterator<K> {
}
long index = map.getKeyIndex(current);
K k = map.getKey(index + n);
pos = null;
min(root, k);
fetchNext();
}
......@@ -76,7 +77,7 @@ public class Cursor<K> implements Iterator<K> {
/**
* Fetch the next entry that is equal or larger than the given key, starting
* from the given page.
* from the given page. This method retains the stack.
*
* @param p the page to start
* @param from the key to search
......
......@@ -328,7 +328,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Get the smallest key that is larger than the given key, or null if no
* such key exists.
*
* @param key the key (may not be null)
* @param key the key
* @return the result
*/
public K higherKey(K key) {
......@@ -338,7 +338,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
/**
* Get the smallest key that is larger or equal to this key.
*
* @param key the key (may not be null)
* @param key the key
* @return the result
*/
public K ceilingKey(K key) {
......@@ -348,7 +348,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
/**
* Get the largest key that is smaller or equal to this key.
*
* @param key the key (may not be null)
* @param key the key
* @return the result
*/
public K floorKey(K key) {
......@@ -359,7 +359,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Get the largest key that is smaller than the given key, or null if no
* such key exists.
*
* @param key the key (may not be null)
* @param key the key
* @return the result
*/
public K lowerKey(K key) {
......@@ -372,22 +372,16 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param key the key
* @param min whether to retrieve the smallest key
* @param excluding if the given upper/lower bound is exclusive
* @return the key, or null if the map is empty
* @return the key, or null if no such key exists
*/
protected K getMinMax(K key, boolean min, boolean excluding) {
checkOpen();
if (size() == 0) {
return null;
}
return getMinMax(root, key, min, excluding);
}
@SuppressWarnings("unchecked")
private K getMinMax(Page p, K key, boolean min, boolean excluding) {
if (p.isLeaf()) {
if (key == null) {
return (K) p.getKey(min ? 0 : p.getKeyCount() - 1);
}
int x = p.binarySearch(key);
if (x < 0) {
x = -x - (min ? 2 : 1);
......@@ -399,16 +393,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
return (K) p.getKey(x);
}
int x;
if (key == null) {
x = min ? 0 : p.getKeyCount() - 1;
int x = p.binarySearch(key);
if (x < 0) {
x = -x - 1;
} else {
x = p.binarySearch(key);
if (x < 0) {
x = -x - 1;
} else {
x++;
}
x++;
}
while (true) {
if (x < 0 || x >= p.getChildPageCount()) {
......
......@@ -44,6 +44,12 @@ H:3,...
TODO:
TestMVStoreDataLoss
TransactionStore:
- support reading the undo log
MVStore:
- rolling docs review: at convert "Features" to top-level (linked) entries
- additional test async write / read algorithm for speed and errors
- move setters to the builder, except for setRetainVersion, setReuseSpace,
......@@ -94,10 +100,12 @@ TODO:
- to save space when persisting very small transactions,
-- use a transaction log where only the deltas are stored
- serialization for lists, sets, sets, sorted sets, maps, sorted maps
- maybe rename 'rollback' to 'revert'
- maybe rename 'rollback' to 'revert' to distinguish from transactions
- support other compression algorithms (deflate, LZ4,...)
- only retain the last version, unless explicitly set (setRetainVersion)
- unit test for the FreeSpaceList; maybe find a simpler implementation
- support opening (existing) maps by id
- more consistent null handling (keys/values sometimes may be null)
*/
......@@ -751,18 +759,18 @@ public class MVStore {
* Commit the changes. This method marks the changes as committed and
* increments the version.
* <p>
* Unless the write delay is disabled, this method does not write to the
* Unless the write delay is set to 0, this method does not write to the
* file. Instead, data is written after the delay, manually by calling the
* store method, when the write buffer is full, or when closing the store.
*
* @return the new version
*/
public long commit() {
if (writeDelay == 0) {
return store(true);
}
long v = ++currentVersion;
lastCommittedVersion = v;
if (writeDelay == 0) {
store(false);
}
return v;
}
......
......@@ -268,6 +268,10 @@ public class MVPrimaryIndex extends BaseIndex {
// TODO estimate disk space usage
return 0;
}
public String getMapName() {
return mapName;
}
@Override
public void checkRename() {
......
......@@ -47,8 +47,6 @@ import org.h2.value.Value;
*/
public class MVTable extends TableBase {
private final String storeName;
private final TransactionStore store;
private MVPrimaryIndex primaryIndex;
private ArrayList<Index> indexes = New.arrayList();
private long lastModificationId;
......@@ -60,6 +58,8 @@ public class MVTable extends TableBase {
private int nextAnalyze;
private boolean containsLargeObject;
private Column rowIdColumn;
private final TransactionStore store;
/**
* True if one thread ever was waiting to lock this table. This is to avoid
......@@ -68,11 +68,10 @@ public class MVTable extends TableBase {
*/
private boolean waitForLock;
public MVTable(CreateTableData data, String storeName, TransactionStore store) {
public MVTable(CreateTableData data, MVTableEngine.Store store) {
super(data);
nextAnalyze = database.getSettings().analyzeAuto;
this.storeName = storeName;
this.store = store;
this.store = store.getTransactionStore();
this.isHidden = data.isHidden;
for (Column col : getColumns()) {
if (DataType.isLargeObject(col.getType())) {
......@@ -96,6 +95,10 @@ public class MVTable extends TableBase {
rowCount = primaryIndex.getRowCount(session);
indexes.add(primaryIndex);
}
public String getMapName() {
return primaryIndex.getMapName();
}
@Override
public void lock(Session session, boolean exclusive, boolean force) {
......@@ -354,7 +357,7 @@ public class MVTable extends TableBase {
@Override
public void close(Session session) {
MVTableEngine.closeTable(storeName, this);
// ignore
}
/**
......@@ -675,7 +678,7 @@ public class MVTable extends TableBase {
// TODO need to commit/rollback the transaction
return store.begin();
}
return session.getTransaction(store);
return session.getTransaction();
}
@Override
......
......@@ -6,13 +6,13 @@
*/
package org.h2.mvstore.db;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.List;
import org.h2.api.TableEngine;
import org.h2.command.ddl.CreateTableData;
import org.h2.constant.ErrorCode;
import org.h2.engine.Constants;
import org.h2.engine.Database;
import org.h2.message.DbException;
......@@ -25,102 +25,38 @@ import org.h2.util.New;
*/
public class MVTableEngine implements TableEngine {
static final Map<String, Store> STORES = new WeakHashMap<String, Store>();
/**
* Flush all changes.
*
* @param db the database
*/
public static void flush(Database db) {
String storeName = db.getDatabasePath();
if (storeName == null) {
return;
}
synchronized (STORES) {
Store store = STORES.get(storeName);
if (store == null) {
return;
}
store(store.getStore());
}
}
public static Collection<Store> getStores() {
return STORES.values();
}
@Override
public TableBase createTable(CreateTableData data) {
Database db = data.session.getDatabase();
byte[] key = db.getFilePasswordHash();
String storeName = db.getDatabasePath();
MVStore.Builder builder = new MVStore.Builder();
Store store;
if (storeName == null) {
store = new Store(db, builder.open());
} else {
synchronized (STORES) {
store = STORES.get(storeName);
if (store == null) {
builder.fileName(storeName + Constants.SUFFIX_MV_FILE);
if (db.isReadOnly()) {
builder.readOnly();
}
if (key != null) {
char[] password = new char[key.length];
for (int i = 0; i < key.length; i++) {
password[i] = (char) key[i];
}
builder.encryptionKey(password);
Store store = db.getMvStore();
if (store == null) {
byte[] key = db.getFilePasswordHash();
String dbPath = db.getDatabasePath();
MVStore.Builder builder = new MVStore.Builder();
if (dbPath == null) {
store = new Store(db, builder.open());
} else {
builder.fileName(dbPath + Constants.SUFFIX_MV_FILE);
if (db.isReadOnly()) {
builder.readOnly();
}
if (key != null) {
char[] password = new char[key.length];
for (int i = 0; i < key.length; i++) {
password[i] = (char) key[i];
}
store = new Store(db, builder.open());
STORES.put(storeName, store);
} else if (store.db != db) {
throw DbException.get(ErrorCode.DATABASE_ALREADY_OPEN_1, storeName);
builder.encryptionKey(password);
}
store = new Store(db, builder.open());
}
db.setMvStore(store);
}
MVTable table = new MVTable(data, storeName, store.getTransactionStore());
MVTable table = new MVTable(data, store);
store.openTables.add(table);
table.init(data.session);
return table;
}
/**
* Close the table, and close the store if there are no remaining open
* tables.
*
* @param storeName the store name
* @param table the table
*/
static void closeTable(String storeName, MVTable table) {
synchronized (STORES) {
Store store = STORES.get(storeName);
if (store != null) {
store.openTables.remove(table);
if (store.openTables.size() == 0) {
store(store.getStore());
store.getStore().close();
STORES.remove(storeName);
}
}
}
}
/**
* Store the data if needed.
*
* @param store the store
*/
static void store(MVStore store) {
if (!store.isReadOnly()) {
store.commit();
store.compact(50);
store.store();
}
}
/**
* A store with open tables.
*/
......@@ -152,7 +88,7 @@ public class MVTableEngine implements TableEngine {
this.transactionStore = new TransactionStore(store,
new ValueDataType(null, null, null));
}
public MVStore getStore() {
return store;
}
......@@ -160,6 +96,39 @@ public class MVTableEngine implements TableEngine {
public TransactionStore getTransactionStore() {
return transactionStore;
}
public List<MVTable> getTables() {
return openTables;
}
public void store() {
if (!store.isReadOnly()) {
store.commit();
store.compact(50);
store.store();
}
}
public void closeImmediately() {
if (store.isClosed()) {
return;
}
FileChannel f = store.getFile();
if (f != null) {
try {
f.close();
} catch (IOException e) {
throw DbException.convertIOException(e, "Closing file");
}
}
}
public void close() {
if (!store.isReadOnly()) {
store.store();
}
store.close();
}
}
......
......@@ -8,9 +8,11 @@ package org.h2.mvstore.db;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.h2.mvstore.Cursor;
import org.h2.mvstore.DataUtils;
......@@ -28,7 +30,7 @@ public class TransactionStore {
private static final String LAST_TRANSACTION_ID = "lastTransactionId";
// TODO should not be hardcoded
// TODO should not be hard-coded
private static final int MAX_UNSAVED_PAGES = 4 * 1024;
/**
......@@ -245,22 +247,11 @@ public class TransactionStore {
}
endTransaction(t);
}
/**
* Roll a transaction back.
*
* @param t the transaction
* @param maxLogId the last log id
*/
void rollback(Transaction t, long maxLogId) {
rollbackTo(t, maxLogId, 0);
endTransaction(t);
}
boolean isTransactionOpen(long transactionId) {
// if (transactionId < firstOpenTransaction) {
// return false;
// }
if (transactionId < firstOpenTransaction) {
return false;
}
if (firstOpenTransaction == -1) {
if (undoLog.size() == 0) {
return false;
......@@ -276,7 +267,7 @@ public class TransactionStore {
return key != null && key[0] == transactionId;
}
private void endTransaction(Transaction t) {
void endTransaction(Transaction t) {
if (t.getStatus() == Transaction.STATUS_PREPARED) {
preparedTransactions.remove(t.getId());
}
......@@ -299,6 +290,7 @@ public class TransactionStore {
Object[] op = undoLog.get(new long[] {
t.getId(), logId });
int mapId = ((Integer) op[1]).intValue();
// TODO open map by id if possible
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
String mapName = DataUtils.parseMap(m).get("name");
......@@ -315,6 +307,21 @@ public class TransactionStore {
undoLog.remove(op);
}
}
HashSet<String> getChangedMaps(Transaction t, long maxLogId, long toLogId) {
HashSet<String> set = New.hashSet();
for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
Object[] op = undoLog.get(new long[] {
t.getId(), logId });
int mapId = ((Integer) op[1]).intValue();
// TODO open map by id if possible
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
String mapName = DataUtils.parseMap(m).get("name");
set.add(mapName);
}
return set;
}
/**
* A transaction.
......@@ -451,18 +458,6 @@ public class TransactionStore {
return new TransactionMap<K, V>(this, name, keyType, valueType);
}
/**
* Roll back to the given savepoint. This is only allowed if the
* transaction is open.
*
* @param savepointId the savepoint id
*/
public void rollbackToSavepoint(long savepointId) {
checkOpen();
store.rollbackTo(this, this.logId, savepointId);
this.logId = savepointId;
}
/**
* Prepare the transaction. Afterwards, the transaction can only be
* committed or rolled back.
......@@ -477,18 +472,41 @@ public class TransactionStore {
* Commit the transaction. Afterwards, this transaction is closed.
*/
public void commit() {
if (status != STATUS_CLOSED) {
store.commit(this, logId);
}
checkNotClosed();
store.commit(this, logId);
}
/**
* Roll back to the given savepoint. This is only allowed if the
* transaction is open.
*
* @param savepointId the savepoint id
*/
public void rollbackToSavepoint(long savepointId) {
checkOpen();
store.rollbackTo(this, logId, savepointId);
logId = savepointId;
}
/**
* Roll the transaction back. Afterwards, this transaction is closed.
*/
public void rollback() {
if (status != STATUS_CLOSED) {
store.rollback(this, logId);
}
checkNotClosed();
store.rollbackTo(this, logId, 0);
store.endTransaction(this);
}
/**
* Get the set of changed maps starting at the given savepoint up to
* now.
*
* @param savepointId the savepoint id, 0 meaning the beginning of the
* transaction
* @return the set of changed maps
*/
public Set<String> getChangedMaps(long savepointId) {
return store.getChangedMaps(this, logId, savepointId);
}
/**
......@@ -499,9 +517,14 @@ public class TransactionStore {
throw DataUtils.newIllegalStateException("Transaction is closed");
}
}
public long getCurrentVersion() {
return store.store.getCurrentVersion();
/**
* Check whether this transaction is open or prepared.
*/
void checkNotClosed() {
if (status == STATUS_CLOSED) {
throw DataUtils.newIllegalStateException("Transaction is closed");
}
}
}
......
......@@ -618,12 +618,10 @@ public class ValueDataType implements DataType {
}
if (smallLen == -2) {
String filename = readString(buff);
ValueLob lob = ValueLob.openUnlinked(type, handler, tableId, objectId, precision, compression, filename);
return lob;
} else {
ValueLob lob = ValueLob.openLinked(type, handler, tableId, objectId, precision, compression);
return lob;
return ValueLob.openUnlinked(type, handler, tableId, objectId, precision, compression, filename);
}
ValueLob lob = ValueLob.openLinked(type, handler, tableId, objectId, precision, compression);
return lob;
}
}
case Value.ARRAY: {
......
......@@ -398,9 +398,9 @@ public class TcpServerThread implements Runnable {
transfer.writeInt(SessionRemote.STATUS_OK).flush();
break;
}
case SessionRemote.SESSION_UNDO_LOG_POS: {
case SessionRemote.SESSION_HAS_PENDING_TRANSACTION: {
transfer.writeInt(SessionRemote.STATUS_OK).
writeInt(session.getUndoLogPos()).flush();
writeInt(session.hasPendingTransaction() ? 1 : 0).flush();
break;
}
case SessionRemote.LOB_READ: {
......
......@@ -808,12 +808,9 @@ public class Data {
}
if (smallLen == -2) {
String filename = readString();
ValueLob lob = ValueLob.openUnlinked(type, handler, tableId, objectId, precision, compression, filename);
return lob;
} else {
ValueLob lob = ValueLob.openLinked(type, handler, tableId, objectId, precision, compression);
return lob;
return ValueLob.openUnlinked(type, handler, tableId, objectId, precision, compression, filename);
}
return ValueLob.openLinked(type, handler, tableId, objectId, precision, compression);
}
}
case Value.ARRAY: {
......
......@@ -34,7 +34,7 @@ public class LobStorageFrontend implements LobStorageInterface {
*/
@Override
public void removeLob(long lob) {
// TODO ideally, this should not be called at all, but that's a refactoring for another day
// TODO this should not be called at all, but that's a refactoring for another day
}
/**
......@@ -64,7 +64,7 @@ public class LobStorageFrontend implements LobStorageInterface {
*/
@Override
public ValueLobDb copyLob(int type, long oldLobId, int tableId, long length) {
// TODO ideally, this should not be called at all, but that's a refactoring for another day
// TODO this should not be called at all, but that's a refactoring for another day
// this should never be called
throw new UnsupportedOperationException();
}
......
......@@ -426,7 +426,7 @@ public abstract class Table extends SchemaObjectBase {
*/
public void updateRows(Prepared prepared, Session session, RowList rows) {
// in case we need to undo the update
int rollback = session.getUndoLogPos();
Session.Savepoint rollback = session.setSavepoint();
// remove the old rows
int rowScanCount = 0;
for (rows.reset(); rows.hasNext();) {
......
......@@ -274,9 +274,9 @@ public class SourceCompiler {
static {
Object tmpLoader = null;
Throwable tmpInitfailException = null;
Throwable tmpInitFailException = null;
try {
// create an instance of ImportCustomiser
// create an instance of ImportCustomizer
Class<?> importCustomizerClass = Class.forName("org.codehaus.groovy.control.customizers.ImportCustomizer");
Object importCustomizer = Utils.newInstance("org.codehaus.groovy.control.customizers.ImportCustomizer");
// Call the method ImportCustomizer#addImports(String[])
......@@ -293,10 +293,10 @@ public class SourceCompiler {
ClassLoader parent = GroovyCompiler.class.getClassLoader();
tmpLoader = Utils.newInstance("groovy.lang.GroovyClassLoader", parent, configuration);
} catch (Exception ex) {
tmpInitfailException = ex;
tmpInitFailException = ex;
}
LOADER = tmpLoader;
INIT_FAIL_EXCEPTION = tmpInitfailException;
INIT_FAIL_EXCEPTION = tmpInitFailException;
}
public static Class<?> parseClass(String source, String packageAndClassName) {
......
......@@ -12,6 +12,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.h2.mvstore.Cursor;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
......@@ -122,7 +123,7 @@ public class TestMVStore extends TestBase {
}
private void testWriteBuffer() throws IOException {
String fileName = getBaseDir() + "/testAutoStoreBuffer.h3";
String fileName = getBaseDir() + "/testWriteBuffer.h3";
FileUtils.delete(fileName);
MVStore s;
MVMap<Integer, byte[]> m;
......@@ -167,10 +168,24 @@ public class TestMVStore extends TestBase {
}
private void testWriteDelay() throws InterruptedException {
String fileName = getBaseDir() + "/testUndoTempStore.h3";
FileUtils.delete(fileName);
String fileName = getBaseDir() + "/testWriteDelay.h3";
MVStore s;
MVMap<Integer, String> m;
FileUtils.delete(fileName);
s = new MVStore.Builder().writeDelay(0).
fileName(fileName).open();
m = s.openMap("data");
m.put(1, "1");
s.commit();
s.close();
s = new MVStore.Builder().writeDelay(0).
fileName(fileName).open();
m = s.openMap("data");
assertEquals(1, m.size());
s.close();
FileUtils.delete(fileName);
s = new MVStore.Builder().
writeDelay(1).
fileName(fileName).
......@@ -468,6 +483,16 @@ public class TestMVStore extends TestBase {
for (int i = 0; i < 100; i += 2) {
map.put(i, 10 * i);
}
Cursor<Integer> c = map.keyIterator(50);
// skip must reset the root of the cursor
c.skip(10);
for (int i = 70; i < 100; i += 2) {
assertTrue(c.hasNext());
assertEquals(i, c.next().intValue());
}
assertFalse(c.hasNext());
for (int i = -1; i < 100; i++) {
long index = map.getKeyIndex(i);
if (i < 0 || (i % 2) != 0) {
......@@ -485,7 +510,7 @@ public class TestMVStore extends TestBase {
}
}
// skip
Cursor<Integer> c = map.keyIterator(0);
c = map.keyIterator(0);
assertTrue(c.hasNext());
assertEquals(0, c.next().intValue());
c.skip(0);
......@@ -513,14 +538,22 @@ public class TestMVStore extends TestBase {
MVMap<Integer, Integer> map = s.openMap("test");
map.put(10, 100);
map.put(20, 200);
assertEquals(10, map.firstKey().intValue());
assertEquals(20, map.lastKey().intValue());
assertEquals(20, map.ceilingKey(15).intValue());
assertEquals(20, map.ceilingKey(20).intValue());
assertEquals(10, map.floorKey(15).intValue());
assertEquals(10, map.floorKey(10).intValue());
assertEquals(20, map.higherKey(10).intValue());
assertEquals(10, map.lowerKey(20).intValue());
final MVMap<Integer, Integer> m = map;
assertEquals(10, m.ceilingKey(null).intValue());
assertEquals(10, m.higherKey(null).intValue());
assertNull(m.lowerKey(null));
assertNull(m.floorKey(null));
for (int i = 3; i < 20; i++) {
s = openStore(null);
......
......@@ -14,11 +14,13 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import org.h2.constant.ErrorCode;
import org.h2.engine.Constants;
import org.h2.mvstore.db.MVTableEngine;
import org.h2.engine.Database;
import org.h2.jdbc.JdbcConnection;
import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase;
import org.h2.util.Task;
......@@ -40,6 +42,7 @@ public class TestMVTableEngine extends TestBase {
@Override
public void test() throws Exception {
// testSpeed();
testAutoCommit();
testReopen();
testBlob();
testExclusiveLock();
......@@ -58,7 +61,7 @@ public class TestMVTableEngine extends TestBase {
dbName += ";LOCK_MODE=0";
// dbName += ";LOG=0";
testSpeed(dbName);
int tes;
int test;
//Profiler prof = new Profiler().startCollecting();
dbName = "mvstore" +
";DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine";
......@@ -119,6 +122,37 @@ int tes;
//System.out.println(prof.getTop(10));
System.out.println((System.currentTimeMillis() - time) + " " + dbName + " after");
}
private void testAutoCommit() throws SQLException {
FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn;
Statement stat;
ResultSet rs;
conn = getConnection("mvstore");
stat = conn.createStatement();
stat.execute("create table test(id int primary key, name varchar) "
+ "engine \"org.h2.mvstore.db.MVTableEngine\"");
conn.setAutoCommit(false);
stat.execute("insert into test values(1, 'Hello')");
stat.execute("insert into test values(2, 'World')");
rs = stat.executeQuery("select count(*) from test");
rs.next();
assertEquals(2, rs.getInt(1));
conn.rollback();
rs = stat.executeQuery("select count(*) from test");
rs.next();
assertEquals(0, rs.getInt(1));
stat.execute("insert into test values(1, 'Hello')");
Savepoint sp = conn.setSavepoint();
stat.execute("insert into test values(2, 'World')");
conn.rollback(sp);
rs = stat.executeQuery("select count(*) from test");
rs.next();
assertEquals(1, rs.getInt(1));
conn.close();
}
private void testReopen() throws SQLException {
FileUtils.deleteRecursive(getBaseDir(), true);
......@@ -220,9 +254,8 @@ int tes;
conn.close();
FileUtils.setReadOnly(getBaseDir() + "/mvstore.h2.db");
conn = getConnection(dbName);
for (MVTableEngine.Store s : MVTableEngine.getStores()) {
assertTrue(s.getStore().isReadOnly());
}
Database db = (Database) ((JdbcConnection) conn).getSession().getDataHandler();
assertTrue(db.getMvStore().getStore().isReadOnly());
conn.close();
FileUtils.deleteRecursive(getBaseDir(), true);
}
......@@ -236,9 +269,8 @@ int tes;
long maxSize = 0;
for (int i = 0; i < 20; i++) {
conn = getConnection(dbName);
for (MVTableEngine.Store s : MVTableEngine.getStores()) {
s.getStore().setRetentionTime(0);
}
Database db = (Database) ((JdbcConnection) conn).getSession().getDataHandler();
db.getMvStore().getStore().setRetentionTime(0);
stat = conn.createStatement();
stat.execute("create table test(id int primary key, data varchar)");
stat.execute("insert into test select x, space(1000) from system_range(1, 1000)");
......
......@@ -15,6 +15,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.TreeSet;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.db.TransactionStore;
......@@ -41,6 +42,7 @@ public class TestTransactionStore extends TestBase {
@Override
public void test() throws Exception {
FileUtils.createDirectories(getBaseDir());
testGetModifiedMaps();
testKeyIterator();
testMultiStatement();
testTwoPhaseCommit();
......@@ -49,6 +51,42 @@ public class TestTransactionStore extends TestBase {
testSingleConnection();
testCompareWithPostgreSQL();
}
private void testGetModifiedMaps() {
MVStore s = MVStore.open(null);
TransactionStore ts = new TransactionStore(s);
Transaction tx;
TransactionMap<String, String> m1, m2, m3;
long sp;
TreeSet<String> changed;
tx = ts.begin();
m1 = tx.openMap("m1");
m2 = tx.openMap("m2");
m3 = tx.openMap("m3");
changed = new TreeSet<String>(tx.getChangedMaps(0));
assertEquals(0, changed.size());
tx.commit();
tx = ts.begin();
m1 = tx.openMap("m1");
m2 = tx.openMap("m2");
m3 = tx.openMap("m3");
m1.put("1", "100");
sp = tx.setSavepoint();
m2.put("1", "100");
m3.put("1", "100");
changed = new TreeSet<String>(tx.getChangedMaps(sp));
assertEquals("[m2, m3]", changed.toString());
changed = new TreeSet<String>(tx.getChangedMaps(0));
assertEquals("[m1, m2, m3]", changed.toString());
tx.rollbackToSavepoint(sp);
changed = new TreeSet<String>(tx.getChangedMaps(0));
assertEquals("[m1]", changed.toString());
tx.commit();
s.close();
}
private void testKeyIterator() {
MVStore s = MVStore.open(null);
......
......@@ -725,4 +725,5 @@ brown tweak pbkdf sharding ieee galois otterstrom sharded hruda argaul gaul
simo unpredictable overtakes conditionally decreases warned coupled spin
unsynchronized reality cores effort slice addleman koskela ville blocking seen
isam charindex removal getdate jesse fake covers covering cheaper adjacent spot
transition anthony goubard netherlands versioned orderable customizer cachable
customizers retains
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论