博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊HystrixConcurrencyStrategy
阅读量:6277 次
发布时间:2019-06-22

本文共 13877 字,大约阅读时间需要 46 分钟。

本文主要研究一下HystrixConcurrencyStrategy

HystrixConcurrencyStrategy

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java

/** * Abstract class for defining different behavior or implementations for concurrency related aspects of the system with default implementations. * 

* For example, every {@link Callable} executed by {@link HystrixCommand} will call {@link #wrapCallable(Callable)} to give a chance for custom implementations to decorate the {@link Callable} with * additional behavior. *

* When you implement a concrete {@link HystrixConcurrencyStrategy}, you should make the strategy idempotent w.r.t ThreadLocals. * Since the usage of threads by Hystrix is internal, Hystrix does not attempt to apply the strategy in an idempotent way. * Instead, you should write your strategy to work idempotently. See https://github.com/Netflix/Hystrix/issues/351 for a more detailed discussion. *

* See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: https://github.com/Netflix/Hystrix/wiki/Plugins. */public abstract class HystrixConcurrencyStrategy { private final static Logger logger = LoggerFactory.getLogger(HystrixConcurrencyStrategy.class); /** * Factory method to provide {@link ThreadPoolExecutor} instances as desired. *

* Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize}, * {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods. *

* Default Implementation *

* Implementation using standard java.util.concurrent.ThreadPoolExecutor * * @param threadPoolKey * {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for. * @param corePoolSize * Core number of threads requested via properties (or system default if no properties set). * @param maximumPoolSize * Max number of threads requested via properties (or system default if no properties set). * @param keepAliveTime * Keep-alive time for threads requested via properties (or system default if no properties set). * @param unit * {@link TimeUnit} corresponding with keepAliveTime * @param workQueue * {@code BlockingQueue

} as provided by {@link #getBlockingQueue(int)} * @return instance of {@link ThreadPoolExecutor} */ public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty
corePoolSize, HystrixProperty
maximumPoolSize, HystrixProperty
keepAliveTime, TimeUnit unit, BlockingQueue
workQueue) { //...... } public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { //...... } /** * Factory method to provide instance of {@code BlockingQueue
} used for each {@link ThreadPoolExecutor} as constructed in {@link #getThreadPool}. *

* Note: The maxQueueSize value is provided so any type of queue can be used but typically an implementation such as {@link SynchronousQueue} without a queue (just a handoff) is preferred as * queueing is an anti-pattern to be purposefully avoided for latency tolerance reasons. *

* Default Implementation *

* Implementation returns {@link SynchronousQueue} when maxQueueSize <= 0 or {@link LinkedBlockingQueue} when maxQueueSize > 0. * * @param maxQueueSize * The max size of the queue requested via properties (or system default if no properties set). * @return instance of {@code BlockingQueue

} */ public BlockingQueue
getBlockingQueue(int maxQueueSize) { //...... } /** * Provides an opportunity to wrap/decorate a {@code Callable
} before execution. *

* This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}). *

* Default Implementation *

* Pass-thru that does no wrapping. * * @param callable * {@code Callable

} to be executed via a {@link ThreadPoolExecutor} * @return {@code Callable
} either as a pass-thru or wrapping the one given */ public
Callable
wrapCallable(Callable
callable) { return callable; } /** * Factory method to return an implementation of {@link HystrixRequestVariable} that behaves like a {@link ThreadLocal} except that it * is scoped to a request instead of a thread. *

* For example, if a request starts with an HTTP request and ends with the HTTP response, then {@link HystrixRequestVariable} should * be initialized at the beginning, available on any and all threads spawned during the request and then cleaned up once the HTTP request is completed. *

* If this method is implemented it is generally necessary to also implemented {@link #wrapCallable(Callable)} in order to copy state * from parent to child thread. * * @param rv * {@link HystrixRequestVariableLifecycle} with lifecycle implementations from Hystrix * @return {@code HystrixRequestVariable

} */ public
HystrixRequestVariable
getRequestVariable(final HystrixRequestVariableLifecycle
rv) { return new HystrixLifecycleForwardingRequestVariable
(rv); } }

  • 这个类主要提供了一些方法允许自定义线程隔离的一些配置
  • getThreadPool()以及getBlockingQueue()方法,用于自定义线程池及其队列
  • wrapCallable()允许你去修饰Callable,比如做些上下文数据传递
  • getRequestVariable()返回HystrixRequestVariable,类似ThreadLocal

getThreadPool

