基于Diamond实现的动态线程池
基于Diamond实现的动态线程池
简介
dynamic-threadpool是一个动态线程池组件,可以使用这个组件来实现线程池参数的修改实时生效。
为什么要用?
第一个方面:
线程池调优是一个永恒的主题。传统的调优方式是把线程池参数写死在代码里,部署后进行测试,再根据测试结果修改参数并重新部署,如此反复,直到调优结束后发布。
而基于动态线程池组件,可以省去重新部署的过程,参数修改后直接生效,每次调整至少能节省几分钟的部署时间。
第二个方面:
当线上出现线程池任务堆积问题时,可以通过动态线程池立即修改配置,快速缓解问题。
核心能力
动态更新线程池参数:
xpublic abstract class AbstractRefresher implements Refresher {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRefresher.class);
public void refresh(String content, PropertyType fileType) {
try {
PropertyResolverEnvironment environment = PropertyResolverEnvironmentFactory.SINGLETON.getInstance();
Map<Object, Object> properties = environment.getProperties(content, fileType);
if (properties == null || properties.isEmpty()) {
LOGGER.info("thread pool config not change : {}", Objects.toString(content, ""));
}
properties.forEach((poolName, config) -> {
ThreadPoolExecutor threadPoolExecutor = ThreadPoolRegistrar.threadPool(String.valueOf(poolName));
if (threadPoolExecutor != null) {
try {
PoolConfig poolConfig = JsonUtils.parseObject(JsonUtils.toJson(config), PoolConfig.class);
threadPoolExecutor.setCorePoolSize(poolConfig.getCoreSize());
threadPoolExecutor.setMaximumPoolSize(poolConfig.getMaxSize());
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
if (queue instanceof ResizeLinkedBlockingQueue && poolConfig.getQueueSize() != null) {
ResizeLinkedBlockingQueue resizeQueue = (ResizeLinkedBlockingQueue)queue;
resizeQueue.setCapacity(poolConfig.getQueueSize());
} else {
LOGGER.info(
"If you want to adjust the queue length dynamically, use ResizeLinkedBlockingQueue");
}
log(threadPoolExecutor);
} catch (Exception e) {
LOGGER.warn("resize failure", e);
}
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void log(ThreadPoolExecutor threadPoolExecutor) {
int oldCorePoolSize = threadPoolExecutor.getCorePoolSize();
int oldMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int oldQueueSize = threadPoolExecutor.getQueue().size();
LOGGER.info(
"thread pool config change success,coreSize : {}->{} ,maxSize: {}->{},queueSize: {}->{}",
oldCorePoolSize, threadPoolExecutor.getCorePoolSize(),
oldMaximumPoolSize, threadPoolExecutor.getMaximumPoolSize(),
oldQueueSize, threadPoolExecutor.getQueue().size());
}
private static class PoolConfig {
private Integer coreSize;
private Integer maxSize;
private Integer queueSize;
public Integer getCoreSize() {
return coreSize;
}
public void setCoreSize(Integer coreSize) {
this.coreSize = coreSize;
}
public Integer getMaxSize() {
return maxSize;
}
public void setMaxSize(Integer maxSize) {
this.maxSize = maxSize;
}
public Integer getQueueSize() {
return queueSize;
}
public void setQueueSize(Integer queueSize) {
this.queueSize = queueSize;
}
}
}
动态更新线程池工作队列长度:
xxxxxxxxxx
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
/**
 * A linked-node, optionally bounded {@link BlockingQueue} whose capacity bound
 * can be changed while the queue is in use.
 *
 * <p>The implementation follows the two-lock design of
 * {@code java.util.concurrent.LinkedBlockingQueue}: producers serialize on
 * {@code putLock}, consumers on {@code takeLock}, and the element count is kept
 * in an {@link AtomicInteger}. Because the capacity may shrink below the current
 * size, every "queue is full" test uses {@code >=} rather than {@code ==}.
 *
 * @param <E> the type of elements held in this queue
 */
public class ResizeLinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -6903933977591709194L;
    private static final String EMPTY = "[]";

    /**
     * Linked list node class.
     */
    static class Node<E> {
        E item;
        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    /**
     * The capacity bound, or Integer.MAX_VALUE if none. Volatile because it is
     * written by {@link #setCapacity(int)} and read by producers and consumers
     * that hold different locks (or none).
     */
    private volatile int capacity;

    /**
     * Returns the current capacity bound.
     *
     * @return the capacity bound
     */
    public int getCapacity() {
        return capacity;
    }

    /**
     * Changes the capacity bound. Elements already queued are never discarded:
     * when the new bound is below the current size, inserts are refused (or
     * block) until consumers have drained the queue below the new bound.
     *
     * @param capacity the new capacity bound
     * @throws IllegalArgumentException if {@code capacity} is not greater than zero
     */
    public void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            final int oldCapacity = this.capacity;
            this.capacity = capacity;
            // Producers parked on notFull would otherwise sleep until the next
            // take; each woken producer re-checks the guard itself.
            if (capacity > oldCapacity) {
                notFull.signalAll();
            }
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Current number of elements.
     */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /**
     * Lock held by take, poll, etc.
     */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**
     * Wait queue for waiting takes.
     */
    private final Condition notEmpty = takeLock.newCondition();

    /**
     * Lock held by put, offer, etc.
     */
    private final ReentrantLock putLock = new ReentrantLock();

    /**
     * Wait queue for waiting puts.
     */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the item held by the removed node
     */
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // self-link marks the old head as dequeued for iterators
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a queue with the given initial capacity.
     *
     * @param capacity the initial capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *                                  than zero
     */
    public ResizeLinkedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
        last = head = new Node<>(null);
    }

    /**
     * Creates a queue with capacity {@link Integer#MAX_VALUE}, initially
     * containing the elements of the given collection in iteration order.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the collection or any of its elements is null
     */
    public ResizeLinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        // Never contended, but necessary for visibility
        putLock.lock();
        try {
            int n = 0;
            for (E e : c) {
                if (e == null) {
                    throw new NullPointerException();
                }
                if (n == capacity) {
                    throw new IllegalStateException("Queue full");
                }
                enqueue(new Node<>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        return count.get();
    }

    /**
     * Returns the number of additional elements this queue can accept without
     * blocking; zero when the queue holds at least {@code capacity} elements
     * (possible after the capacity has been reduced).
     */
    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - count.get());
    }

    /**
     * Inserts the element at the tail, waiting if necessary for space.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        int c;
        Node<E> node = new Node<>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * count is read in the wait guard without holding takeLock. This
             * works because count can only decrease at this point (all other
             * puts are shut out by putLock), and a waiting put is signalled
             * when a take frees space or the capacity grows. ">=" rather than
             * "==" because the capacity may have been lowered below count.
             */
            while (count.get() >= capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
    }

    /**
     * Inserts the element at the tail, waiting up to the given time for space.
     *
     * @return {@code true} if inserted, {@code false} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        int c;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() >= capacity) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

    /**
     * Inserts the element at the tail if that is possible immediately.
     *
     * @return {@code true} if inserted, {@code false} if the queue is full
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final AtomicInteger count = this.count;
        if (count.get() >= capacity) {
            return false;
        }
        int c = -1; // stays negative when nothing was inserted
        Node<E> node = new Node<>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }

    /**
     * Retrieves and removes the head, waiting if necessary for an element.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        E x;
        int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // The queue was full (or over a lowered bound) before this removal.
        if (c >= capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves and removes the head, waiting up to the given time for an element.
     *
     * @return the head, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x;
        int c;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0) {
                    return null;
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c >= capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves and removes the head, or returns {@code null} if the queue is empty.
     */
    @Override
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0) {
            return null;
        }
        E x = null;
        int c = -1; // stays negative when nothing was removed
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c >= capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves, but does not remove, the head, or returns {@code null} if empty.
     */
    @Override
    public E peek() {
        if (count.get() == 0) {
            return null;
        }
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            return (first == null) ? null : first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail. Callers hold both locks.
     */
    private void unlink(Node<E> p, Node<E> trail) {
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p) {
            last = trail;
        }
        if (count.getAndDecrement() >= capacity) {
            notFull.signal();
        }
    }

    /**
     * Removes a single instance of the given element, if present.
     *
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the given element.
     */
    @Override
    public boolean contains(Object o) {
        if (o == null) {
            return false;
        }
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next) {
                if (o.equals(p.item)) {
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all elements of this queue, in order.
     */
    @Override
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = p.item;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all elements of this queue, in order; the
     * runtime type of the returned array is that of the given array.
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size) {
                a = (T[]) java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(), size);
            }
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = (T) p.item;
            }
            if (a.length > k) {
                a[k] = null;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    @Override
    public String toString() {
        fullyLock();
        try {
            Node<E> p = head.next;
            if (p == null) {
                return EMPTY;
            }
            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (; ; ) {
                E e = p.item;
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;
                if (p == null) {
                    return sb.append(']').toString();
                }
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    @Override
    public void clear() {
        fullyLock();
        try {
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            if (count.getAndSet(0) >= capacity) {
                notFull.signal();
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements and adds them to the given collection.
     *
     * @return the number of elements transferred
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} elements and adds them to the given
     * collection.
     *
     * @return the number of elements transferred
     * @throws NullPointerException     if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) >= capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull) {
                signalNotFull();
            }
        }
    }

    /**
     * Returns a weakly consistent iterator over the elements in order.
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator. At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null) {
                    currentElement = current.item;
                }
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (; ; ) {
                Node<E> s = p.next;
                if (s == p) {
                    return head.next;
                }
                if (s == null || s.item != null) {
                    return s;
                }
                p = s;
            }
        }

        @Override
        public E next() {
            fullyLock();
            try {
                if (current == null) {
                    throw new NoSuchElementException();
                }
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public void remove() {
            if (lastRet == null) {
                throw new IllegalStateException();
            }
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * A customized variant of Spliterators.IteratorSpliterator.
     */
    static final class LBQSpliterator<E> implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25; // max batch array size
        final ResizeLinkedBlockingQueue<E> queue;
        Node<E> current;
        int batch;
        boolean exhausted;
        long est;

        LBQSpliterator(ResizeLinkedBlockingQueue<E> queue) {
            this.queue = queue;
            this.est = queue.size();
        }

        @Override
        public long estimateSize() {
            return est;
        }

        @Override
        public Spliterator<E> trySplit() {
            Node<E> h;
            final ResizeLinkedBlockingQueue<E> q = this.queue;
            int b = batch;
            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
            if (!exhausted &&
                ((h = current) != null || (h = q.head.next) != null) &&
                h.next != null) {
                Object[] a = new Object[n];
                int i = 0;
                Node<E> p = current;
                q.fullyLock();
                try {
                    if (p != null || (p = q.head.next) != null) {
                        do {
                            if ((a[i] = p.item) != null) {
                                ++i;
                            }
                        } while ((p = p.next) != null && i < n);
                    }
                } finally {
                    q.fullyUnlock();
                }
                if ((current = p) == null) {
                    est = 0L;
                    exhausted = true;
                } else if ((est -= i) < 0L) {
                    est = 0L;
                }
                if (i > 0) {
                    batch = i;
                    return Spliterators.spliterator(
                        a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
                            Spliterator.CONCURRENT);
                }
            }
            return null;
        }

        @Override
        public void forEachRemaining(Consumer<? super E> action) {
            if (action == null) {
                throw new NullPointerException();
            }
            final ResizeLinkedBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                exhausted = true;
                Node<E> p = current;
                do {
                    E e = null;
                    q.fullyLock();
                    try {
                        if (p == null) {
                            p = q.head.next;
                        }
                        while (p != null) {
                            e = p.item;
                            p = p.next;
                            if (e != null) {
                                break;
                            }
                        }
                    } finally {
                        q.fullyUnlock();
                    }
                    // The action runs outside the locks.
                    if (e != null) {
                        action.accept(e);
                    }
                } while (p != null);
            }
        }

        @Override
        public boolean tryAdvance(Consumer<? super E> action) {
            if (action == null) {
                throw new NullPointerException();
            }
            final ResizeLinkedBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                E e = null;
                q.fullyLock();
                try {
                    if (current == null) {
                        current = q.head.next;
                    }
                    while (current != null) {
                        e = current.item;
                        current = current.next;
                        if (e != null) {
                            break;
                        }
                    }
                } finally {
                    q.fullyUnlock();
                }
                if (current == null) {
                    exhausted = true;
                }
                if (e != null) {
                    action.accept(e);
                    return true;
                }
            }
            return false;
        }

        @Override
        public int characteristics() {
            return Spliterator.ORDERED | Spliterator.NONNULL |
                Spliterator.CONCURRENT;
        }
    }

    /**
     * Returns a weakly consistent spliterator over the elements in this queue.
     */
    @Override
    public Spliterator<E> spliterator() {
        return new LBQSpliterator<>(this);
    }

    /**
     * Saves this queue to a stream: the default fields (including the capacity),
     * then every element in order, then a trailing {@code null} sentinel.
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();
            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next) {
                s.writeObject(p.item);
            }
            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitutes this queue from a stream written by {@code writeObject}.
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();
        count.set(0);
        last = head = new Node<>(null);
        // Read in all elements and place in queue
        for (; ; ) {
            @SuppressWarnings("unchecked")
            E item = (E) s.readObject();
            if (item == null) {
                break;
            }
            add(item);
        }
    }
}
使用详情
引入maven依赖
xxxxxxxxxx
<dependency>
<groupId>com.dzg</groupId>
<artifactId>dynamic-pool-diamond-starter</artifactId>
<version>1.0.2</version>
</dependency>
使用ThreadPoolBuilder创建线程池并交由Spring容器管理
xxxxxxxxxx
public class ThreadPoolConfig {
private static final AtomicLong CONSULT_THREAD_COUNT = new AtomicLong();
private static final AtomicLong CREDIT_RESULT_THREAD_COUNT = new AtomicLong();
"consultThreadPool") (
ExecutorService consultThreadPool() {
return
ThreadPoolBuilder.pool()
.setThreadPoolName("consultThreadPool")
.setCoreSize(20)
.setMaxSize(45)
.setKeepAliveSecs(60)
.setWorkQueue(new SynchronousQueue())
.setUseTtl(true).setDaemon(false)
.setRejectHanlder((r, executor) -> {
if (!executor.isShutdown()) {
r.run();
}
})
.setThreadFactory(
r -> {
Thread thread = new Thread(r);
thread.setDaemon(false);
thread.setName("consultThreadPool-" + CONSULT_THREAD_COUNT.getAndIncrement());
return thread;
}
)
.build();
}
"queryCreditResultPool") (
ExecutorService queryCreditResultPool() {
return
ThreadPoolBuilder.pool()
.setThreadPoolName("queryCreditResultPool")
.setCoreSize(40)
.setMaxSize(90)
.setKeepAliveSecs(60)
.setWorkQueue(new SynchronousQueue())
.setUseTtl(true).setDaemon(false)
.setRejectHanlder((r, executor) -> {
if (!executor.isShutdown()) {
r.run();
}
})
.setThreadFactory(
r -> {
Thread thread = new Thread(r);
thread.setDaemon(false);
thread.setName("queryCreditResultPool-" + CREDIT_RESULT_THREAD_COUNT.getAndIncrement());
return thread;
}
)
.build();
}
}
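配置Diamond文件动态管理线程池参数(注意:queueSize 仅对工作队列为 ResizeLinkedBlockingQueue 的线程池生效;上文示例中使用 SynchronousQueue 的线程池只会调整 coreSize 与 maxSize)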
xxxxxxxxxx
{
"consultThreadPool": {
"coreSize": 40,
"maxSize": 90,
"queueSize":10000
}
}
在spring配置文件增加diamond dataId与groupId配置即可。
xxxxxxxxxx
xxx.dataId = xxx
xxx.groupId = xxx
加自动装配启动注解
在Application.java中加入@EnableDiamondDtp
xxxxxxxxxx
DtpDiamondProperties.class}) ({
public class DiamondDtpConfiguration {
private final DtpDiamondProperties properties;
public DiamondRefresher diamondRefresher() {
DiamondRefresher diamondRefresher = new DiamondRefresher();
diamondRefresher.setProperties(this.properties);
return diamondRefresher;
}
public DiamondDtpConfiguration(DtpDiamondProperties properties) {
this.properties = properties;
}
}
xxxxxxxxxx
/**
 * Type-level switch for the Diamond-backed dynamic thread pool: placing it on a
 * class imports {@link DiamondDtpConfiguration}. The annotation is retained at
 * runtime.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DiamondDtpConfiguration.class)
public @interface EnableDiamondDtp {
}
使用平台压测调优
脱敏