提交 929803e8 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: orphaned lob objects were not correctly removed in some cases, making…

MVStore: orphaned lob objects were not correctly removed in some cases, making the database grow unnecessarily.
上级 a468a417
......@@ -237,6 +237,45 @@ public class StreamStore {
}
}
/**
* Get the key of the biggest block, of -1 for inline data.
* This method is used to garbage collect orphaned blocks.
*
* @param id the id
* @return the key, or -1
*/
public long getMaxBlockKey(byte[] id) {
long maxKey = -1;
ByteBuffer idBuffer = ByteBuffer.wrap(id);
while (idBuffer.hasRemaining()) {
switch (idBuffer.get()) {
case 0:
// in-place: 0, len (int), data
int len = DataUtils.readVarInt(idBuffer);
idBuffer.position(idBuffer.position() + len);
break;
case 1:
// block: 1, len (int), blockId (long)
DataUtils.readVarInt(idBuffer);
long k = DataUtils.readVarLong(idBuffer);
maxKey = Math.max(maxKey, k);
break;
case 2:
// indirect: 2, total len (long), blockId (long)
DataUtils.readVarLong(idBuffer);
long k2 = DataUtils.readVarLong(idBuffer);
// recurse
byte[] r = map.get(k2);
maxKey = Math.max(maxKey, getMaxBlockKey(r));
break;
default:
throw DataUtils.newIllegalArgumentException(
"Unsupported id {0}", Arrays.toString(id));
}
}
return maxKey;
}
/**
* Remove all stored blocks for the given id.
*
......
......@@ -94,6 +94,35 @@ public class LobStorageMap implements LobStorageInterface {
refMap = mvStore.openMap("lobRef");
dataMap = mvStore.openMap("lobData");
streamStore = new StreamStore(dataMap);
// garbage collection of the last blocks
if (database.isReadOnly()) {
return;
}
if (dataMap.isEmpty()) {
return;
}
// search the last referenced block
// (a lob may not have any referenced blocks if data is kept inline,
// so we need to loop)
long lastUsedKey = -1;
Long lobId = lobMap.lastKey();
while (lobId != null) {
Object[] v = lobMap.get(lobId);
byte[] id = (byte[]) v[0];
lastUsedKey = streamStore.getMaxBlockKey(id);
if (lastUsedKey >= 0) {
break;
}
lobId = lobMap.floorKey(lobId);
}
// delete all blocks that are newer
while (true) {
Long last = dataMap.lastKey();
if (last == null || last <= lastUsedKey) {
break;
}
dataMap.remove(last);
}
}
@Override
......@@ -256,6 +285,9 @@ public class LobStorageMap implements LobStorageInterface {
@Override
public void removeAllForTable(int tableId) {
init();
if (database.getMvStore().getStore().isClosed()) {
return;
}
// this might not be very efficient -
// to speed it up, we would need yet another map
ArrayList<Long> list = New.arrayList();
......@@ -269,6 +301,10 @@ public class LobStorageMap implements LobStorageInterface {
for (long lobId : list) {
removeLob(tableId, lobId);
}
if (tableId == LobStorageFrontend.TABLE_ID_SESSION_VARIABLE) {
removeAllForTable(LobStorageFrontend.TABLE_TEMP);
removeAllForTable(LobStorageFrontend.TABLE_RESULT);
}
}
@Override
......
......@@ -8,6 +8,7 @@ package org.h2.test.store;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.math.BigDecimal;
import java.nio.channels.FileChannel;
import java.sql.Connection;
......@@ -17,6 +18,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicBoolean;
import org.h2.api.ErrorCode;
import org.h2.engine.Constants;
......@@ -49,6 +51,8 @@ public class TestMVTableEngine extends TestBase {
@Override
public void test() throws Exception {
testShutdownDuringLobCreation();
testLobCreationThenShutdown();
testManyTransactions();
testAppendOnly();
testLowRetentionTime();
......@@ -82,9 +86,117 @@ public class TestMVTableEngine extends TestBase {
testLocking();
testSimple();
}
private void testShutdownDuringLobCreation() throws Exception {
deleteDb("testShutdownDuringLobCreation");
Connection conn = getConnection("testShutdownDuringLobCreation");
Statement stat = conn.createStatement();
stat.execute("create table test(data clob) as select space(10000)");
final PreparedStatement prep = conn
.prepareStatement("set @lob = ?");
final AtomicBoolean end = new AtomicBoolean();
Task t = new Task() {
@Override
public void call() throws Exception {
prep.setBinaryStream(1, new InputStream() {
int len;
@Override
public int read() throws IOException {
if (len++ < 1024 * 1024 * 4) {
return 0;
}
end.set(true);
while (!stop) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// ignore
}
}
return -1;
}
} , -1);
}
};
t.execute();
while (!end.get()) {
Thread.sleep(1);
}
stat.execute("checkpoint");
stat.execute("shutdown immediately");
Exception ex = t.getException();
assertTrue(ex != null);
try {
conn.close();
} catch (Exception e) {
// ignore
}
conn = getConnection("testShutdownDuringLobCreation");
stat = conn.createStatement();
stat.execute("shutdown defrag");
try {
conn.close();
} catch (Exception e) {
// ignore
}
conn = getConnection("testShutdownDuringLobCreation");
stat = conn.createStatement();
ResultSet rs = stat.executeQuery("select * " +
"from information_schema.settings " +
"where name = 'info.PAGE_COUNT'");
rs.next();
int pages = rs.getInt(2);
// only one lob should remain (but it is small and compressed)
assertTrue("p:" + pages, pages < 4);
conn.close();
}
private void testLobCreationThenShutdown() throws Exception {
deleteDb("testLobCreationThenShutdown");
Connection conn = getConnection("testLobCreationThenShutdown");
Statement stat = conn.createStatement();
stat.execute("create table test(id identity, data clob)");
PreparedStatement prep = conn
.prepareStatement("insert into test values(?, ?)");
for (int i = 0; i < 9; i++) {
prep.setInt(1, i);
int size = i * i * i * i * 1024;
prep.setCharacterStream(2, new StringReader(new String(
new char[size])));
prep.execute();
}
stat.execute("shutdown immediately");
try {
conn.close();
} catch (Exception e) {
// ignore
}
conn = getConnection("testLobCreationThenShutdown");
stat = conn.createStatement();
stat.execute("drop all objects");
stat.execute("shutdown defrag");
try {
conn.close();
} catch (Exception e) {
// ignore
}
conn = getConnection("testLobCreationThenShutdown");
stat = conn.createStatement();
ResultSet rs = stat.executeQuery("select * " +
"from information_schema.settings " +
"where name = 'info.PAGE_COUNT'");
rs.next();
int pages = rs.getInt(2);
// no lobs should remain
assertTrue("p:" + pages, pages < 4);
conn.close();
}
private void testManyTransactions() throws Exception {
deleteDb("testManyTransactions");
deleteDb("testManyTransactions");
Connection conn = getConnection("testManyTransactions");
Statement stat = conn.createStatement();
stat.execute("create table test()");
......@@ -92,7 +204,7 @@ public class TestMVTableEngine extends TestBase {
stat.execute("insert into test values()");
Connection conn2 = getConnection("testManyTransactions");
Statement stat2 = conn2.createStatement();
Statement stat2 = conn2.createStatement();
for (long i = 0; i < 100000; i++) {
stat2.execute("insert into test values()");
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论