Unverified 提交 e6d4c99f authored 作者: Noel Grandin's avatar Noel Grandin 提交者: GitHub

Merge pull request #657 from LingMan/master

Cleanup CreateCluster tool
......@@ -13,14 +13,15 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.h2.api.ErrorCode;
import org.h2.engine.Constants;
import org.h2.util.IOUtils;
import org.h2.util.JdbcUtils;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.h2.util.Tool;
/**
* Creates a cluster from a standalone database.
* Creates a cluster from a stand-alone database.
* <br />
* Copies a database to another location if required.
* @h2.resource
......@@ -100,116 +101,98 @@ public class CreateCluster extends Tool {
private static void process(String urlSource, String urlTarget,
String user, String password, String serverList) throws SQLException {
Connection connSource = null, connTarget = null;
Statement statSource = null, statTarget = null;
PipedReader pipeReader = null;
try {
org.h2.Driver.load();
// verify that the database doesn't exist,
// or if it exists (an old cluster instance), it is deleted
boolean exists = true;
try {
connTarget = DriverManager.getConnection(urlTarget +
";IFEXISTS=TRUE;CLUSTER=" + Constants.CLUSTERING_ENABLED,
user, password);
Statement stat = connTarget.createStatement();
stat.execute("DROP ALL OBJECTS DELETE FILES");
stat.close();
exists = false;
connTarget.close();
} catch (SQLException e) {
if (e.getErrorCode() == ErrorCode.DATABASE_NOT_FOUND_1) {
// database does not exists yet - ok
exists = false;
} else {
throw e;
}
}
if (exists) {
throw new SQLException(
"Target database must not yet exist. Please delete it first: " +
urlTarget);
}
try (Connection connSource = DriverManager.getConnection(
// use cluster='' so connecting is possible
// even if the cluster is enabled
connSource = DriverManager.getConnection(urlSource +
";CLUSTER=''", user, password);
statSource = connSource.createStatement();
urlSource + ";CLUSTER=''", user, password);
Statement statSource = connSource.createStatement())
{
// enable the exclusive mode and close other connections,
// so that data can't change while restoring the second database
statSource.execute("SET EXCLUSIVE 2");
pipeReader = new PipedReader();
try {
/*
* Pipe writer is used + closed in the inner class, in a
* separate thread (needs to be final). It should be initialized
* within try{} so an exception could be caught if creation
* fails. In that scenario, the the writer should be null and
* needs no closing, and the main goal is that finally{} should
* bring the source DB out of exclusive mode, and close the
* reader.
*/
final PipedWriter pipeWriter = new PipedWriter(pipeReader);
performTransfer(statSource, urlTarget, user, password, serverList);
} finally {
// switch back to the regular mode
statSource.execute("SET EXCLUSIVE FALSE");
}
}
}
// Backup data from source database in script form.
// Start writing to pipe writer in separate thread.
final ResultSet rs = statSource.executeQuery("SCRIPT");
private static void performTransfer(Statement statSource, String urlTarget,
String user, String password, String serverList) throws SQLException {
// Delete the target database first.
connTarget = DriverManager.getConnection(
try (Connection connTarget = DriverManager.getConnection(
urlTarget + ";CLUSTER=''", user, password);
statTarget = connTarget.createStatement();
Statement statTarget = connTarget.createStatement())
{
statTarget.execute("DROP ALL OBJECTS DELETE FILES");
connTarget.close();
}
try (PipedReader pipeReader = new PipedReader()) {
Future<?> threadFuture = startWriter(pipeReader, statSource);
new Thread(
new Runnable(){
@Override
public void run() {
// Read data from pipe reader, restore on target.
try (Connection connTarget = DriverManager.getConnection(
urlTarget, user, password);
Statement statTarget = connTarget.createStatement())
{
RunScript.execute(connTarget, pipeReader);
// Check if the writer encountered any exception
try {
while (rs.next()) {
pipeWriter.write(rs.getString(1) + "\n");
}
} catch (SQLException ex) {
throw new IllegalStateException("Producing script from the source DB is failing.",ex);
} catch (IOException ex) {
throw new IllegalStateException("Producing script from the source DB is failing.",ex);
} finally {
IOUtils.closeSilently(pipeWriter);
}
}
threadFuture.get();
} catch (ExecutionException ex) {
throw new SQLException(ex.getCause());
} catch (InterruptedException ex) {
throw new SQLException(ex);
}
).start();
// Read data from pipe reader, restore on target.
connTarget = DriverManager.getConnection(urlTarget, user, password);
RunScript.execute(connTarget,pipeReader);
statTarget = connTarget.createStatement();
// set the cluster to the serverList on both databases
statSource.executeUpdate("SET CLUSTER '" + serverList + "'");
statTarget.executeUpdate("SET CLUSTER '" + serverList + "'");
}
} catch (IOException ex) {
throw new SQLException(ex);
} finally {
// switch back to the regular mode
statSource.execute("SET EXCLUSIVE FALSE");
}
} finally {
IOUtils.closeSilently(pipeReader);
JdbcUtils.closeSilently(statSource);
JdbcUtils.closeSilently(statTarget);
JdbcUtils.closeSilently(connSource);
JdbcUtils.closeSilently(connTarget);
}
private static Future<?> startWriter(final PipedReader pipeReader,
final Statement statSource) throws SQLException, IOException {
final ExecutorService thread = Executors.newFixedThreadPool(1);
// Since exceptions cannot be thrown across thread boundaries, return
// the task's future so we can check manually
Future<?> threadFuture = thread.submit(new Runnable() {
@Override
public void run() {
/*
* If the creation of the piped writer fails, the reader will
* throw an IOException as soon as read() is called:
* IOException - if the pipe is broken, unconnected, closed,
* or an I/O error occurs.
* The reader's IOException will then trigger the finally{} that
* releases exclusive mode on the source DB.
*/
try (final PipedWriter pipeWriter = new PipedWriter(pipeReader);
final ResultSet rs = statSource.executeQuery("SCRIPT"))
{
while (rs.next()) {
pipeWriter.write(rs.getString(1) + "\n");
}
} catch (SQLException | IOException ex) {
throw new IllegalStateException("Producing script from the source DB is failing.", ex);
}
}
});
thread.shutdown();
return threadFuture;
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论