Merge pull request #204 from emaccaull/fix/pool-lock-contention

Speed up concurrent pool creation
This commit is contained in:
diego dupin
2025-05-23 09:47:16 +02:00
committed by GitHub
3 changed files with 91 additions and 22 deletions

View File

@ -534,20 +534,24 @@ public class Pool implements AutoCloseable, PoolMBean {
String jmxName = poolTag.replace(":", "_"); String jmxName = poolTag.replace(":", "_");
ObjectName name = new ObjectName("org.mariadb.jdbc.pool:type=" + jmxName); ObjectName name = new ObjectName("org.mariadb.jdbc.pool:type=" + jmxName);
synchronized (mbs) {
if (!mbs.isRegistered(name)) { if (!mbs.isRegistered(name)) {
mbs.registerMBean(this, name); mbs.registerMBean(this, name);
} }
} }
}
private void unRegisterJmx() throws Exception { private void unRegisterJmx() throws Exception {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
String jmxName = poolTag.replace(":", "_"); String jmxName = poolTag.replace(":", "_");
ObjectName name = new ObjectName("org.mariadb.jdbc.pool:type=" + jmxName); ObjectName name = new ObjectName("org.mariadb.jdbc.pool:type=" + jmxName);
synchronized (mbs) {
if (mbs.isRegistered(name)) { if (mbs.isRegistered(name)) {
mbs.unregisterMBean(name); mbs.unregisterMBean(name);
} }
} }
}
/** /**
* For testing purpose only. * For testing purpose only.

View File

@ -14,9 +14,29 @@ import org.mariadb.jdbc.Configuration;
public final class Pools { public final class Pools {
private static final AtomicInteger poolIndex = new AtomicInteger(); private static final AtomicInteger poolIndex = new AtomicInteger();
private static final Map<Configuration, Pool> poolMap = new ConcurrentHashMap<>(); private static final Map<Configuration, PoolHolder> poolMap = new ConcurrentHashMap<>();
private static ScheduledThreadPoolExecutor poolExecutor = null; private static ScheduledThreadPoolExecutor poolExecutor = null;
static class PoolHolder {
private final Configuration conf;
private final int poolIndex;
private final ScheduledThreadPoolExecutor executor;
private Pool pool;
PoolHolder(Configuration conf, int poolIndex, ScheduledThreadPoolExecutor executor) {
this.conf = conf;
this.poolIndex = poolIndex;
this.executor = executor;
}
synchronized Pool getPool() {
if (pool == null) {
pool = new Pool(conf, poolIndex, executor);
}
return pool;
}
}
/** /**
* Get existing pool for a configuration. Create it if it doesn't exist. * Get existing pool for a configuration. Create it if it doesn't exist.
* *
@ -24,21 +44,23 @@ public final class Pools {
* @return pool * @return pool
*/ */
public static Pool retrievePool(Configuration conf) { public static Pool retrievePool(Configuration conf) {
if (!poolMap.containsKey(conf)) { PoolHolder holder = poolMap.get(conf);
if (holder == null) {
synchronized (poolMap) { synchronized (poolMap) {
if (!poolMap.containsKey(conf)) { holder = poolMap.get(conf);
if (holder == null) {
if (poolExecutor == null) { if (poolExecutor == null) {
poolExecutor = poolExecutor =
new ScheduledThreadPoolExecutor( new ScheduledThreadPoolExecutor(
1, new PoolThreadFactory("MariaDbPool-maxTimeoutIdle-checker")); 1, new PoolThreadFactory("MariaDbPool-maxTimeoutIdle-checker"));
} }
Pool pool = new Pool(conf, poolIndex.incrementAndGet(), poolExecutor); holder = new PoolHolder(conf, poolIndex.incrementAndGet(), poolExecutor);
poolMap.put(conf, pool); poolMap.put(conf, holder);
return pool;
} }
} }
} }
return poolMap.get(conf); // Don't initialize a pool while holding a lock on `poolMap`.
return holder.getPool();
} }
/** /**
@ -49,23 +71,20 @@ public final class Pools {
public static void remove(Pool pool) { public static void remove(Pool pool) {
if (poolMap.containsKey(pool.getConf())) { if (poolMap.containsKey(pool.getConf())) {
synchronized (poolMap) { synchronized (poolMap) {
if (poolMap.containsKey(pool.getConf())) { PoolHolder previous = poolMap.remove(pool.getConf());
poolMap.remove(pool.getConf()); if (previous != null && poolMap.isEmpty()) {
if (poolMap.isEmpty()) {
shutdownExecutor(); shutdownExecutor();
} }
} }
} }
} }
}
/** Close all pools. */ /** Close all pools. */
public static void close() { public static void close() {
synchronized (poolMap) { synchronized (poolMap) {
for (Pool pool : poolMap.values()) { for (PoolHolder holder : poolMap.values()) {
try { try {
pool.close(); holder.getPool().close();
} catch (Exception exception) { } catch (Exception exception) {
// eat // eat
} }
@ -85,10 +104,12 @@ public final class Pools {
return; return;
} }
synchronized (poolMap) { synchronized (poolMap) {
for (Pool pool : poolMap.values()) { for (PoolHolder holder : poolMap.values()) {
if (poolName.equals(pool.getConf().poolName())) { if (poolName.equals(holder.conf.poolName())) {
try { try {
pool.close(); // Pool.close() calls Pools.remove(), which does the rest of the cleanup holder
.getPool()
.close(); // Pool.close() calls Pools.remove(), which does the rest of the cleanup
} catch (Exception exception) { } catch (Exception exception) {
// eat // eat
} }

View File

@ -8,11 +8,18 @@ import static org.junit.jupiter.api.Assertions.*;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.sql.*; import java.sql.*;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.management.MBeanInfo; import javax.management.MBeanInfo;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -23,6 +30,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mariadb.jdbc.MariaDbPoolDataSource; import org.mariadb.jdbc.MariaDbPoolDataSource;
import org.mariadb.jdbc.pool.PoolThreadFactory; import org.mariadb.jdbc.pool.PoolThreadFactory;
import org.mariadb.jdbc.pool.Pools; import org.mariadb.jdbc.pool.Pools;
@ -774,4 +782,40 @@ public class PoolDataSourceTest extends Common {
assertFalse(xac.getConnection().isClosed()); assertFalse(xac.getConnection().isClosed());
xac.close(); xac.close();
} }
@Timeout(value = 5, unit = TimeUnit.SECONDS)
@Test
public void testConcurrentCreationForDifferentHosts() throws Exception {
CountDownLatch ready = new CountDownLatch(5);
CountDownLatch start = new CountDownLatch(1);
ExecutorService executor = Executors.newCachedThreadPool();
try {
// When many pools are created concurrently
List<Future<MariaDbPoolDataSource>> futures =
IntStream.rangeClosed(1, 5)
.mapToObj(
hostIndex ->
executor.submit(
() -> {
ready.countDown();
start.await();
MariaDbPoolDataSource ds = new MariaDbPoolDataSource();
ds.setUrl(
"jdbc:mariadb://myhost" + hostIndex + ":5500/db?someOption=val");
return ds;
}))
.collect(Collectors.toList());
ready.await();
start.countDown();
// Then they should all be created in a timely manner
for (Future<MariaDbPoolDataSource> future : futures) {
future.get().close();
}
} finally {
executor.shutdown();
}
}
} }