在使用 Elasticsearch 官网提供的 Java High Level REST Client 时,可以为其实现一个简单的连接池,代码如下:



public class Pool<T> implements Cloneable { protected GenericObjectPool<T>
internalPool ; public Pool(){ super(); } public Pool(final
GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory){
initPool(poolConfig, factory); } public void initPool(final
GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) { if
(this.internalPool != null) { try { closeInternalPool(); } catch (Exception e)
{ } } this.internalPool = new GenericObjectPool<T>(factory, poolConfig); }
protected void closeInternalPool(){ try { internalPool.close(); } catch
(Exception e) { throw new ElasticSearchException("Could not destroy the pool",
e); } } public T getResource(){ try { return internalPool.borrowObject(); }
catch (Exception e) { throw new ElasticSearchConnectException("Could not get a
resource from the pool", e); } } public void returnResource(final T resource){
if (resource != null) { returnResourceObject(resource); } } private void
returnResourceObject(final T resource){ if (resource == null) { return; } try {
internalPool.returnObject(resource); } catch (Exception e) { throw new
ElasticSearchException("Could not return the resource to the pool", e); } }
public void returnBrokenResource(final T resource){ if (resource != null) {
returnBrokenResourceObject(resource); } } private void
returnBrokenResourceObject(T resource) { try {
internalPool.invalidateObject(resource); } catch (Exception e) { throw new
ElasticSearchException("Could not return the resource to the pool", e); } }
public void destroy(){ closeInternalPool(); } public int getNumActive() { if
(poolInactive()) { return -1; } return this.internalPool.getNumActive(); }
public int getNumIdle() { if (poolInactive()) { return -1; } return
this.internalPool.getNumIdle(); } public int getNumWaiters() { if
(poolInactive()) { return -1; } return this.internalPool.getNumWaiters(); }
public long getMeanBorrowWaitTimeMillis() { if (poolInactive()) { return -1; }
return this.internalPool.getMeanBorrowWaitTimeMillis(); } public long
getMaxBorrowWaitTimeMillis() { if (poolInactive()) { return -1; } return
this.internalPool.getMaxBorrowWaitTimeMillis(); } private boolean
poolInactive() { return this.internalPool == null ||
this.internalPool.isClosed(); } public void addObjects(int count) throws
Exception { try { for (int i = 0; i < count; i++) {
this.internalPool.addObject(); } } catch (Exception e) { throw new
Exception("Error trying to add idle objects", e); } } }
/**
 * {@link Pool} of {@code RestHighLevelClient} instances built from an
 * {@link ElasticSearchPoolConfig}.
 */
public class ElasticSearchPool extends Pool<RestHighLevelClient> {

    private String clusterName;
    private Set<HostAndPort> clusterNodes;

    /**
     * Creates the pool, using {@code config} both as the pool configuration
     * and as the source of the cluster name and node set.
     *
     * @param config pool configuration carrying the cluster name and nodes
     */
    public ElasticSearchPool(ElasticSearchPoolConfig config) {
        super(config, new ElasticSearchClientFactory(config.getClusterName(), config.getNodes()));
        // Take the values from config: the previous self-assignments
        // (this.clusterName = clusterName) left both fields null.
        this.clusterName = config.getClusterName();
        this.clusterNodes = config.getNodes();
    }

    /** @return the cluster name this pool was configured with */
    public String getClusterName() {
        return clusterName;
    }

    /** @param clusterName the cluster name to record on this pool */
    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    /** @return the node set this pool was configured with */
    public Set<HostAndPort> getClusterNodes() {
        return clusterNodes;
    }

    /** @param clusterNodes the node set to record on this pool */
    public void setClusterNodes(Set<HostAndPort> clusterNodes) {
        this.clusterNodes = clusterNodes;
    }
}

/**
 * Pool configuration extended with the Elasticsearch-specific settings used
 * in this file: a connect time value, a cluster name and the set of nodes.
 */
public class ElasticSearchPoolConfig extends GenericObjectPoolConfig {

    /** Node set; starts out as an empty {@link HashSet}. */
    Set<HostAndPort> nodes = new HashSet<HostAndPort>();

    private String clusterName;

    private long connectTimeMillis;

    // ---- nodes ----

    /** @return the configured node set */
    public Set<HostAndPort> getNodes() {
        return this.nodes;
    }

    /** @param nodes the node set to use, replacing the current one */
    public void setNodes(final Set<HostAndPort> nodes) {
        this.nodes = nodes;
    }

    // ---- cluster name ----

    /** @return the configured cluster name */
    public String getClusterName() {
        return this.clusterName;
    }

    /** @param clusterName the cluster name to store */
    public void setClusterName(final String clusterName) {
        this.clusterName = clusterName;
    }

    // ---- connect time ----

    /** @return the stored connect time value, in milliseconds */
    public long getConnectTimeMillis() {
        return this.connectTimeMillis;
    }

    /** @param connectTimeMillis the connect time value to store, in milliseconds */
    public void setConnectTimeMillis(final long connectTimeMillis) {
        this.connectTimeMillis = connectTimeMillis;
    }
}
public class ElasticSearchClientFactory implements
PooledObjectFactory<RestHighLevelClient> { private
AtomicReference<Set<HostAndPort>> nodesReference = new
AtomicReference<Set<HostAndPort>>(); private String clusterName; public
ElasticSearchClientFactory(String clusterName, Set<HostAndPort> clusterNodes){
this.clusterName = clusterName; this.nodesReference.set(clusterNodes); } public
PooledObject<RestHighLevelClient> makeObject() throws Exception { HttpHost[]
nodes = new HttpHost[nodesReference.get().size()]; List<HttpHost> nodeList =
new ArrayList<HttpHost>(); for(HostAndPort each: nodesReference.get()){
nodeList.add(new HttpHost(each.getHost(),each.getPort(),each.getSchema())); }
nodes = nodeList.toArray(nodes); RestClientBuilder clientBuilder =
RestClient.builder(nodes); RestHighLevelClient client = new
RestHighLevelClient(clientBuilder); return new DefaultPooledObject(client); }
public void destroyObject(PooledObject<RestHighLevelClient> pooledObject)
throws Exception { RestHighLevelClient client = pooledObject.getObject();
if(client!=null&&client.ping()){ try { client.close(); }catch (Exception e){
//ignore } } } public boolean validateObject(PooledObject<RestHighLevelClient>
pooledObject) { RestHighLevelClient client = pooledObject.getObject(); try {
return client.ping(); }catch(Exception e){ return false; } } public void
activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception
{ RestHighLevelClient client = pooledObject.getObject(); boolean response =
client.ping(); } public void passivateObject(PooledObject<RestHighLevelClient>
pooledObject) throws Exception { //nothing } }
至此,一个简单的Elasticsearch Rest Client API的连接池便完成了,测试代码如下:
/**
 * Manual smoke test: borrows a client from the pool 1000 times, pings with
 * it, returns it, and prints the elapsed time.
 */
public class ElasticSearchPoolTest {

    /**
     * Runs the borrow/ping/return loop against the configured node.
     *
     * @param args unused
     * @throws Exception if the pool cannot be set up or a ping throws
     */
    public static void main(String[] args) throws Exception {
        Set<HostAndPort> nodes = new HashSet<HostAndPort>();
        // NOTE(review): the first two arguments are empty strings in the
        // original; presumably they need real values before this can run.
        nodes.add(new HostAndPort("", "", 9200, "http"));

        ElasticSearchPoolConfig config = new ElasticSearchPoolConfig();
        config.setConnectTimeMillis(8000);
        config.setMaxTotal(100);
        config.setClusterName("elasticsearch");
        config.setNodes(nodes);

        ElasticSearchPool pool = new ElasticSearchPool(config);
        try {
            long start = System.currentTimeMillis();
            for (int i = 0; i < 1000; i++) {
                RestHighLevelClient client = pool.getResource();
                try {
                    client.ping();
                } catch (Exception e) {
                    // A client whose ping threw is handed back as broken so
                    // the pool does not reuse it, then the failure propagates.
                    pool.returnBrokenResource(client);
                    throw e;
                }
                pool.returnResource(client);
            }
            long end = System.currentTimeMillis();
            System.out.println("耗时(ms):" + (end - start));
        } finally {
            // Close the pool so pooled clients are not left open at exit.
            pool.destroy();
        }
    }
}



