当前位置: 代码迷 >> java >> Java jdbc Sql Server并行写入
  详细解决方案

Java jdbc Sql Server并行写入

热度:123   发布时间:2023-07-16 17:50:31.0

我有以下步骤的方案:

  1. 使用我需要连接到Sql Server数据库表A并获得数百万行。

  2. 我需要根据表数据以及来自其他来源(例如Web服务)的数据执行一些处理和解析。

  3. 我必须将处理后的数据写入其他Sql Server数据库上的目标表B,C,D,E,F,G。 写入可以(应该?)并行进行。

我想问一些关于如何正确处理第3点的建议。我想提供相同的连接到不同的线程以并行写入目标表是一个坏主意。 我的一般想法是为每个目标表(在本例中为6)产生一个新线程,并为每个表创建一个不同的jdbc连接,因此,理论上每次写入都可以并行且彼此独立地完成。

这样行吗? 对其他/更好方法的建议?

我的一般想法是为每个目标表(在本例中为6)产生一个新线程,并为每个表创建一个不同的jdbc连接,因此,理论上每次写入都可以并行且彼此独立地完成。

对我来说,听起来确实是个好计划。 我将使用诸如或的连接池来维护与数据库服务器的多个连接。 然后,您可以添加多个线程,每个线程都可以请求连接,然后将其返回到池中以供以后使用。

这样行吗? 对其他/更好方法的建议?

它会工作。 要考虑的一件事是6可能不是正确的数字。 您的服务器可能没有足够的带宽来一次处理那么多数据,因此您可能要考虑减少池中的线程数量,直到找到可以为您提供最大带宽的最佳数量。 也就是说,如果有6个表,则6个确实是正确的数字,具体取决于服务器上数据的分区方式。

根据您对线程的了解程度,应查看的 。

我已经实现了以下解决方案,该解决方案通过和使用生产者/消费者模式。 主线程(生产者)为每个工作线程(消费者)实例化一个BlockingQueue和一个“终止”布尔型易失变量,以在所有数据生成后向工作线程发出信号,并且它们应终止执行(从while循环中逃逸,清空队列并在jdbc连接上写入剩余数据)。 生产者使用两个BlockingQueueblockingQueue1和blockingQueue2为每个线程生成不同的数据。

这是简化的MainThreadProducer,它仅为两个工作线程生成整数数据:

// MainThreadProducer.java

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MainThreadProducer {

    public static Logger logger = LogManager.getLogger(MainThreadProducer.class);
    public final static BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingDeque<>(100);
    public final static BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingDeque<>(100);

    /* signal to the worker threads that all data has been generated */
    public static volatile boolean terminated = false;


    private void run () {

        try {

            ExecutorService executor = Executors.newFixedThreadPool(2);
            Future<Integer> future1 = executor.submit(new WorkerThreadConsumer("1"));
            Future<Integer> future2 = executor.submit(new WorkerThreadConsumer("2"));

            for (int i = 0; i < 10023; ++i) {

                blockingQueue1.put(i);
                blockingQueue2.put(i*2);

            }

            executor.shutdown();
            terminated = true;

            int res1 = future1.get();
            int res2 = future1.get();

            logger.info("Total rows written (thread 1): " + res1);
            logger.info("Total rows written (thread 2): " + res2);            

        }
        catch (Exception e) {

            e.printStackTrace();
            System.exit(1);

        }

    }

    public static void main(String[] args) {

        MainThreadProducer instance = new MainThreadProducer();
        instance.run();

    }

}

这是WorkerThreadConsumer.java类。 对于此测试,我正在创建两个线程,分别将其写入表TARGET_1和TARGET_2的数据库DBTEST中。 每个线程都使用特定的String类型(1和2)实例化,因此可以知道需要从哪个BlockingQueue读取数据。

// WorkerThreadConsumer.java

import java.sql.PreparedStatement;
import com.microsoft.sqlserver.jdbc.SQLServerResultSet;
import java.sql.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import Configuration;


public class WorkerThreadConsumer implements Callable<Integer> {

    private String type;

    public WorkerThreadConsumer (String type) {

        this.type = type;        

    }

    @Override
    public Integer call() {        

        String TAG = "[THREAD_" + Thread.currentThread().getId() + "]";
        int processed = 0; // number of rows currently processed
        int batchSize = 100; // size of the batch we write to the server with the PreparedStatement

        try {
            // load jdbc driver
            Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
            MainThreadProducer.logger.info(TAG + "\tLoaded com.microsoft.sqlserver.jdbc.SQLServerDriver");

            String stub = String.format("INSERT INTO DBTEST.dbo.TARGET_%s (id) VALUES (?);", this.type);

            BlockingQueue<Integer> queue;

            switch (this.type) {
                case "1":
                    queue = MainThreadProducer.blockingQueue1;
                    break;

                case "2":
                    queue = MainThreadProducer.blockingQueue2;
                    break;

                default:
                    queue = MainThreadProducer.blockingQueue1;
            }

            try (Connection connection = DriverManager.getConnection(Configuration.DWH_DB_CONNECTION_URL);
                 PreparedStatement stmt = connection.prepareStatement(stub);) {

                connection.setAutoCommit(false);

                while (!MainThreadProducer.terminated) {

                    int data = queue.take();
                    stmt.setInt(1, data);
                    stmt.addBatch();
                    processed += 1;

                    if (processed % batchSize == 0) {
                        int[] result = stmt.executeBatch();
                        connection.commit();
                        MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
                    }

                }

                // empty queue and write
                while (!queue.isEmpty()) {
                    int data = queue.take();
                    stmt.setInt(1, data);
                    stmt.addBatch();
                    processed += 1;

                    if (processed % batchSize == 0) {
                        int[] result = stmt.executeBatch();
                        connection.commit();
                        MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
                    }
                }

                // last write in case queue size > batch size
                int[] result = stmt.executeBatch();
                connection.commit();
                MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);


            }

        }
        catch (Exception e) {

            e.printStackTrace();
            System.exit(1);

        }

        return processed;
    }



}

解决方案似乎有效。 如果您发现潜在问题,请告诉我。

  相关解决方案