适合新手快速学习,节省你的时间,就是我的价值
- 1. 项目总览
- 2. 配置pom.xml
- 3. 配置elasticsearch.properties
- 4. es的初始化-含权限认证
- 5. 构建ClientBuilders
- 6. 使用HighLevelClient客户端
- 7. 异常处理类
- 8. 实体bean
- 9. 增删改查API
- 10. 增删改查API测试
- 11. 增值服务
1. 项目总览
2. 配置pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.shok</groupId>
    <artifactId>esTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>
    <name>esTest</name>
    <description>esTest</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <finalName>esTest</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source><!-- 源代码使用的JDK版本 -->
                    <target>8</target><!-- 需要生成的目标class文件的编译版本 -->
                    <encoding>UTF-8</encoding><!-- 字符集编码 -->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3. 配置elasticsearch.properties
# es用户名
elastic.username=elastic
# es密码
elastic.password=kibana2020
#es集群名
elastic.cluster.name=app-es
#es集群地址列表,多个地址,用“,”分开,地址和端口号相对应
elastic.cluster.discover.hostname=192.168.1.58:9200,192.168.1.58:9202,192.168.1.58:9203
#es集群是否加入自动嗅探
elastic.cluster.clientTransportSniff=true
4. es的初始化-含权限认证
package com.shok.utils;import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * Loads the Elasticsearch connection settings from {@code elasticsearch.properties}
 * on the classpath once, when the class is initialized, and exposes them through
 * static accessors.
 *
 * <p>Initialization fails fast: if the resource is missing or unreadable, or a
 * required setting is absent or blank, the static initializer throws and the JVM
 * reports an {@link ExceptionInInitializerError} on first use of this class.
 */
public class ConfigUtils {

    /** Name of the classpath resource that holds the settings. */
    private static final String ES_CONFIG_FILE_NAME = "elasticsearch.properties";

    /** Raw key/value pairs; declared before the static initializer that fills it. */
    private static final Properties properties = new Properties();

    private static String esClusterName;
    private static String esClusterDiscoverHostName;
    private static String clientTransportSniff;
    private static String esUserName;
    private static String esPassword;

    static {
        ClassLoader classLoader = ConfigUtils.class.getClassLoader();
        // try-with-resources closes the stream; a null resource is tolerated by the construct.
        try (InputStream resourceAsStream = classLoader.getResourceAsStream(ES_CONFIG_FILE_NAME)) {
            if (resourceAsStream == null) {
                throw new IllegalStateException(ES_CONFIG_FILE_NAME + " not found on the classpath");
            }
            properties.load(resourceAsStream);
        } catch (IOException e) {
            // Continuing with every setting null would only defer the failure to an obscure NPE.
            throw new IllegalStateException("Failed to read " + ES_CONFIG_FILE_NAME, e);
        }
        init();
    }

    /**
     * Copies the loaded properties into the static fields and validates them.
     *
     * @throws RuntimeException if the cluster name, host list, sniff flag, user name
     *         or password is missing or blank
     */
    static void init() {
        esUserName = properties.getProperty("elastic.username");
        esPassword = properties.getProperty("elastic.password");
        esClusterName = properties.getProperty("elastic.cluster.name");
        esClusterDiscoverHostName = properties.getProperty("elastic.cluster.discover.hostname");
        clientTransportSniff = properties.getProperty("elastic.cluster.clientTransportSniff");

        if (isBlank(esClusterName) || isBlank(esClusterDiscoverHostName) || isBlank(clientTransportSniff)) {
            throw new RuntimeException("elasticsearch 集群参数为空异常!");
        }
        if (isBlank(esUserName) || isBlank(esPassword)) {
            throw new RuntimeException("elasticsearch 集群登录用户名和密码不能为空!");
        }
    }

    /** Returns {@code true} when the value is absent or contains only whitespace. */
    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /** @return the configured Elasticsearch user name */
    public static String getEsUserName() {
        return esUserName;
    }

    /** @param esUserName replacement user name */
    public static void setEsUserName(String esUserName) {
        ConfigUtils.esUserName = esUserName;
    }

    /** @return the configured Elasticsearch password */
    public static String getEsPassword() {
        return esPassword;
    }

    /** @param esPassword replacement password */
    public static void setEsPassword(String esPassword) {
        ConfigUtils.esPassword = esPassword;
    }

    /** @return the configured cluster name */
    public static String getEsClusterName() {
        return esClusterName;
    }

    /** @return the comma-separated {@code host:port} list of cluster nodes */
    public static String getEsClusterDiscoverHostName() {
        return esClusterDiscoverHostName;
    }

