开篇

* 本篇文章是使用Jest的Api来接入elasticsearch,网上说的Jest不需要连接池,查看源码会发现,JestClient确实是有一个异步方法(
executeAsync()
),但是该方法会额外创建一个线程去执行搜索任务,有些类似OkHttp,在高并发高负载的系统上,就会额外的增加系统开销,上述仅个人想法。在这里,我讲述一种通过定义连接池的方式去优化我们的elasticsearch搜索任务。
思路

* aop监控es搜索请求

* before 通过holder取得connection
* after 通过holder将该connection回收
* ThreadLocal 来线程绑定connection
相关类的作用

* aop

* EsClientAop
* EsJestClientAnn
* holder

* EsClientHolder
* pool

* EsClientPool
代码逻辑

EsClientPool


该类采用固定长度的方式,在类加载的时候就初始化了设定的连接数,这里通过BlockingQueue阻塞队列的生产-消费方式来保证多线程环境下的安全。其中Factory和Queue都是单利模式,get和remove都是包级别的访问权限。
/** * Created by yzz on 2018/9/15. */ public class EsClientPool { //阻塞队列
private static BlockingQueue<JestClient> queue; //用户自定义配置的Bean private static
EsConfigVO esConfigVO;//Jest工厂,这里是单利工厂 private static JestClientFactory factory;
//保证factory queue 的单利 static { esConfigVO = (EsConfigVO)
ApplicationHelper.getBeanByClass(EsConfigVO.class); queue =new
ArrayBlockingQueue<>(esConfigVO.getMaxConnection()); factory =new
JestClientFactory(); init(); }//初始化工厂 private static void init(){ try { //初始化工厂
Gson gson =new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create();
HttpClientConfig clientConfig =new HttpClientConfig
.Builder(esConfigVO.getUrl()) .gson(gson) .build();
factory.setHttpClientConfig(clientConfig);//初始化阻塞队列 //初始化连接 initQueue(); } catch
(InterruptedException e) { e.printStackTrace(); } }/** * 初始化连接 * @throws
InterruptedException */ static void initQueue() throws InterruptedException {
int i = 0; int maxConnection = esConfigVO.getMaxConnection(); while
(i<maxConnection){ JestClient client = factory.getObject(); queue.put(client);
i++; } System.err.println("es 初始化完成"); } static JestClient get() throws
InterruptedException {return queue.poll(1, TimeUnit.SECONDS); } static void
remove(JestClient jestClient)throws InterruptedException {
queue.put(jestClient); } }
EsClientHolder

通过ThreadLocal来保证线程局部变量,实现多线程下的资源安全。该holder类和EsClientPool 在同一个包下。
/** * Created by yzz on 2018/9/15. */ public class EsClientHolder { private
static ThreadLocal<JestClient> threadLocal = new ThreadLocal<>(); static void
set(){try { JestClient jestClient = EsClientPool.get();
threadLocal.set(jestClient); }catch (InterruptedException e) {
e.printStackTrace(); } }public static JestClient get() throws Exception {
JestClient client = threadLocal.get(); System.out.println("当前线程:"
+Thread.currentThread().getName()+" 当前ES:"+client); if (null == client) { throw
new Exception("please try again !"); } return client; } static void remove(){
try { JestClient client = threadLocal.get(); EsClientPool.remove(client);
System.out.println("当前线程:"+Thread.currentThread().getName()+" 回收ES:"+client); }
catch (InterruptedException e) { e.printStackTrace(); } } }
AOP

通过对特定注解的监控,来做到在es搜索执行前后对connection做线程绑定和回收
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @
interface EsJestClientAnn { } @Component @Aspect @Order(-100) public class
EsClientAop { @Pointcut(value = "execution(public * com.yzz.boot.es.service
..*.*(..))") public void defaultJestClient(){} @Before(value =
"defaultJestClient()&&@annotation(com.yzz.boot.es.util.EsJestClientAnn)") public
void initClient(){ EsClientHolder.set(); } @After(value =
"defaultJestClient()&&@annotation(com.yzz.boot.es.util.EsJestClientAnn)") public
void recycle(){ EsClientHolder.remove(); } }
EsConfigVO

该注解运用了springBoot的特性通过配置文件来填充该Bean。
@Component @ConfigurationProperties(prefix = "es.config.common") public class
EsConfigVO { private String url; private int maxConnection = 10; public String
getUrl() { return url; } public void setUrl(String url) { this.url = url; }
public int getMaxConnection() { return maxConnection; } public void
setMaxConnection(int maxConnection) { this.maxConnection = maxConnection; } }
配置文件
es: config: common: url: http://192.168.1.12:9200 maxConnection: 100
运用
@Service(value = "es_EsSearchService") public class EsSearchService {
@EsJestClientAnnpublic Object test() throws Exception { JestClient client =
EsClientHolder.get(); NodesStats nodesStats = new NodesStats.Builder().build();
JestResult result = client.execute(nodesStats);return result.getJsonString(); }
}
总结


在本次实践中,采用的pool方式是全部加载的方式,通过懒加载的方式会更好,下次会及时记录下来。该pool的设计优点是,在第一次访问的时候才会去初始化,优点懒加载的意思,也会避免es服务未启动导致项目启动失败的问题。

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信