提交 c1c475e0 authored 作者: Thomas Mueller Graf's avatar Thomas Mueller Graf

Archive tool: support for slow disks

上级 d8a28853
...@@ -20,6 +20,8 @@ import java.io.InputStream; ...@@ -20,6 +20,8 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -117,7 +119,7 @@ public class ArchiveTool { ...@@ -117,7 +119,7 @@ public class ArchiveTool {
if (title.get()) { if (title.get()) {
log.println(); log.println();
} }
log.println("Compressing " + size / MB + " MB"); log.println("Compressing " + size / MB + " MB at " + new java.sql.Time(start).toString());
InputStream in = getDirectoryInputStream(fromDir); InputStream in = getDirectoryInputStream(fromDir);
String temp = toFile + ".temp"; String temp = toFile + ".temp";
OutputStream out = OutputStream out =
...@@ -142,7 +144,7 @@ public class ArchiveTool { ...@@ -142,7 +144,7 @@ public class ArchiveTool {
Log log = new Log(); Log log = new Log();
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
long size = new File(fromFile).length(); long size = new File(fromFile).length();
log.println("Extracting " + size / MB + " MB"); log.println("Extracting " + size / MB + " MB at " + new java.sql.Time(start).toString());
InputStream in = InputStream in =
new BufferedInputStream( new BufferedInputStream(
new FileInputStream(fromFile), 1024 * 1024); new FileInputStream(fromFile), 1024 * 1024);
...@@ -398,8 +400,7 @@ public class ArchiveTool { ...@@ -398,8 +400,7 @@ public class ArchiveTool {
DataOutputStream tempOut = new DataOutputStream(new BufferedOutputStream( DataOutputStream tempOut = new DataOutputStream(new BufferedOutputStream(
new FileOutputStream(tempFileName), 1024 * 1024)); new FileOutputStream(tempFileName), 1024 * 1024));
byte[] bytes = new byte[bufferSize]; byte[] bytes = new byte[bufferSize];
ArrayList<Long> segmentStart = new ArrayList<Long>(); List<Long> segmentStart = new ArrayList<Long>();
long inPos = 0;
long outPos = 0; long outPos = 0;
long id = 1; long id = 1;
...@@ -407,13 +408,13 @@ public class ArchiveTool { ...@@ -407,13 +408,13 @@ public class ArchiveTool {
// Segment: chunk* 0 // Segment: chunk* 0
// Chunk: pos* 0 sortKey data // Chunk: pos* 0 sortKey data
log.setRange(0, 30, size);
while (true) { while (true) {
int len = readFully(in, bytes, bytes.length); int len = readFully(in, bytes, bytes.length);
if (len == 0) { if (len == 0) {
break; break;
} }
inPos += len; log.printProgress(len);
log.printProgress(0, 50, inPos, size);
TreeMap<Chunk, Chunk> map = new TreeMap<Chunk, Chunk>(); TreeMap<Chunk, Chunk> map = new TreeMap<Chunk, Chunk>();
for (int pos = 0; pos < len;) { for (int pos = 0; pos < len;) {
int[] key = getKey(bytes, pos, len); int[] key = getKey(bytes, pos, len);
...@@ -441,35 +442,75 @@ public class ArchiveTool { ...@@ -441,35 +442,75 @@ public class ArchiveTool {
outPos += writeVarLong(tempOut, 0); outPos += writeVarLong(tempOut, 0);
} }
tempOut.close(); tempOut.close();
size = outPos; long tempSize = new File(tempFileName).length();
inPos = 0;
TreeSet<ChunkStream> segmentIn = new TreeSet<ChunkStream>(); // merge blocks if needed
int bufferTotal = 64 * 1024 * 1024; int blockSize = 64;
int bufferPerStream = bufferTotal / segmentStart.size(); boolean merge = false;
for (int i = 0; i < segmentStart.size(); i++) { while (segmentStart.size() > blockSize) {
in = new FileInputStream(tempFileName); merge = true;
in.skip(segmentStart.get(i)); log.setRange(30, 50, tempSize);
ChunkStream s = new ChunkStream(i); log.println();
s.readKey = true; log.println("Merging " + segmentStart.size() + " segments " + blockSize + ":1");
s.in = new DataInputStream(new BufferedInputStream(in, bufferPerStream)); ArrayList<Long> segmentStart2 = new ArrayList<Long>();
inPos += s.readNext(); outPos = 0;
if (s.current != null) { DataOutputStream tempOut2 = new DataOutputStream(new BufferedOutputStream(
segmentIn.add(s); new FileOutputStream(tempFileName + ".b"), 1024 * 1024));
while (segmentStart.size() > 0) {
segmentStart2.add(outPos);
int s = Math.min(segmentStart.size(), blockSize);
List<Long> start = segmentStart.subList(0, s);
TreeSet<ChunkStream> segmentIn = new TreeSet<ChunkStream>();
long read = openSegments(start, segmentIn, tempFileName, true);
log.printProgress(read);
Chunk last = null;
Iterator<Chunk> it = merge(segmentIn, log);
while (it.hasNext()) {
Chunk c = it.next();
if (last == null) {
last = c;
} else if (last.compareTo(c) == 0) {
for (long x : c.idList) {
last.idList.add(x);
}
} else {
outPos += last.write(tempOut2, true);
last = c;
}
}
if (last != null) {
outPos += last.write(tempOut2, true);
}
// end of segment
outPos += writeVarLong(tempOut2, 0);
segmentStart = segmentStart.subList(s, segmentStart.size());
} }
segmentStart = segmentStart2;
tempOut2.close();
tempSize = new File(tempFileName).length();
new File(tempFileName).delete();
tempFileName += ".b";
} }
if (merge) {
log.println();
log.println("Combining " + segmentStart.size() + " segments");
}
TreeSet<ChunkStream> segmentIn = new TreeSet<ChunkStream>();
long read = openSegments(segmentStart, segmentIn, tempFileName, true);
log.printProgress(read);
DataOutputStream dataOut = new DataOutputStream(out); DataOutputStream dataOut = new DataOutputStream(out);
dataOut.write(HEADER); dataOut.write(HEADER);
writeVarLong(dataOut, size); writeVarLong(dataOut, size);
Chunk last = null;
// File: header length chunk* 0 // File: header length chunk* 0
// chunk: pos* 0 data // chunk: pos* 0 data
log.setRange(50, 100, tempSize);
while (segmentIn.size() > 0) { Chunk last = null;
ChunkStream s = segmentIn.first(); Iterator<Chunk> it = merge(segmentIn, log);
segmentIn.remove(s); while (it.hasNext()) {
Chunk c = s.current; Chunk c = it.next();
if (last == null) { if (last == null) {
last = c; last = c;
} else if (last.compareTo(c) == 0) { } else if (last.compareTo(c) == 0) {
...@@ -480,11 +521,6 @@ public class ArchiveTool { ...@@ -480,11 +521,6 @@ public class ArchiveTool {
last.write(dataOut, false); last.write(dataOut, false);
last = c; last = c;
} }
inPos += s.readNext();
log.printProgress(50, 100, inPos, size);
if (s.current != null) {
segmentIn.add(s);
}
} }
if (last != null) { if (last != null) {
last.write(dataOut, false); last.write(dataOut, false);
...@@ -494,6 +530,54 @@ public class ArchiveTool { ...@@ -494,6 +530,54 @@ public class ArchiveTool {
dataOut.flush(); dataOut.flush();
} }
static long openSegments(List<Long> segmentStart, TreeSet<ChunkStream> segmentIn,
String tempFileName, boolean readKey) throws IOException {
long inPos = 0;
int bufferTotal = 64 * 1024 * 1024;
int bufferPerStream = bufferTotal / segmentStart.size();
for (int i = 0; i < segmentStart.size(); i++) {
InputStream in = new FileInputStream(tempFileName);
in.skip(segmentStart.get(i));
ChunkStream s = new ChunkStream(i);
s.readKey = readKey;
s.in = new DataInputStream(new BufferedInputStream(in, bufferPerStream));
inPos += s.readNext();
if (s.current != null) {
segmentIn.add(s);
}
}
return inPos;
}
static Iterator<Chunk> merge(final TreeSet<ChunkStream> segmentIn, final Log log) {
return new Iterator<Chunk>() {
@Override
public boolean hasNext() {
return segmentIn.size() > 0;
}
@Override
public Chunk next() {
ChunkStream s = segmentIn.first();
segmentIn.remove(s);
Chunk c = s.current;
int len = s.readNext();
log.printProgress(len);
if (s.current != null) {
segmentIn.add(s);
}
return c;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
/** /**
* Read a number of bytes. This method repeats reading until * Read a number of bytes. This method repeats reading until
* either the bytes have been read, or EOF. * either the bytes have been read, or EOF.
...@@ -596,14 +680,13 @@ public class ArchiveTool { ...@@ -596,14 +680,13 @@ public class ArchiveTool {
} }
long size = readVarLong(dataIn); long size = readVarLong(dataIn);
long outPos = 0; long outPos = 0;
long inPos = 0; List<Long> segmentStart = new ArrayList<Long>();
ArrayList<Long> segmentStart = new ArrayList<Long>();
boolean end = false; boolean end = false;
// Temp file: segment* 0 // Temp file: segment* 0
// Segment: chunk* 0 // Segment: chunk* 0
// Chunk: pos* 0 data // Chunk: pos* 0 data
log.setRange(0, 30, size);
while (!end) { while (!end) {
int segmentSize = 0; int segmentSize = 0;
TreeMap<Long, byte[]> map = new TreeMap<Long, byte[]>(); TreeMap<Long, byte[]> map = new TreeMap<Long, byte[]>();
...@@ -614,8 +697,7 @@ public class ArchiveTool { ...@@ -614,8 +697,7 @@ public class ArchiveTool {
break; break;
} }
int length = c.value.length; int length = c.value.length;
inPos += length; log.printProgress(length);
log.printProgress(0, 50, inPos, size);
segmentSize += length; segmentSize += length;
for (long x : c.idList) { for (long x : c.idList) {
map.put(x, c.value); map.put(x, c.value);
...@@ -636,32 +718,63 @@ public class ArchiveTool { ...@@ -636,32 +718,63 @@ public class ArchiveTool {
outPos += writeVarLong(tempOut, 0); outPos += writeVarLong(tempOut, 0);
} }
tempOut.close(); tempOut.close();
long tempSize = new File(tempFileName).length();
size = outPos; size = outPos;
inPos = 0;
TreeSet<ChunkStream> segmentIn = new TreeSet<ChunkStream>(); // merge blocks if needed
int bufferTotal = 64 * 1024 * 1024; int blockSize = 64;
int bufferPerStream = bufferTotal / segmentStart.size(); boolean merge = false;
for (int i = 0; i < segmentStart.size(); i++) { while (segmentStart.size() > blockSize) {
FileInputStream f = new FileInputStream(tempFileName); merge = true;
f.skip(segmentStart.get(i)); log.setRange(30, 50, tempSize);
ChunkStream s = new ChunkStream(i); log.println();
s.in = new DataInputStream(new BufferedInputStream(f, bufferPerStream)); log.println("Merging " + segmentStart.size() + " segments " + blockSize + ":1");
inPos += s.readNext(); ArrayList<Long> segmentStart2 = new ArrayList<Long>();
if (s.current != null) { outPos = 0;
segmentIn.add(s); DataOutputStream tempOut2 = new DataOutputStream(new BufferedOutputStream(
new FileOutputStream(tempFileName + ".b"), 1024 * 1024));
while (segmentStart.size() > 0) {
segmentStart2.add(outPos);
int s = Math.min(segmentStart.size(), blockSize);
List<Long> start = segmentStart.subList(0, s);
TreeSet<ChunkStream> segmentIn = new TreeSet<ChunkStream>();
long read = openSegments(start, segmentIn, tempFileName, false);
log.printProgress(read);
Iterator<Chunk> it = merge(segmentIn, log);
while (it.hasNext()) {
Chunk c = it.next();
outPos += writeVarLong(tempOut2, c.idList.get(0));
outPos += writeVarLong(tempOut2, 0);
outPos += writeVarLong(tempOut2, c.value.length);
tempOut2.write(c.value);
outPos += c.value.length;
}
outPos += writeVarLong(tempOut2, 0);
segmentStart = segmentStart.subList(s, segmentStart.size());
} }
segmentStart = segmentStart2;
tempOut2.close();
tempSize = new File(tempFileName).length();
new File(tempFileName).delete();
tempFileName += ".b";
} }
if (merge) {
log.println();
log.println("Combining " + segmentStart.size() + " segments");
}
TreeSet<ChunkStream> segmentIn = new TreeSet<ChunkStream>();
DataOutputStream dataOut = new DataOutputStream(out); DataOutputStream dataOut = new DataOutputStream(out);
while (segmentIn.size() > 0) { log.setRange(50, 100, size);
ChunkStream s = segmentIn.first();
segmentIn.remove(s); long read = openSegments(segmentStart, segmentIn, tempFileName, false);
Chunk c = s.current; log.printProgress(read);
dataOut.write(c.value);
inPos += s.readNext(); Iterator<Chunk> it = merge(segmentIn, log);
log.printProgress(50, 100, inPos, size); while (it.hasNext()) {
if (s.current != null) { dataOut.write(it.next().value);
segmentIn.add(s);
}
} }
new File(tempFileName).delete(); new File(tempFileName).delete();
dataOut.flush(); dataOut.flush();
...@@ -685,7 +798,8 @@ public class ArchiveTool { ...@@ -685,7 +798,8 @@ public class ArchiveTool {
* *
* @return the number of bytes read * @return the number of bytes read
*/ */
int readNext() throws IOException { int readNext() {
current = null;
current = Chunk.read(in, readKey); current = Chunk.read(in, readKey);
if (current == null) { if (current == null) {
return 0; return 0;
...@@ -724,30 +838,35 @@ public class ArchiveTool { ...@@ -724,30 +838,35 @@ public class ArchiveTool {
* @param readKey whether to read the sort key * @param readKey whether to read the sort key
* @return the chunk, or null if 0 has been read * @return the chunk, or null if 0 has been read
*/ */
public static Chunk read(DataInputStream in, boolean readKey) throws IOException { public static Chunk read(DataInputStream in, boolean readKey) {
ArrayList<Long> idList = new ArrayList<Long>(); try {
while (true) { ArrayList<Long> idList = new ArrayList<Long>();
long x = readVarLong(in); while (true) {
if (x == 0) { long x = readVarLong(in);
break; if (x == 0) {
break;
}
idList.add(x);
} }
idList.add(x); if (idList.size() == 0) {
} // eof
if (idList.size() == 0) { in.close();
// eof return null;
return null; }
} int[] key = null;
int[] key = null; if (readKey) {
if (readKey) { key = new int[4];
key = new int[4]; for (int i = 0; i < key.length; i++) {
for (int i = 0; i < key.length; i++) { key[i] = in.readInt();
key[i] = in.readInt(); }
} }
int len = (int) readVarLong(in);
byte[] value = new byte[len];
in.readFully(value);
return new Chunk(idList, key, value);
} catch (IOException e) {
throw new RuntimeException(e);
} }
int len = (int) readVarLong(in);
byte[] value = new byte[len];
in.readFully(value);
return new Chunk(idList, key, value);
} }
/** /**
...@@ -819,13 +938,18 @@ public class ArchiveTool { ...@@ -819,13 +938,18 @@ public class ArchiveTool {
static class Log { static class Log {
private long lastTime; private long lastTime;
private long current;
private int pos; private int pos;
private int low;
private int high;
private long total;
/** /**
* Print an empty line. * Print an empty line.
*/ */
void println() { void println() {
System.out.println(); System.out.println();
pos = 0;
} }
/** /**
...@@ -844,18 +968,30 @@ public class ArchiveTool { ...@@ -844,18 +968,30 @@ public class ArchiveTool {
*/ */
void println(String msg) { void println(String msg) {
System.out.println(msg); System.out.println(msg);
pos = 0;
} }
/** /**
* Print the progress. * Set the range.
* *
* @param low the percent value if current = 0 * @param low the percent value if current = 0
* @param high the percent value if current = total * @param high the percent value if current = total
* @param current the current value
* @param total the maximum value * @param total the maximum value
*/ */
void printProgress(int low, int high, void setRange(int low, int high, long total) {
long current, long total) { this.low = low;
this.high = high;
this.current = 0;
this.total = total;
}
/**
* Print the progress.
*
* @param current the current value
*/
void printProgress(long offset) {
current += offset;
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if (now - lastTime > 3000) { if (now - lastTime > 3000) {
String msg = (low + (high - low) * current / total) + "% "; String msg = (low + (high - low) * current / total) + "% ";
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论