    /** @param esClusterDiscoverHostName replacement comma-separated {@code host:port} list */
    public static void setEsClusterDiscoverHostName(String esClusterDiscoverHostName) {
        ConfigUtils.esClusterDiscoverHostName = esClusterDiscoverHostName;
    }

    /** @return the raw value of the sniffing flag, as written in the properties file */
    public static String getClientTransportSniff() {
        return clientTransportSniff;
    }

    /** @param clientTransportSniff replacement value for the sniffing flag */
    public static void setClientTransportSniff(String clientTransportSniff) {
        ConfigUtils.clientTransportSniff = clientTransportSniff;
    }
}
5. 构建ClientBuilders
package com.shok.client;import com.shok.utils.ConfigUtils;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Factory for low-level {@link RestClientBuilder} instances that target every node
 * listed in {@link ConfigUtils#getEsClusterDiscoverHostName()}.
 */
public class ClientBuilders {

    /** Comma-separated {@code host:port} list, read once from the configuration. */
    private static final String CLUSTER_HOSTNAME_PORT = ConfigUtils.getEsClusterDiscoverHostName();

    /**
     * Creates a builder with default settings for all configured nodes.
     *
     * @return a new builder
     * @throws IllegalStateException if no node is configured
     */
    public RestClientBuilder getSimpleClientBuilder() {
        return RestClient.builder(clusterHosts());
    }

    /**
     * Creates a builder for all configured nodes and applies default headers, a
     * failure listener, a 10 second socket timeout and an HTTP proxy.
     *
     * <p>Earlier revisions rebuilt the builder inside the host loop, so only the last
     * configured node was ever used; all nodes are now registered.
     *
     * @return a new, customized builder
     * @throws IllegalStateException if no node is configured
     */
    public static RestClientBuilder getClientBulider() {
        RestClientBuilder restClientBuilder = RestClient.builder(clusterHosts());

        // NOTE(review): "header"/"value" looks like a placeholder — confirm before production use.
        Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
        restClientBuilder.setDefaultHeaders(defaultHeaders);

        // NOTE(review): this method carries no @Override; if the client's FailureListener
        // callback takes a different parameter type this overload is never invoked — confirm
        // against the client version in use.
        restClientBuilder.setFailureListener(new RestClient.FailureListener() {
            public void onFailure(HttpHost host) {
            }
        });

        restClientBuilder.setRequestConfigCallback(
                requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(10000));

        // NOTE(review): host "proxy" on port 9000 looks like a placeholder — every request is
        // routed through it, so confirm that such a proxy really exists.
        restClientBuilder.setHttpClientConfigCallback(
                httpClientBuilder -> httpClientBuilder.setProxy(new HttpHost("proxy", 9000, "http")));

        return restClientBuilder;
    }

    /** Parses the configured node list into hosts, ignoring blank entries and surrounding spaces. */
    private static HttpHost[] clusterHosts() {
        if (CLUSTER_HOSTNAME_PORT == null) {
            throw new IllegalStateException("No elasticsearch cluster hosts configured");
        }
        HttpHost[] hosts = Stream.of(CLUSTER_HOSTNAME_PORT.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .map(ClientBuilders::createHttpHost)
                .toArray(HttpHost[]::new);
        if (hosts.length == 0) {
            throw new IllegalStateException("No elasticsearch cluster hosts configured");
        }
        return hosts;
    }

    /** Converts one {@code host:port} entry into an {@link HttpHost}. */
    private static HttpHost createHttpHost(String hostAndPort) {
        return HttpHost.create(hostAndPort);
    }
}
6. 使用HighLevelClient客户端
package com.shok.client;import com.shok.exception.ESIoException;
import com.shok.utils.ConfigUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;import java.io.IOException;
/**
 * Holder for a single, lazily created {@link RestHighLevelClient} that authenticates
 * with the user name and password from {@link ConfigUtils}.
 *
 * <p>Both static methods are {@code synchronized}, so concurrent callers share one
 * client instead of each creating (and leaking) their own.
 */
public class HighLevelClient {

    /** Comma-separated {@code host:port} list of cluster nodes. */
    private static final String CLUSTER_HOSTNAME_PORT = ConfigUtils.getEsClusterDiscoverHostName();
    private static final String ES_USERNAME = ConfigUtils.getEsUserName();
    private static final String ES_PASSWORD = ConfigUtils.getEsPassword();

    /** The shared client; {@code null} until first requested and again after it is closed. */
    private static RestHighLevelClient restHighLevelClient;

    /**
     * Returns the shared client, creating it on first use or after a previous close.
     *
     * @return the shared client
     * @throws NumberFormatException if a configured node has no numeric port
     */
    public static synchronized RestHighLevelClient getClient() {
        if (restHighLevelClient == null) {
            restHighLevelClient = buildClient();
        }
        return restHighLevelClient;
    }

    /**
     * Closes the shared client if one exists; a later {@link #getClient()} builds a new one.
     *
     * @throws ESIoException if closing the client fails
     */
    public static synchronized void closeRestHighLevelClient() throws ESIoException {
        if (restHighLevelClient == null) {
            return;
        }
        try {
            restHighLevelClient.close();
        } catch (IOException e) {
            throw new ESIoException("RestHighLevelClient Client close exception", e);
        } finally {
            // Never hand out a client on which close() has already been attempted.
            restHighLevelClient = null;
        }
    }

    /** Builds a client for all configured nodes with basic-auth credentials attached. */
    private static RestHighLevelClient buildClient() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials(ES_USERNAME, ES_PASSWORD));

        String[] nodes = CLUSTER_HOSTNAME_PORT.split(",");
        HttpHost[] hosts = new HttpHost[nodes.length];
        for (int i = 0; i < nodes.length; i++) {
            String node = nodes[i].trim();
            // Split on the last ':' so everything before it is taken as the host name.
            String hostName = org.apache.commons.lang.StringUtils.substringBeforeLast(node, ":");
            String port = org.apache.commons.lang.StringUtils.substringAfterLast(node, ":");
            hosts[i] = new HttpHost(hostName, Integer.parseInt(port));
        }

        RestClientBuilder builder = RestClient.builder(hosts);
        builder.setHttpClientConfigCallback(
                httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(builder);
    }
}
7. 异常处理类
package com.shok.exception;import java.io.IOException;
/**
 * Checked exception signalling that an Elasticsearch client I/O operation failed.
 */
public class ESIoException extends IOException {

    /**
     * Creates the exception from its underlying cause alone.
     *
     * @param cause the failure that triggered this exception
     */
    public ESIoException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates the exception with a description only.
     *
     * @param message description of the failure
     */
    public ESIoException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a description and its underlying cause.
     *
     * @param message description of the failure
     * @param cause   the failure that triggered this exception
     */
    public ESIoException(String message, Throwable cause) {
        super(message, cause);
    }
}
8. 实体bean
package com.shok.bean;public class UserBean {
public String id;public String name;public int age;public String addr;public String message;public UserBean(String id) {
this.id = id;}public UserBean(String id, String name, int age, String addr, String message) {
this.id = id;this.name = name;this.age = age;this.addr = addr;this.message = message;}public String getId() {
return id;}public void setId(String id) {
this.id = id;}public String getName() {
return name;}public void setName(String name) {
this.name = name;}public int getAge() {
return age;}public void setAge(int age) {
this.age = age;}public String getAddr() {
return addr;}public void setAddr(String addr) {
this.addr = addr;}public String getMessage() {
return message;}public void setMessage(String message) {
this.message = message;}
}
9. 增删改查API
package com.shok.api;import com.alibaba.fastjson.JSONObject;
import com.shok.bean.UserBean;
import com.shok.client.HighLevelClient;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Convenience wrapper around {@link RestHighLevelClient} for index deletion and
 * existence checks, single-document create/update/delete/get, bulk writes and a
 * simple match search.
 */
public class HighLevelClientAPI {

    /** Client obtained from {@link HighLevelClient#getClient()} when this object is created. */
    RestHighLevelClient highLevelClient = HighLevelClient.getClient();

    /**
     * Deletes an index.
     *
     * @param indexName name of the index to delete
     * @return {@code true} if the cluster acknowledged the deletion
     * @throws IOException if the request fails
     */
    public boolean deleteIndex(String indexName) throws IOException {
        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
        AcknowledgedResponse response = highLevelClient.indices().delete(request, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    }

    /**
     * Checks whether an index exists.
     *
     * @param indexName name of the index to look up
     * @return {@code true} if the index exists
     * @throws IOException if the request fails
     */
    public boolean existsIndex(String indexName) throws IOException {
        GetIndexRequest request = new GetIndexRequest(indexName);
        return highLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * Indexes a document under the given id.
     *
     * @param jsonMap   document fields
     * @param indexName target index
     * @param rowId     document id
     * @return the write result as text (the name of the {@link DocWriteResponse.Result} constant)
     * @throws IOException if the request fails
     */
    public String addDoc(Map<String, Object> jsonMap, String indexName, String rowId) throws IOException {
        IndexRequest indexRequest = new IndexRequest(indexName).id(rowId).source(jsonMap);
        IndexResponse response = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        return response.getResult().toString();
    }

    /**
     * Deletes a document by id.
     *
     * @param indexName index that holds the document
     * @param id        document id
     * @return the write result as text (the name of the {@link DocWriteResponse.Result} constant)
     * @throws IOException if the request fails
     */
    public String deleteDoc(String indexName, String id) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
        DeleteResponse response = highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        return response.getResult().toString();
    }

    /**
     * Applies a partial update to a document.
     *
     * @param jsonMap   fields to merge into the document
     * @param indexName index that holds the document
     * @param rowId     document id
     * @return the write result as text (the name of the {@link DocWriteResponse.Result} constant)
     * @throws IOException if the request fails
     */
    public String updateDoc(Map<String, Object> jsonMap, String indexName, String rowId) throws IOException {
        UpdateRequest request = new UpdateRequest(indexName, rowId).doc(jsonMap);
        UpdateResponse response = highLevelClient.update(request, RequestOptions.DEFAULT);
        String result = response.getResult().toString();
        System.out.println("=====" + result);
        return result;
    }

    /**
     * Sends index, update and delete operations in a single bulk request.
     *
     * @param indexName target index
     * @param add       users to index; may be {@code null}
     * @param up        users to update; may be {@code null}
     * @param del       users to delete (only the id is used); may be {@code null}
     * @return {@code "SUCCESS"} if nothing failed or nothing had to be sent, otherwise the
     *         failures, one per line
     * @throws IOException if the request fails
     */
    public String bulkDoc(String indexName, List<UserBean> add, List<UserBean> up, List<UserBean> del) throws IOException {
        BulkRequest request = new BulkRequest();
        if (add != null) {
            for (UserBean user : add) {
                request.add(new IndexRequest(indexName).id(user.getId()).source(XContentType.JSON, userFields(user)));
            }
        }
        if (up != null) {
            for (UserBean user : up) {
                request.add(new UpdateRequest(indexName, user.getId()).doc(XContentType.JSON, userFields(user)));
            }
        }
        if (del != null) {
            for (UserBean user : del) {
                request.add(new DeleteRequest(indexName, user.getId()));
            }
        }
        // A bulk request without any action is rejected by request validation; skip the round trip.
        if (request.numberOfActions() == 0) {
            return "SUCCESS";
        }

        BulkResponse bulkResponse = highLevelClient.bulk(request, RequestOptions.DEFAULT);
        if (!bulkResponse.hasFailures()) {
            return "SUCCESS";
        }
        StringBuilder failures = new StringBuilder();
        for (BulkItemResponse item : bulkResponse) {
            if (item.isFailed()) {
                failures.append(item.getFailure().toString()).append("\n");
            }
        }
        System.out.println("=bulk error=" + failures);
        return failures.toString();
    }

    /** Flattens a user into the alternating field-name/value list used by the bulk requests. */
    private static Object[] userFields(UserBean user) {
        return new Object[]{
            "name", user.getName(),
            "age", user.getAge(),
            "addr", user.getAddr(),
            "message", user.getMessage()
        };
    }

    /**
     * Fetches a document by id.
     *
     * @param indexName index that holds the document
     * @param rowId     document id
     * @return the document source with an extra {@code "id"} entry, or an empty map when
     *         the response carries no source (for example because the document does not exist)
     * @throws IOException if the request fails
     */
    public Map<String, Object> getDocById(String indexName, String rowId) throws IOException {
        GetRequest getRequest = new GetRequest(indexName, rowId);
        GetResponse response = highLevelClient.get(getRequest, RequestOptions.DEFAULT);
        Map<String, Object> source = response.getSource();
        if (source == null) {
            // Previously this dereferenced null; return a mutable empty map instead.
            return new java.util.HashMap<>();
        }
        source.put("id", rowId);
        return source;
    }

    /**
     * Runs a match query on one field and maps every hit to a {@link UserBean}.
     *
     * @param indexName index to search
     * @param fieldName field to match against
     * @param fileValue text to match
     * @param startPage value passed unchanged to the search's {@code from} setting, i.e. it
     *                  is used as a hit offset rather than multiplied by the page size
     * @param maxSize   maximum number of hits to return
     * @return the matching users, each with its id taken from the hit
     * @throws IOException if the request fails
     */
    public List<UserBean> searchMatch(String indexName, String fieldName, String fileValue, int startPage, int maxSize) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchQuery(fieldName, fileValue));
        searchSourceBuilder.from(startPage);
        searchSourceBuilder.size(maxSize);

        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(searchSourceBuilder);

        SearchResponse response = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        List<UserBean> userList = new LinkedList<>();
        for (SearchHit hit : response.getHits().getHits()) {
            UserBean user = JSONObject.parseObject(hit.getSourceAsString(), UserBean.class);
            user.setId(hit.getId());
            userList.add(user);
        }
        return userList;
    }
}
10. 增删改查API测试
package com.shok;import com.shok.api.HighLevelClientAPI;
import com.shok.bean.UserBean;
import com.shok.client.HighLevelClient;
import org.junit.Test;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Manual integration tests for {@link HighLevelClient} and {@link HighLevelClientAPI}.
 * They talk to the cluster configured in {@code elasticsearch.properties}.
 *
 * <p>Request failures propagate out of the test methods so the test runner reports
 * them, instead of being printed and silently ignored.
 */
