基于Diamond实现的动态线程池
基于Diamond实现的动态线程池
简介
dynamic-threadpool是一个动态线程池组件,可以使用这个组件来实现线程池参数的修改实时生效。
为什么要用?
第一个方面:
线程池调优是一种永恒的主题,传统的调优方式就是我们写死线程池的参数,部署,然后测试,然后根据测试的情况再修改,再重新部署,直到调优结束发布。
而基于动态线程池组件,我们可以省去部署的过程,修改直接生效,每次部署节省几分钟是最起码的吧。
第二个方面:
当我们线上出现线程池堆积问题的时候,动态线程池可以立马修改配置来快速解决问题。
核心能力
动态更新线程池参数:
xpublic abstract class AbstractRefresher implements Refresher { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRefresher.class);
public void refresh(String content, PropertyType fileType) { try { PropertyResolverEnvironment environment = PropertyResolverEnvironmentFactory.SINGLETON.getInstance(); Map<Object, Object> properties = environment.getProperties(content, fileType);
if (properties == null || properties.isEmpty()) { LOGGER.info("thread pool config not change : {}", Objects.toString(content, "")); }
properties.forEach((poolName, config) -> { ThreadPoolExecutor threadPoolExecutor = ThreadPoolRegistrar.threadPool(String.valueOf(poolName)); if (threadPoolExecutor != null) { try { PoolConfig poolConfig = JsonUtils.parseObject(JsonUtils.toJson(config), PoolConfig.class); threadPoolExecutor.setCorePoolSize(poolConfig.getCoreSize()); threadPoolExecutor.setMaximumPoolSize(poolConfig.getMaxSize());
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue(); if (queue instanceof ResizeLinkedBlockingQueue && poolConfig.getQueueSize() != null) { ResizeLinkedBlockingQueue resizeQueue = (ResizeLinkedBlockingQueue)queue; resizeQueue.setCapacity(poolConfig.getQueueSize()); } else { LOGGER.info( "If you want to adjust the queue length dynamically, use ResizeLinkedBlockingQueue"); }
log(threadPoolExecutor); } catch (Exception e) { LOGGER.warn("resize failure", e); }
} });
} catch (Exception e) { throw new RuntimeException(e); } }
private void log(ThreadPoolExecutor threadPoolExecutor) { int oldCorePoolSize = threadPoolExecutor.getCorePoolSize(); int oldMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); int oldQueueSize = threadPoolExecutor.getQueue().size();
LOGGER.info( "thread pool config change success,coreSize : {}->{} ,maxSize: {}->{},queueSize: {}->{}", oldCorePoolSize, threadPoolExecutor.getCorePoolSize(), oldMaximumPoolSize, threadPoolExecutor.getMaximumPoolSize(), oldQueueSize, threadPoolExecutor.getQueue().size()); }
private static class PoolConfig { private Integer coreSize; private Integer maxSize; private Integer queueSize;
public Integer getCoreSize() { return coreSize; }
public void setCoreSize(Integer coreSize) { this.coreSize = coreSize; }
public Integer getMaxSize() { return maxSize; }
public void setMaxSize(Integer maxSize) { this.maxSize = maxSize; }
public Integer getQueueSize() { return queueSize; }
public void setQueueSize(Integer queueSize) { this.queueSize = queueSize; } }}动态更新线程池工作队列长度:
xxxxxxxxxx
import java.util.AbstractQueue;import java.util.Collection;import java.util.Iterator;import java.util.NoSuchElementException;import java.util.Spliterator;import java.util.Spliterators;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;import java.util.function.Consumer;
public class ResizeLinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; private static final String EMPTY = "[]";
/** * Linked list node class */ static class Node<E> { E item;
/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next;
Node(E x) { item = x; } }
/** * The capacity bound, or Integer.MAX_VALUE if none */ private int capacity;
public int getCapacity() { return capacity; }
public void setCapacity(int capacity) { this.capacity = capacity; }
/** * Current number of elements */ private final AtomicInteger count = new AtomicInteger();
/** * Head of linked list. * Invariant: head.item == null */ transient Node<E> head;
/** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last;
/** * Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock();
/** * Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition();
/** * Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();
/** * Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
/** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
/** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
/** * Links node at end of queue. * * @param node the node */ private void enqueue(Node<E> node) { last = last.next = node; }
/** * Removes a node from head of queue. * * @return the node */ private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x; }
/** * Locks to prevent both puts and takes. */ private void fullyLock() { putLock.lock(); takeLock.lock(); }
/** * Unlocks to allow both puts and takes. */ private void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
/** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public ResizeLinkedBlockingQueue(int capacity) { if (capacity <= 0) { throw new IllegalArgumentException(); } this.capacity = capacity; last = head = new Node<>(null); }
public ResizeLinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; // Never contended, but necessary for visibility putLock.lock(); try { int n = 0; for (E e : c) { if (e == null) { throw new NullPointerException(); } if (n == capacity) { throw new IllegalStateException("Queue full"); } enqueue(new Node<>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
/** * Returns the number of elements in this queue. * * @return the number of elements in this queue */ public int size() { return count.get(); }
public int remainingCapacity() { return capacity - count.get(); }
public void put(E e) throws InterruptedException { // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c; Node<E> node = new Node<>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) { notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) { signalNotEmpty(); } }
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) { throw new NullPointerException(); } long nanos = unit.toNanos(timeout); int c; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) { return false; } nanos = notFull.awaitNanos(nanos); } enqueue(new Node<>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) { notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) { signalNotEmpty(); } return true; }
public boolean offer(E e) { final AtomicInteger count = this.count; if (count.get() == capacity) { return false; } int c = -1; Node<E> node = new Node<>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) { notFull.signal(); } } } finally { putLock.unlock(); } if (c == 0) { signalNotEmpty(); } return c >= 0; }
public E take() throws InterruptedException { E x; int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) { notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) { signalNotFull(); } return x; }
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x; int c; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) { return null; } nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) { notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) { signalNotFull(); } return x; }
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) { return null; } E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) { notEmpty.signal(); } } } finally { takeLock.unlock(); } if (c == capacity) { signalNotFull(); } return x; }
public E peek() { if (count.get() == 0) { return null; } final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) { return null; } else { return first.item; } } finally { takeLock.unlock(); } }
/** * Unlinks interior Node p with predecessor trail. */ private void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next; if (last == p) { last = trail; } if (count.getAndDecrement() == capacity) { notFull.signal(); } }
public boolean remove(Object o) { if (o == null) { return false; } fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
public boolean contains(Object o) { if (o == null) { return false; } fullyLock(); try { for (Node<E> p = head.next; p != null; p = p.next) { if (o.equals(p.item)) { return true; } } return false; } finally { fullyUnlock(); } }
public Object[] toArray() { fullyLock(); try { int size = count.get(); Object[] a = new Object[size]; int k = 0; for (Node<E> p = head.next; p != null; p = p.next) { a[k++] = p.item; } return a; } finally { fullyUnlock(); } }
("unchecked") public <T> T[] toArray(T[] a) { fullyLock(); try { int size = count.get(); if (a.length < size) { a = (T[])java.lang.reflect.Array.newInstance (a.getClass().getComponentType(), size); }
int k = 0; for (Node<E> p = head.next; p != null; p = p.next) { a[k++] = (T)p.item; } if (a.length > k) { a[k] = null; } return a; } finally { fullyUnlock(); } }
public String toString() { fullyLock(); try { Node<E> p = head.next; if (p == null) { return EMPTY; }
StringBuilder sb = new StringBuilder(); sb.append('['); for (; ; ) { E e = p.item; sb.append(e == this ? "(this Collection)" : e); p = p.next; if (p == null) { return sb.append(']').toString(); } sb.append(',').append(' '); } } finally { fullyUnlock(); } }
/** * Atomically removes all of the elements from this queue. * The queue will be empty after this call returns. */ public void clear() { fullyLock(); try { for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; } head = last; // assert head.item == null && head.next == null; if (count.getAndSet(0) == capacity) { notFull.signal(); } } finally { fullyUnlock(); } }
public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); }
public int drainTo(Collection<? super E> c, int maxElements) { if (c == this) { throw new IllegalArgumentException(); } if (maxElements <= 0) { return 0; } boolean signalNotFull = false; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null; h.next = h; h = p; ++i; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { // assert h.item == null; head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) { signalNotFull(); } } }
public Iterator<E> iterator() { return new Itr(); }
private class Itr implements Iterator<E> { /* * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. */
private Node<E> current; private Node<E> lastRet; private E currentElement;
Itr() { fullyLock(); try { current = head.next; if (current != null) { currentElement = current.item; } } finally { fullyUnlock(); } }
public boolean hasNext() { return current != null; }
/** * Returns the next live successor of p, or null if no such. * * Unlike other traversal methods, iterators need to handle both: * - dequeued nodes (p.next == p) * - (possibly multiple) interior removed nodes (p.item == null) */ private Node<E> nextNode(Node<E> p) { for (; ; ) { Node<E> s = p.next; if (s == p) { return head.next; } if (s == null || s.item != null) { return s; } p = s; } }
public E next() { fullyLock(); try { if (current == null) { throw new NoSuchElementException(); } E x = currentElement; lastRet = current; current = nextNode(current); currentElement = (current == null) ? null : current.item; return x; } finally { fullyUnlock(); } }
public void remove() { if (lastRet == null) { throw new IllegalStateException(); } fullyLock(); try { Node<E> node = lastRet; lastRet = null; for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (p == node) { unlink(p, trail); break; } } } finally { fullyUnlock(); } } }
/** * A customized variant of Spliterators.IteratorSpliterator */ static final class LBQSpliterator<E> implements Spliterator<E> { static final int MAX_BATCH = 1 << 25; // max batch array size; final ResizeLinkedBlockingQueue<E> queue; Node<E> current; int batch; boolean exhausted; long est;
LBQSpliterator(ResizeLinkedBlockingQueue<E> queue) { this.queue = queue; this.est = queue.size(); }
public long estimateSize() { return est; }
public Spliterator<E> trySplit() { Node<E> h; final ResizeLinkedBlockingQueue<E> q = this.queue; int b = batch; int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; if (!exhausted && ((h = current) != null || (h = q.head.next) != null) && h.next != null) { Object[] a = new Object[n]; int i = 0; Node<E> p = current; q.fullyLock(); try { if (p != null || (p = q.head.next) != null) { do { if ((a[i] = p.item) != null) { ++i; } } while ((p = p.next) != null && i < n); } } finally { q.fullyUnlock(); } if ((current = p) == null) { est = 0L; exhausted = true; } else if ((est -= i) < 0L) { est = 0L; } if (i > 0) { batch = i; return Spliterators.spliterator (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT); } } return null; }
public void forEachRemaining(Consumer<? super E> action) { if (action == null) { throw new NullPointerException(); } final ResizeLinkedBlockingQueue<E> q = this.queue; if (!exhausted) { exhausted = true; Node<E> p = current; do { E e = null; q.fullyLock(); try { if (p == null) { p = q.head.next; } while (p != null) { e = p.item; p = p.next; if (e != null) { break; } } } finally { q.fullyUnlock(); } if (e != null) { action.accept(e); } } while (p != null); } }
public boolean tryAdvance(Consumer<? super E> action) { if (action == null) { throw new NullPointerException(); } final ResizeLinkedBlockingQueue<E> q = this.queue; if (!exhausted) { E e = null; q.fullyLock(); try { if (current == null) { current = q.head.next; } while (current != null) { e = current.item; current = current.next; if (e != null) { break; } } } finally { q.fullyUnlock(); } if (current == null) { exhausted = true; } if (e != null) { action.accept(e); return true; } } return false; }
public int characteristics() { return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT; } }
public Spliterator<E> spliterator() { return new LBQSpliterator<>(this); }
private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException {
fullyLock(); try { // Write out any hidden stuff, plus capacity s.defaultWriteObject();
// Write out all elements in the proper order. for (Node<E> p = head.next; p != null; p = p.next) { s.writeObject(p.item); }
// Use trailing null as sentinel s.writeObject(null); } finally { fullyUnlock(); } }
private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { // Read in capacity, and any hidden stuff s.defaultReadObject();
count.set(0); last = head = new Node<>(null);
// Read in all elements and place in queue for (; ; ) { ("unchecked") E item = (E)s.readObject(); if (item == null) { break; } add(item); } }
}
使用详情
引入maven依赖
xxxxxxxxxx<dependency> <groupId>com.dzg</groupId> <artifactId>dynamic-pool-diamond-starter</artifactId> <version>1.0.2</version></dependency>使用ThreadPoolBuilder创建线程池并交由Spring容器管理
xxxxxxxxxx
public class ThreadPoolConfig {
private static final AtomicLong CONSULT_THREAD_COUNT = new AtomicLong(); private static final AtomicLong CREDIT_RESULT_THREAD_COUNT = new AtomicLong();
("consultThreadPool") ExecutorService consultThreadPool() { return ThreadPoolBuilder.pool() .setThreadPoolName("consultThreadPool") .setCoreSize(20) .setMaxSize(45) .setKeepAliveSecs(60) .setWorkQueue(new SynchronousQueue()) .setUseTtl(true).setDaemon(false) .setRejectHanlder((r, executor) -> { if (!executor.isShutdown()) { r.run(); } }) .setThreadFactory( r -> { Thread thread = new Thread(r); thread.setDaemon(false); thread.setName("consultThreadPool-" + CONSULT_THREAD_COUNT.getAndIncrement()); return thread; } ) .build(); }
("queryCreditResultPool") ExecutorService queryCreditResultPool() { return ThreadPoolBuilder.pool() .setThreadPoolName("queryCreditResultPool") .setCoreSize(40) .setMaxSize(90) .setKeepAliveSecs(60) .setWorkQueue(new SynchronousQueue()) .setUseTtl(true).setDaemon(false) .setRejectHanlder((r, executor) -> { if (!executor.isShutdown()) { r.run(); } }) .setThreadFactory( r -> { Thread thread = new Thread(r); thread.setDaemon(false); thread.setName("queryCreditResultPool-" + CREDIT_RESULT_THREAD_COUNT.getAndIncrement()); return thread; } ) .build(); }
}配置Diamond文件动态管理线程池参数
xxxxxxxxxx{ "consultThreadPool": { "coreSize": 40, "maxSize": 90, "queueSize":10000 }}
在spring配置文件增加diamond dataId与groupId配置即可。
xxxxxxxxxxxxx.dataId = xxxxxx.groupId = xxx加自动装配启动注解
在Application.java中加入@EnableDiamondDtp
xxxxxxxxxx({DtpDiamondProperties.class})public class DiamondDtpConfiguration { private final DtpDiamondProperties properties;
public DiamondRefresher diamondRefresher() { DiamondRefresher diamondRefresher = new DiamondRefresher(); diamondRefresher.setProperties(this.properties); return diamondRefresher; }
public DiamondDtpConfiguration(DtpDiamondProperties properties) { this.properties = properties; }}xxxxxxxxxx@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE})@Import({DiamondDtpConfiguration.class})public @interface EnableDiamondDtp {}
使用平台压测调优
脱敏