提交 a5587284 authored 作者: Noel Grandin's avatar Noel Grandin

attempt to speed up parallel selects

by using reader-writer lock, reported issue #538
上级 e0c13513
...@@ -12,7 +12,7 @@ import java.util.HashMap; ...@@ -12,7 +12,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.h2.mvstore.Cursor; import org.h2.mvstore.Cursor;
import org.h2.mvstore.DataUtils; import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
...@@ -50,6 +50,12 @@ public class TransactionStore { ...@@ -50,6 +50,12 @@ public class TransactionStore {
*/ */
final MVMap<Long, Object[]> undoLog; final MVMap<Long, Object[]> undoLog;
/**
* the reader/writer lock for the undo-log. Allows us to process multiple
* selects in parallel.
*/
final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
/** /**
* The map of maps. * The map of maps.
*/ */
...@@ -118,13 +124,16 @@ public class TransactionStore { ...@@ -118,13 +124,16 @@ public class TransactionStore {
store.removeMap(temp); store.removeMap(temp);
} }
} }
synchronized (undoLog) { rwLock.writeLock().lock();
try {
if (undoLog.size() > 0) { if (undoLog.size() > 0) {
for (Long key : undoLog.keySet()) { for (Long key : undoLog.keySet()) {
int transactionId = getTransactionId(key); int transactionId = getTransactionId(key);
openTransactions.set(transactionId); openTransactions.set(transactionId);
} }
} }
} finally {
rwLock.writeLock().unlock();
} }
} }
...@@ -180,7 +189,8 @@ public class TransactionStore { ...@@ -180,7 +189,8 @@ public class TransactionStore {
* @return the list of transactions (sorted by id) * @return the list of transactions (sorted by id)
*/ */
public List<Transaction> getOpenTransactions() { public List<Transaction> getOpenTransactions() {
synchronized (undoLog) { rwLock.readLock().lock();
try {
ArrayList<Transaction> list = New.arrayList(); ArrayList<Transaction> list = New.arrayList();
Long key = undoLog.firstKey(); Long key = undoLog.firstKey();
while (key != null) { while (key != null) {
...@@ -207,6 +217,8 @@ public class TransactionStore { ...@@ -207,6 +217,8 @@ public class TransactionStore {
key = undoLog.ceilingKey(getOperationId(transactionId + 1, 0)); key = undoLog.ceilingKey(getOperationId(transactionId + 1, 0));
} }
return list; return list;
} finally {
rwLock.readLock().unlock();
} }
} }
...@@ -269,7 +281,8 @@ public class TransactionStore { ...@@ -269,7 +281,8 @@ public class TransactionStore {
Object key, Object oldValue) { Object key, Object oldValue) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(t.getId(), logId);
Object[] log = new Object[] { mapId, key, oldValue }; Object[] log = new Object[] { mapId, key, oldValue };
synchronized (undoLog) { rwLock.writeLock().lock();
try {
if (logId == 0) { if (logId == 0) {
if (undoLog.containsKey(undoKey)) { if (undoLog.containsKey(undoKey)) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
...@@ -280,6 +293,8 @@ public class TransactionStore { ...@@ -280,6 +293,8 @@ public class TransactionStore {
} }
} }
undoLog.put(undoKey, log); undoLog.put(undoKey, log);
} finally {
rwLock.writeLock().unlock();
} }
} }
...@@ -291,7 +306,8 @@ public class TransactionStore { ...@@ -291,7 +306,8 @@ public class TransactionStore {
*/ */
public void logUndo(Transaction t, long logId) { public void logUndo(Transaction t, long logId) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(t.getId(), logId);
synchronized (undoLog) { rwLock.writeLock().lock();
try {
Object[] old = undoLog.remove(undoKey); Object[] old = undoLog.remove(undoKey);
if (old == null) { if (old == null) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
...@@ -299,6 +315,8 @@ public class TransactionStore { ...@@ -299,6 +315,8 @@ public class TransactionStore {
"Transaction {0} was concurrently rolled back", "Transaction {0} was concurrently rolled back",
t.getId()); t.getId());
} }
} finally {
rwLock.writeLock().unlock();
} }
} }
...@@ -325,7 +343,8 @@ public class TransactionStore { ...@@ -325,7 +343,8 @@ public class TransactionStore {
return; return;
} }
// TODO could synchronize on blocks (100 at a time or so) // TODO could synchronize on blocks (100 at a time or so)
synchronized (undoLog) { rwLock.writeLock().lock();
try {
t.setStatus(Transaction.STATUS_COMMITTING); t.setStatus(Transaction.STATUS_COMMITTING);
for (long logId = 0; logId < maxLogId; logId++) { for (long logId = 0; logId < maxLogId; logId++) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(t.getId(), logId);
...@@ -364,6 +383,8 @@ public class TransactionStore { ...@@ -364,6 +383,8 @@ public class TransactionStore {
} }
undoLog.remove(undoKey); undoLog.remove(undoKey);
} }
} finally {
rwLock.writeLock().unlock();
} }
endTransaction(t); endTransaction(t);
} }
...@@ -482,7 +503,8 @@ public class TransactionStore { ...@@ -482,7 +503,8 @@ public class TransactionStore {
*/ */
void rollbackTo(Transaction t, long maxLogId, long toLogId) { void rollbackTo(Transaction t, long maxLogId, long toLogId) {
// TODO could synchronize on blocks (100 at a time or so) // TODO could synchronize on blocks (100 at a time or so)
synchronized (undoLog) { rwLock.writeLock().lock();
try {
for (long logId = maxLogId - 1; logId >= toLogId; logId--) { for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(t.getId(), logId);
Object[] op = undoLog.get(undoKey); Object[] op = undoLog.get(undoKey);
...@@ -511,6 +533,8 @@ public class TransactionStore { ...@@ -511,6 +533,8 @@ public class TransactionStore {
} }
undoLog.remove(undoKey); undoLog.remove(undoKey);
} }
} finally {
rwLock.writeLock().unlock();
} }
} }
...@@ -535,7 +559,8 @@ public class TransactionStore { ...@@ -535,7 +559,8 @@ public class TransactionStore {
} }
private void fetchNext() { private void fetchNext() {
synchronized (undoLog) { rwLock.writeLock().lock();
try {
while (logId >= toLogId) { while (logId >= toLogId) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(t.getId(), logId);
Object[] op = undoLog.get(undoKey); Object[] op = undoLog.get(undoKey);
...@@ -564,6 +589,8 @@ public class TransactionStore { ...@@ -564,6 +589,8 @@ public class TransactionStore {
return; return;
} }
} }
} finally {
rwLock.writeLock().unlock();
} }
current = null; current = null;
} }
...@@ -938,9 +965,12 @@ public class TransactionStore { ...@@ -938,9 +965,12 @@ public class TransactionStore {
Cursor<K, VersionedValue> cursor = map.cursor(null); Cursor<K, VersionedValue> cursor = map.cursor(null);
while (cursor.hasNext()) { while (cursor.hasNext()) {
VersionedValue data; VersionedValue data;
synchronized (transaction.store.undoLog) { transaction.store.rwLock.readLock().lock();
try {
K key = cursor.next(); K key = cursor.next();
data = getValue(key, readLogId, cursor.getValue()); data = getValue(key, readLogId, cursor.getValue());
} finally {
transaction.store.rwLock.readLock().unlock();
} }
if (data != null && data.value != null) { if (data != null && data.value != null) {
size++; size++;
...@@ -1203,9 +1233,12 @@ public class TransactionStore { ...@@ -1203,9 +1233,12 @@ public class TransactionStore {
} }
private VersionedValue getValue(K key, long maxLog) { private VersionedValue getValue(K key, long maxLog) {
synchronized (getUndoLog()) { transaction.store.rwLock.readLock().lock();
try {
VersionedValue data = map.get(key); VersionedValue data = map.get(key);
return getValue(key, maxLog, data); return getValue(key, maxLog, data);
} finally {
transaction.store.rwLock.readLock().unlock();
} }
} }
...@@ -1222,13 +1255,6 @@ public class TransactionStore { ...@@ -1222,13 +1255,6 @@ public class TransactionStore {
* @return the value * @return the value
*/ */
VersionedValue getValue(K key, long maxLog, VersionedValue data) { VersionedValue getValue(K key, long maxLog, VersionedValue data) {
if (MVStore.ASSERT) {
if (!Thread.holdsLock(getUndoLog())) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL,
"not synchronized on undoLog");
}
}
while (true) { while (true) {
if (data == null) { if (data == null) {
// doesn't exist or deleted by a committed transaction // doesn't exist or deleted by a committed transaction
...@@ -1461,7 +1487,8 @@ public class TransactionStore { ...@@ -1461,7 +1487,8 @@ public class TransactionStore {
private void fetchNext() { private void fetchNext() {
while (cursor.hasNext()) { while (cursor.hasNext()) {
synchronized (getUndoLog()) { transaction.store.rwLock.readLock().lock();
try {
K k; K k;
try { try {
k = cursor.next(); k = cursor.next();
...@@ -1494,6 +1521,8 @@ public class TransactionStore { ...@@ -1494,6 +1521,8 @@ public class TransactionStore {
currentKey = key; currentKey = key;
return; return;
} }
} finally {
transaction.store.rwLock.readLock().unlock();
} }
} }
current = null; current = null;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论