public class MultiThreadTest {

    /** Requests the client from ten pooled threads and prints what each thread received. */
    @Test
    public void testRestClient() {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        ExecutorService executorService = new ThreadPoolExecutor(
                10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 10; i++) {
            int index = i;
            executorService.execute(() ->
                    System.out.println(("第" + index + "次获取到了连接对象————地址:" + HighLevelClient.getClient())));
        }
        executorService.shutdown();
        try {
            while (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
                System.out.println("10秒没有执行完,强制关闭线程池");
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Stop the workers and keep the interrupt visible to the test runner.
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Indexes one document with id {@code 1}. */
    @Test
    public void testAddDoc() throws IOException {
        HighLevelClientAPI es = new HighLevelClientAPI();
        Map<String, Object> jsonMap = new HashMap<String, Object>();
        jsonMap.put("name", "张三11");
        jsonMap.put("age", 33);
        jsonMap.put("addr", "浦东 上海11");
        jsonMap.put("message", "胜多负少的 message 多少分的13");

        System.out.println("====11111===========");
        es.addDoc(jsonMap, "intr_index", "1");
        System.out.println("====22222===========");
    }

    /** Updates the document with id {@code 1}. */
    @Test
    public void testUpdateDoc() throws IOException {
        HighLevelClientAPI es = new HighLevelClientAPI();
        Map<String, Object> jsonMap = new HashMap<String, Object>();
        jsonMap.put("name", "张三22");
        jsonMap.put("age", 11);
        jsonMap.put("addr", "上海浦东22");
        jsonMap.put("message", "胜多负少的 message 22 多少分的22");

        System.out.println("====11111===========");
        es.updateDoc(jsonMap, "intr_index", "1");
        System.out.println("====22222===========");
    }

    /** Sends two index, two update and two delete operations in one bulk request. */
    @Test
    public void testBulkDoc() throws IOException {
        List<UserBean> toAdd = new ArrayList<UserBean>();
        List<UserBean> toUpdate = new ArrayList<UserBean>();
        List<UserBean> toDelete = new ArrayList<UserBean>();
        toAdd.add(new UserBean("1", "lisi88", 88, "上海闵行88", "message 第三方三的订单88"));
        toAdd.add(new UserBean("2", "lisi9", 99, "上海闵行88", "message 第三方三的订单88"));
        toUpdate.add(new UserBean("3", "lisi2", 11, "上海闵行11", "message 第三方三的订单11"));
        toUpdate.add(new UserBean("4", "lisi4", 25, "上海闵行4", "message 第三方三的订单4"));
        toDelete.add(new UserBean("4"));
        toDelete.add(new UserBean("5"));

        HighLevelClientAPI es = new HighLevelClientAPI();
        System.out.println("====testBulkDoc 11111===========");
        es.bulkDoc("intr_index", toAdd, toUpdate, toDelete);
        System.out.println("====testBulkDoc 22222===========");
    }

    /** Fetches the document with id {@code 7} and prints each of its fields. */
    @Test
    public void testGetDocById() throws IOException {
        HighLevelClientAPI es = new HighLevelClientAPI();
        Map<String, Object> map = es.getDocById("intr_index", "7");
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            System.out.println(entry.getKey() + "===:===" + entry.getValue());
        }
    }

    /** Runs a match query on {@code name} and prints every hit. */
    @Test
    public void testSearchMatch() throws IOException {
        HighLevelClientAPI es = new HighLevelClientAPI();
        List<UserBean> users = es.searchMatch("intr_index", "name", "张三13", 0, 10);
        for (UserBean u : users) {
            System.out.println("testSearchMatch===" + u.getId() + "==" + u.getName() + "=" + u.getAge() + "==" + u.getAddr() + "==" + u.getMessage());
        }
    }
}
11. 增值服务
tel/wx:15000227329,加好友备注:csdn领取esTest代码,即可免费领取项目代码。
https://item.taobao.com/item.htm?spm=a2126o.success.0.0.647c4831eSR754&id=626561714988