在使用elasticsearch官网提供的Java High Level Rest
Client来对es进行操作时,发现客户端API中没有连接池的概念,每次都要创建一个新的连接,这在高并发情况下明显性能会存在影响,因此这里准备对客户端API增加池化的概念。


幸运的是,我们不需要自己重头开始写连接池的实现,因为apache为我们提供了连接池的通用框架实现commons-pool2,而我们只需要根据框架设计简单实现一些逻辑即可。redis客户端API中所用到的JedisPool就是基于commons-pool2实现的。

下面我们就看看怎么实现。


首先我们要创建一个池类,这个池通过依赖的方式引入commons-pool2中的GenericObjectPool。在这个类中,我们定义了如何从池中借对象和返回对象。
public class Pool<T> implements Cloneable { protected GenericObjectPool<T>
internalPool ; public Pool(){ super(); } public Pool(final
GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory){
initPool(poolConfig, factory); } public void initPool(final
GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) { if
(this.internalPool != null) { try { closeInternalPool(); } catch (Exception e)
{ } } this.internalPool = new GenericObjectPool<T>(factory, poolConfig); }
protected void closeInternalPool(){ try { internalPool.close(); } catch
(Exception e) { throw new ElasticSearchException("Could not destroy the pool",
e); } } public T getResource(){ try { return internalPool.borrowObject(); }
catch (Exception e) { throw new ElasticSearchConnectException("Could not get a
resource from the pool", e); } } public void returnResource(final T resource){
if (resource != null) { returnResourceObject(resource); } } private void
returnResourceObject(final T resource){ if (resource == null) { return; } try {
internalPool.returnObject(resource); } catch (Exception e) { throw new
ElasticSearchException("Could not return the resource to the pool", e); } }
public void returnBrokenResource(final T resource){ if (resource != null) {
returnBrokenResourceObject(resource); } } private void
returnBrokenResourceObject(T resource) { try {
internalPool.invalidateObject(resource); } catch (Exception e) { throw new
ElasticSearchException("Could not return the resource to the pool", e); } }
public void destroy(){ closeInternalPool(); } public int getNumActive() { if
(poolInactive()) { return -1; } return this.internalPool.getNumActive(); }
public int getNumIdle() { if (poolInactive()) { return -1; } return
this.internalPool.getNumIdle(); } public int getNumWaiters() { if
(poolInactive()) { return -1; } return this.internalPool.getNumWaiters(); }
public long getMeanBorrowWaitTimeMillis() { if (poolInactive()) { return -1; }
return this.internalPool.getMeanBorrowWaitTimeMillis(); } public long
getMaxBorrowWaitTimeMillis() { if (poolInactive()) { return -1; } return
this.internalPool.getMaxBorrowWaitTimeMillis(); } private boolean
poolInactive() { return this.internalPool == null ||
this.internalPool.isClosed(); } public void addObjects(int count) throws
Exception { try { for (int i = 0; i < count; i++) {
this.internalPool.addObject(); } } catch (Exception e) { throw new
Exception("Error trying to add idle objects", e); } } }
然后创建ES的连接池:
public class ElasticSearchPool extends Pool<RestHighLevelClient> { private
String clusterName; private Set<HostAndPort> clusterNodes; public
ElasticSearchPool(ElasticSearchPoolConfig config){ super(config, new
ElasticSearchClientFactory(config.getClusterName(), config.getNodes()));
this.clusterName = clusterName; this.clusterNodes = clusterNodes; } public
String getClusterName() { return clusterName; } public void
setClusterName(String clusterName) { this.clusterName = clusterName; } public
Set<HostAndPort> getClusterNodes() { return clusterNodes; } public void
setClusterNodes(Set<HostAndPort> clusterNodes) { this.clusterNodes =
clusterNodes; } }

紧接着,对于连接池我们需要配置连接池的属性,因此定义了一个ES的连接池配置类,在apache提供的commons-pool2中已经提供了一个池基本属性配置的类GenericObjectPoolConfig,我们可以直接继承此类。
public class ElasticSearchPoolConfig extends GenericObjectPoolConfig { private
long connectTimeMillis; private String clusterName; Set<HostAndPort> nodes =
new HashSet<HostAndPort>(); public long getConnectTimeMillis() { return
connectTimeMillis; } public void setConnectTimeMillis(long connectTimeMillis) {
this.connectTimeMillis = connectTimeMillis; } public String getClusterName() {
return clusterName; } public void setClusterName(String clusterName) {
this.clusterName = clusterName; } public Set<HostAndPort> getNodes() { return
nodes; } public void setNodes(Set<HostAndPort> nodes) { this.nodes = nodes; } }
最后,我们还需要做的是给这个池提供一个工厂类,用于创建池中的对象和回收对象,我们只要实现PooledObjectFactory接口并实现接口中的几个方法即可。
public class ElasticSearchClientFactory implements
PooledObjectFactory<RestHighLevelClient> { private
AtomicReference<Set<HostAndPort>> nodesReference = new
AtomicReference<Set<HostAndPort>>(); private String clusterName; public
ElasticSearchClientFactory(String clusterName, Set<HostAndPort> clusterNodes){
this.clusterName = clusterName; this.nodesReference.set(clusterNodes); } public
PooledObject<RestHighLevelClient> makeObject() throws Exception { HttpHost[]
nodes = new HttpHost[nodesReference.get().size()]; List<HttpHost> nodeList =
new ArrayList<HttpHost>(); for(HostAndPort each: nodesReference.get()){
nodeList.add(new HttpHost(each.getHost(),each.getPort(),each.getSchema())); }
nodes = nodeList.toArray(nodes); RestClientBuilder clientBuilder =
RestClient.builder(nodes); RestHighLevelClient client = new
RestHighLevelClient(clientBuilder); return new DefaultPooledObject(client); }
public void destroyObject(PooledObject<RestHighLevelClient> pooledObject)
throws Exception { RestHighLevelClient client = pooledObject.getObject();
if(client!=null&&client.ping()){ try { client.close(); }catch (Exception e){
//ignore } } } public boolean validateObject(PooledObject<RestHighLevelClient>
pooledObject) { RestHighLevelClient client = pooledObject.getObject(); try {
return client.ping(); }catch(Exception e){ return false; } } public void
activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception
{ RestHighLevelClient client = pooledObject.getObject(); boolean response =
client.ping(); } public void passivateObject(PooledObject<RestHighLevelClient>
pooledObject) throws Exception { //nothing } }
至此,一个简单的ElaticSearch Rest Client API的连接池便完成了,测试代码如下:
public class ElasticSearchPoolTest { public static void main(String[] args)
throws Exception { Set<HostAndPort> nodes = new HashSet<HostAndPort>();
nodes.add(new HostAndPort("172.31.4.14:9200","172.31.4.14",9200,"http"));
ElasticSearchPoolConfig config = new ElasticSearchPoolConfig();
config.setConnectTimeMillis(8000); config.setMaxTotal(100);
config.setClusterName("elasticsearch"); config.setNodes(nodes);
ElasticSearchPool pool = new ElasticSearchPool(config); long start =
System.currentTimeMillis(); for(int i=0;i<1000;i++){ RestHighLevelClient client
= pool.getResource(); boolean response = client.ping();
pool.returnResource(client); } long end = System.currentTimeMillis();
System.out.println("耗时(ms):"+(end-start)); } }

备注:上面实现了一个简单的连接池,但是考虑到使用这个连接池时我们每次都要自己获取资源和释放资源,并不是十分方便,因此,一般还会在这个基础上进一步封装,将获取资源和释放资源的逻辑隐藏起来,可以参见我提交到github上的项目,访问地址如下:
https://github.com/andamajing/elasticsearch-pool
<https://github.com/andamajing/elasticsearch-pool>。

 

 

友情链接
ioDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信