/**     * Factory method to provide {@link ThreadPoolExecutor} instances as desired.     * 

* Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize}, * {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods. *

* Default Implementation *

* Implementation using standard java.util.concurrent.ThreadPoolExecutor * * @param threadPoolKey * {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for. * @param corePoolSize * Core number of threads requested via properties (or system default if no properties set). * @param maximumPoolSize * Max number of threads requested via properties (or system default if no properties set). * @param keepAliveTime * Keep-alive time for threads requested via properties (or system default if no properties set). * @param unit * {@link TimeUnit} corresponding with keepAliveTime * @param workQueue * {@code BlockingQueue

} as provided by {@link #getBlockingQueue(int)} * @return instance of {@link ThreadPoolExecutor} */ public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty
corePoolSize, HystrixProperty
maximumPoolSize, HystrixProperty
keepAliveTime, TimeUnit unit, BlockingQueue
workQueue) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); final int dynamicCoreSize = corePoolSize.get(); final int dynamicMaximumSize = maximumPoolSize.get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); } } public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get(); final int dynamicCoreSize = threadPoolProperties.coreSize().get(); final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); final BlockingQueue
workQueue = getBlockingQueue(maxQueueSize); if (allowMaximumSizeToDivergeFromCoreSize) { final int dynamicMaximumSize = threadPoolProperties.maximumSize().get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) { if (!PlatformSpecific.isAppEngineStandardEnvironment()) { return new ThreadFactory() { private final AtomicInteger threadNumber = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()); thread.setDaemon(true); return thread; } }; } else { return PlatformSpecific.getAppEngineThreadFactory(); } }

  • 根据HystrixThreadPoolKey以及HystrixThreadPoolProperties构建线程池
  • HystrixThreadPoolKey主要用来获取ThreadFactory,自定义了线程池的名称
  • HystrixThreadPoolProperties主要是dynamicCoreSize(corePoolSize)、dynamicMaximumSize(maximumPoolSize)、keepAliveTime、maxQueueSize这几个参数

HystrixLifecycleForwardingRequestVariable

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/concurrency/HystrixLifecycleForwardingRequestVariable.java

/** * Implementation of {@link HystrixRequestVariable} which forwards to the wrapped * {@link HystrixRequestVariableLifecycle}. * 

* This implementation also returns null when {@link #get()} is called while the {@link HystrixRequestContext} has not * been initialized rather than throwing an exception, allowing for use in a {@link HystrixConcurrencyStrategy} which * does not depend on an a HystrixRequestContext */public class HystrixLifecycleForwardingRequestVariable

extends HystrixRequestVariableDefault
{ private final HystrixRequestVariableLifecycle
lifecycle; /** * Creates a HystrixRequestVariable which will return data as provided by the {@link HystrixRequestVariableLifecycle} * @param lifecycle lifecycle used to provide values. Must have the same type parameter as the constructed instance. */ public HystrixLifecycleForwardingRequestVariable(HystrixRequestVariableLifecycle
lifecycle) { this.lifecycle = lifecycle; } /** * Delegates to the wrapped {@link HystrixRequestVariableLifecycle} * @return T with initial value or null if none. */ @Override public T initialValue() { return lifecycle.initialValue(); } /** * Delegates to the wrapped {@link HystrixRequestVariableLifecycle} * @param value * of request variable to allow cleanup activity. *

* If nothing needs to be cleaned up then nothing needs to be done in this method. */ @Override public void shutdown(T value) { lifecycle.shutdown(value); } /** * Return null if the {@link HystrixRequestContext} has not been initialized for the current thread. *

* If {@link HystrixRequestContext} has been initialized then call method in superclass: * {@link HystrixRequestVariableDefault#get()} */ @Override public T get() { if (!HystrixRequestContext.isCurrentThreadInitialized()) { return null; } return super.get(); }}

  • 继承了HystrixRequestVariableDefault类,然后调用HystrixRequestVariableLifecycle来进行初始化和销毁
  • HystrixRequestVariableDefault主要是对HystrixRequestContext进行操作

HystrixConcurrencyStrategyDefault

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategyDefault.java

/** * Default implementation of {@link HystrixConcurrencyStrategy} using standard java.util.concurrent.* implementations. *  * @ExcludeFromJavadoc */public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {    private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();    public static HystrixConcurrencyStrategy getInstance() {        return INSTANCE;    }    private HystrixConcurrencyStrategyDefault() {    }}
默认实现没有重新任何方法,都是使用了父类的实现

小结

HystrixConcurrencyStrategy是提供给开发者去自定义hystrix内部线程池及其队列,还提供了包装callable的方法,以及传递上下文变量的方法。

doc

转载地址:http://uogpa.baihongyu.com/

你可能感兴趣的文章
java读取excel、txt 文件内容,传到、显示到另一个页面的文本框里面。
查看>>
《从零开始学Swift》学习笔记(Day 51)——扩展构造函数
查看>>
python多线程队列安全
查看>>
[汇编语言学习笔记][第四章第一个程序的编写]
查看>>
android 打开各种文件(setDataAndType)转:
查看>>
补交:最最原始的第一次作业(当时没有选上课,所以不知道)
查看>>
Vue实例初始化的选项配置对象详解
查看>>
PLM产品技术的发展趋势 来源:e-works 作者:清软英泰 党伟升 罗先海 耿坤瑛
查看>>
vue part3.3 小案例ajax (axios) 及页面异步显示
查看>>
浅谈MVC3自定义分页
查看>>
.net中ashx文件有什么用?功能有那些,一般用在什么情况下?
查看>>
select、poll、epoll之间的区别总结[整理]【转】
查看>>
CSS基础知识(上)
查看>>
PHP中常见的面试题2(附答案)
查看>>
26.Azure备份服务器(下)
查看>>
mybatis学习
查看>>
LCD的接口类型详解
查看>>
Spring Boot Unregistering JMX-exposed beans on shutdown
查看>>
poi 导入导出的api说明(大全)
查看>>
Mono for Android 优势与劣势
查看>>