目錄
Spring Boot與ES版本對應(yīng)
Maven依賴
配置類
使用方式
@Test中注入方式
@Component中注入方式
查詢文檔
實體類
通過ElasticsearchRestTemplate查詢
通過JPA查詢
保存文檔
參考鏈接
項目組件版本:
Spring Boot:2.2.13.RELEASE
Elasticsearch:6.8.0
JDK:1.8.0_66
Spring Boot與ES版本對應(yīng)
Tips: 主要看第3列和第5列,根據(jù)ES版本選擇對應(yīng)的Spring Boot版本,如果ES和Spring Boot版本不一致后續(xù)會報錯。
Maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.13.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- 其他無關(guān)內(nèi)容省略 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.2.13.RELEASE</version>
</dependency>
</dependencies>
配置類
通過配置類定義兩個ES鏈接的elasticsearchClient,如果是一個連接刪除其中一個即可。
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.core.ElasticsearchEntityMapper;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.EntityMapper;
import org.springframework.http.HttpHeaders;
/**
* @author He Changjie on 2022/6/6 14:02
*/
@Configuration
public class ElasticSearchConfig extends AbstractElasticsearchConfiguration{
/** ES鏈接一 [host:port] */
@Value("${spring.data.elasticsearch.client.reactive.endpoints}")
private String endpoints;
/** ES鏈接二 [host:port] */
@Value("${spring.data.elasticsearch.client.reactive.endpoints.zeek}")
private String endpointsZeek;
/** 連接elasticsearch超時時間 */
@Value("${spring.data.elasticsearch.client.reactive.connection-timeout}")
private Integer connectTimeout;
/** 套接字超時時間 */
@Value("${spring.data.elasticsearch.client.reactive.socket-timeout}")
private Integer socketTimeout;
/** 用戶名 */
@Value("${spring.data.elasticsearch.client.reactive.username}")
private String username;
/** 密碼 */
@Value("${spring.data.elasticsearch.client.reactive.password}")
private String password;
@Bean("elasticsearchRestTemplate")
@Primary
public ElasticsearchRestTemplate elasticsearchTemplate() {
return new ElasticsearchRestTemplate(elasticsearchClient());
}
/**
* 構(gòu)建方式一
*/
@Bean("restHighLevelClient")
@Primary
@Override
public RestHighLevelClient elasticsearchClient() {
// 初始化 RestClient, hostName 和 port 填寫集群的內(nèi)網(wǎng) IP 地址與端口
final String host = StringUtils.substringBefore(endpoints, ":");
final int port = Integer.parseInt(StringUtils.substringAfter(endpoints, ":"));
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
.setRequestConfigCallback(config -> {
config.setConnectTimeout(connectTimeout);
config.setSocketTimeout(socketTimeout);
return config;
});
//?;畈呗? builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setDefaultIOReactorConfig(IOReactorConfig.custom()
.setSoKeepAlive(true)
.build()));
// 設(shè)置認(rèn)證信息
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
httpAsyncClientBuilder.disableAuthCaching();
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
return new RestHighLevelClient(builder);
}
@Bean("zeekElasticsearchTemplate")
public ElasticsearchRestTemplate ZeekElasticsearchTemplate() {
return new ElasticsearchRestTemplate(zeekRestHighLevelClient());
}
/**
* 構(gòu)建方式二
*/
@Bean("zeekRestHighLevelClient")
public RestHighLevelClient zeekRestHighLevelClient() {
HttpHeaders defaultHeaders = new HttpHeaders();
defaultHeaders.setBasicAuth(username, password);
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(endpointsZeek)
.withConnectTimeout(connectTimeout)
.withSocketTimeout(socketTimeout)
.withDefaultHeaders(defaultHeaders)
.withBasicAuth(username, password)
.build();
return RestClients.create(clientConfiguration).rest();
}
@Bean
@Override
public EntityMapper entityMapper() {
ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(),
new DefaultConversionService());
entityMapper.setConversions(elasticsearchCustomConversions());
return entityMapper;
}
}
Tips:
- 配置類中定義RestHighLevelClient使用了兩個種方式,任選其中一種都可以
- 必須選擇一個ES鏈接打上@Primary注解
- 示例中兩個連接都需要密碼,且密碼相同
使用方式
elasticsearchRestTemplate的使用在@Test中和其他@Component中注入方式不同(親測),在@Component中直接使用@Resource注入ElasticsearchRestTemplate會報找不到對應(yīng)的Bean。
@Test中注入方式
@Resource(name = "elasticsearchRestTemplate")
private ElasticsearchRestTemplate elasticsearchRestTemplate;
@Resource(name = "zeekElasticsearchTemplate")
private ElasticsearchRestTemplate zeekElasticsearchTemplate;
@Component中注入方式
@Service
public class DemoServiceImpl implements DemoService {
private final ElasticsearchRestTemplate elasticsearchRestTemplate;
private final ElasticsearchRestTemplate zeekElasticsearchRestTemplate;
@Autowired
public DemoServiceImpl(RestHighLevelClient restHighLevelClient,
@Qualifier(value = "zeekRestHighLevelClient") RestHighLevelClient zeekRestHighLevelClient) {
this.elasticsearchRestTemplate = new ElasticsearchRestTemplate(restHighLevelClient);
this.zeekElasticsearchRestTemplate = new ElasticsearchRestTemplate(zeekRestHighLevelClient);
}
}
查詢文檔
實體類
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
@Data
@Document(indexName = "demo-index-20220609", type = "log")
public class QdnsLogs implements Serializable {
@Id
private String _id;
@Field(type = FieldType.Keyword)
private String name;
@Field(type = FieldType.Keyword)
private String address;
// ........
@Field(type = FieldType.Date, name = "timestamp")
private Long timestamp;
}
Tips:
- 該類具體內(nèi)容進行了脫敏
- 需要特別注意@Document的type一定要和es中的_type一致,否則查詢結(jié)果為是空
- 如果不需要保存文檔,可以不要@Field注解
通過ElasticsearchRestTemplate查詢
import com.xxx.entity.es.Eth0Logs;
import com.xxx.entity.es.QdnsLogs;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.query.*;
import javax.annotation.Resource;
import java.util.List;
/**
* 實現(xiàn)描述:
*
* @author Hecj
* @version v 1.0.0
* @since 2022/06/17
*/
@SpringBootTest(classes = Application.class)
public class EsTest {
@Resource(name = "elasticsearchRestTemplate")
private ElasticsearchRestTemplate elasticsearchRestTemplate;
@Resource(name = "zeekElasticsearchTemplate")
private ElasticsearchRestTemplate zeekElasticsearchTemplate;
/**
* 通過時間范圍和是否存在某一字段查詢
*/
@Test
void test1(){
SearchQuery searchQuery = new NativeSearchQueryBuilder()//查詢數(shù)據(jù),構(gòu)造出一個查詢
.withQuery(QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("timestamp").from(1654506000000L).to(1654507340785L)).must(QueryBuilders.existsQuery("name")))
.build();//構(gòu)造一個SearchQuery
List<QdnsLogs> list = elasticsearchRestTemplate.queryForList(searchQuery, QdnsLogs.class);
System.out.println(list.size());
}
/**
* 通過name值等于特定值
*/
@Test
void test2(){
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.termQuery("name", "zhangsan"))
.build();//構(gòu)造一個SearchQuery
List<Eth0Logs> eth0Logs = zeekElasticsearchTemplate.queryForList(searchQuery, Eth0Logs.class);
System.out.println(eth0Logs.size());
for (Eth0Logs log : eth0Logs) {
System.out.println(log.getTs());
}
}
/**
* aggs查詢
*
* 查詢指定時間范圍內(nèi)存在name值的記錄,進行通過name聚合,按照時間倒序排序取最新一條記錄
*/
@Test
void test3() {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.boolQuery()
.must(
QueryBuilders.rangeQuery("timestamp")
.from(1654506000000L)
.to(1654507340785L)
)
.must(QueryBuilders.existsQuery("name")))
.addAggregation(AggregationBuilders.terms("name")
.field("name")
.size(10000)
.subAggregation(
AggregationBuilders.topHits("top")
.sort("timestamp", SortOrder.DESC)
.size(1)
)
)
.build();
AggregatedPage<QdnsLogs> logs = elasticsearchRestTemplate.queryForPage(searchQuery, QdnsLogs.class);
ParsedStringTerms fqdn = (ParsedStringTerms)logs.getAggregation("name");
List<? extends Terms.Bucket> buckets = fqdn.getBuckets();
for (Terms.Bucket entry : buckets) {
String key = entry.getKeyAsString();
TopHits topHits= entry.getAggregations().get("top");
SearchHits hits = topHits.getHits();
SearchHit at = hits.getAt(0);
System.out.println(key + "-" + at);
}
}
}
Tips: 部分包完整名稱進行了脫敏
通過JPA查詢
這里的接口不需要添加@Service,通過JPA方式需要特別注意書寫規(guī)范,字段名稱的正確性。
interface BookRepository extends Repository<Book, String> {
List<Book> findByNameAndPrice(String name, Integer price);
}
相當(dāng)于:
{
"query": {
"bool" : {
"must" : [
{ "query_string" : { "query" : "?", "fields" : [ "name" ] } },
{ "query_string" : { "query" : "?", "fields" : [ "price" ] } }
]
}
}
}
Table 2. Supported keywords inside method names
Keyword | Sample | Elasticsearch Query String |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
保存文檔
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
// 這里的dataList是需要保存到ES的bean集合,各位自行替換
List<IndexQuery> queries = dataList.stream().map(e -> {
IndexQuery query = new IndexQuery();
// 這個自行替換,也可以省略
query.setId(IdUtil.simpleUUID());
// 具體的數(shù)據(jù)
query.setObject(e);
// 索引名稱
query.setIndexName("demo-index-20220609");
// 索引類型
query.setType("log");
return query;
}).collect(Collectors.toList());
if(CollectionUtil.isNotEmpty(queries)){
zeekElasticsearchTemplate.bulkIndex(queries);
log.info("#~ 寫入日志成功,寫入條數(shù):{}", queries.size());
}
參考鏈接
Spring Data版本依賴矩陣文章來源:http://www.zghlxwxcb.cn/news/detail-416923.html
elasticsearch官方手冊文章來源地址http://www.zghlxwxcb.cn/news/detail-416923.html
到了這里,關(guān)于spring boot es | spring boot 整合elasticsearch | spring boot整合多數(shù)據(jù)源es的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!