问题描述
我有以下步骤的方案:
使用我需要连接到Sql Server数据库表A并获得数百万行。
我需要根据表数据以及来自其他来源(例如Web服务)的数据执行一些处理和解析。
我必须将处理后的数据写入其他Sql Server数据库上的目标表B,C,D,E,F,G。 写入可以(应该?)并行进行。
我想问一些关于如何正确处理第3点的建议。我想提供相同的连接到不同的线程以并行写入目标表是一个坏主意。 我的一般想法是为每个目标表(在本例中为6)产生一个新线程,并为每个表创建一个不同的jdbc连接,因此,理论上每次写入都可以并行且彼此独立地完成。
这样行吗? 对其他/更好方法的建议?
1楼
我的一般想法是为每个目标表(在本例中为6)产生一个新线程,并为每个表创建一个不同的jdbc连接,因此,理论上每次写入都可以并行且彼此独立地完成。
对我来说,听起来确实是个好计划。 我将使用诸如或的连接池来维护与数据库服务器的多个连接。 然后,您可以添加多个线程,每个线程都可以请求连接,然后将其返回到池中以供以后使用。
这样行吗? 对其他/更好方法的建议?
它会工作。 要考虑的一件事是6可能不是正确的数字。 您的服务器可能没有足够的带宽来一次处理那么多数据,因此您可能要考虑减少池中的线程数量,直到找到可以为您提供最大带宽的最佳数量。 也就是说,如果有6个表,则6个确实是正确的数字,具体取决于服务器上数据的分区方式。
根据您对线程的了解程度,应查看的 。
2楼
我已经实现了以下解决方案,该解决方案通过和使用生产者/消费者模式。 主线程(生产者)为每个工作线程(消费者)实例化一个BlockingQueue和一个“终止”布尔型易失变量,以在所有数据生成后向工作线程发出信号,并且它们应终止执行(从while循环中逃逸,清空队列并在jdbc连接上写入剩余数据)。 生产者使用两个BlockingQueueblockingQueue1和blockingQueue2为每个线程生成不同的数据。
这是简化的MainThreadProducer,它仅为两个工作线程生成整数数据:
// MainThreadProducer.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MainThreadProducer {
public static Logger logger = LogManager.getLogger(MainThreadProducer.class);
public final static BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingDeque<>(100);
public final static BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingDeque<>(100);
/* signal to the worker threads that all data has been generated */
public static volatile boolean terminated = false;
private void run () {
try {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Integer> future1 = executor.submit(new WorkerThreadConsumer("1"));
Future<Integer> future2 = executor.submit(new WorkerThreadConsumer("2"));
for (int i = 0; i < 10023; ++i) {
blockingQueue1.put(i);
blockingQueue2.put(i*2);
}
executor.shutdown();
terminated = true;
int res1 = future1.get();
int res2 = future1.get();
logger.info("Total rows written (thread 1): " + res1);
logger.info("Total rows written (thread 2): " + res2);
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
public static void main(String[] args) {
MainThreadProducer instance = new MainThreadProducer();
instance.run();
}
}
这是WorkerThreadConsumer.java类。 对于此测试,我正在创建两个线程,分别将其写入表TARGET_1和TARGET_2的数据库DBTEST中。 每个线程都使用特定的String类型(1和2)实例化,因此可以知道需要从哪个BlockingQueue读取数据。
// WorkerThreadConsumer.java
import java.sql.PreparedStatement;
import com.microsoft.sqlserver.jdbc.SQLServerResultSet;
import java.sql.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import Configuration;
public class WorkerThreadConsumer implements Callable<Integer> {
private String type;
public WorkerThreadConsumer (String type) {
this.type = type;
}
@Override
public Integer call() {
String TAG = "[THREAD_" + Thread.currentThread().getId() + "]";
int processed = 0; // number of rows currently processed
int batchSize = 100; // size of the batch we write to the server with the PreparedStatement
try {
// load jdbc driver
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
MainThreadProducer.logger.info(TAG + "\tLoaded com.microsoft.sqlserver.jdbc.SQLServerDriver");
String stub = String.format("INSERT INTO DBTEST.dbo.TARGET_%s (id) VALUES (?);", this.type);
BlockingQueue<Integer> queue;
switch (this.type) {
case "1":
queue = MainThreadProducer.blockingQueue1;
break;
case "2":
queue = MainThreadProducer.blockingQueue2;
break;
default:
queue = MainThreadProducer.blockingQueue1;
}
try (Connection connection = DriverManager.getConnection(Configuration.DWH_DB_CONNECTION_URL);
PreparedStatement stmt = connection.prepareStatement(stub);) {
connection.setAutoCommit(false);
while (!MainThreadProducer.terminated) {
int data = queue.take();
stmt.setInt(1, data);
stmt.addBatch();
processed += 1;
if (processed % batchSize == 0) {
int[] result = stmt.executeBatch();
connection.commit();
MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
}
}
// empty queue and write
while (!queue.isEmpty()) {
int data = queue.take();
stmt.setInt(1, data);
stmt.addBatch();
processed += 1;
if (processed % batchSize == 0) {
int[] result = stmt.executeBatch();
connection.commit();
MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
}
}
// last write in case queue size > batch size
int[] result = stmt.executeBatch();
connection.commit();
MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
}
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
return processed;
}
}
解决方案似乎有效。 如果您发现潜在问题,请告诉我。