Java ElasticSearch 操作
  TEZNKK3IfmPf 2023年11月14日 61 0

pom 文件中添加:

Java ElasticSearch 操作Java ElasticSearch 操作

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.3.2</version>
</dependency>

View Code

如果是 SpringBoot 工程(本文不是 SpringBoot 工程,而是自己写的简单 Demo),需要在 pom 文件的 <properties> 标签中添加 <elasticsearch.version>6.3.2</elasticsearch.version>(与上面依赖的客户端版本保持一致),否则 Spring Boot 管理的 ElasticSearch 依赖包版本可能与客户端版本不一致,导致程序无法正常运行。

注意:elasticsearch-rest-high-level-client 的版本要使用 6.3.2;6.1.4 版本的客户端不支持创建索引。

Log2ESUtil代码:

Java ElasticSearch 操作Java ElasticSearch 操作

package com.sux.demo.utils;

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Thin helper around the Elasticsearch {@link RestHighLevelClient} that writes simple
 * operation-log documents and manages the index they are stored in.
 *
 * <p>Typical usage: call {@link #initES()} once, then any number of
 * {@link #log2ES(boolean, String, String, String)} / index-management calls, and finally
 * {@link #closeES()}.
 */
public class Log2ESUtil {
    private static final Logger log = LoggerFactory.getLogger(Log2ESUtil.class);

    /** Mapping type name used both when indexing documents and when creating the mapping. */
    private static final String DOC_TYPE = "doc";

    /** Timestamp pattern used for the {@code log_time} field and declared in the index mapping. */
    private static final String LOG_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ";

    /** Maximum retry timeout handed to the REST client builder: five minutes. */
    private static final int MAX_RETRY_TIMEOUT_MILLIS = 5 * 60 * 1000;

    /**
     * One formatter per thread. {@link SimpleDateFormat} is not thread-safe, and
     * {@link #log2ES(boolean, String, String, String)} is invoked concurrently from a thread pool,
     * so a single shared instance could produce corrupted timestamps.
     */
    private static final ThreadLocal<SimpleDateFormat> LOG_TIME_FORMAT =
            new ThreadLocal<SimpleDateFormat>() {
                @Override
                protected SimpleDateFormat initialValue() {
                    return new SimpleDateFormat(LOG_TIME_PATTERN);
                }
            };

    /** The underlying client; {@code null} until one of the {@code initES} methods succeeds. */
    RestHighLevelClient client = null;

    /**
     * Creates the client against the default set of cluster nodes.
     * Failures are logged and leave {@link #client} unset.
     */
    public void initES() {
        initES(
                new HttpHost("34.8.8.93", 24100, "http"),
                new HttpHost("34.8.8.94", 24100, "http"),
                new HttpHost("34.8.8.95", 24100, "http"),
                new HttpHost("34.8.8.96", 24100, "http"),
                new HttpHost("34.8.8.98", 24100, "http"),
                new HttpHost("34.8.8.99", 24100, "http"));
    }

    /**
     * Creates the client against the given cluster nodes.
     * Failures are logged and leave {@link #client} unset.
     *
     * @param hosts the Elasticsearch HTTP endpoints to connect to
     */
    public void initES(HttpHost... hosts) {
        try {
            client = new RestHighLevelClient(
                    RestClient.builder(hosts)
                            .setMaxRetryTimeoutMillis(MAX_RETRY_TIMEOUT_MILLIS));
            log.info("Log2ESService init 成功");
        } catch (Exception e) {
            log.error("Log2ESService init 失败", e);
        }
    }

    /**
     * Closes the client. Does nothing (apart from a warning) when the client was never created;
     * failures while closing are logged, not thrown.
     */
    public void closeES() {
        if (client == null) {
            // initES failed or was never called: nothing to release.
            log.warn("Log2ESService close 跳过:client 未初始化");
            return;
        }
        try {
            client.close();
            log.info("Log2ESService close 成功");
        } catch (Exception e) {
            log.error("Log2ESService close 失败", e);
        }
    }

    /**
     * Indexes one log document under a freshly generated random UUID.
     *
     * <p>This is best-effort: any failure is logged and swallowed, so despite the
     * {@code throws} clause (kept for source compatibility) no exception reaches the caller.
     *
     * @param success whether the logged operation succeeded; stored as "成功" or "失败"
     * @param index   name of the target index
     * @param app     value for the {@code app} field
     * @param msg     value for the {@code message} field
     * @throws Exception never thrown by the current implementation
     */
    public void log2ES(boolean success, String index, String app, String msg) throws Exception {
        try {
            // Document body sent to Elasticsearch.
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("app", app);
            jsonMap.put("operation_result", success ? "成功" : "失败");
            jsonMap.put("message", msg);
            jsonMap.put("log_time", LOG_TIME_FORMAT.get().format(new Date()));

            String id = UUID.randomUUID().toString();
            IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE, id).source(jsonMap);
            client.index(indexRequest);
        } catch (Exception e) {
            log.error("Log2ESService log2ES 失败", e);
        }
    }

    /**
     * Checks whether an index exists.
     *
     * @param indexName name of the index to look up
     * @return {@code true} if the cluster reports the index as existing
     * @throws IOException if the request fails
     */
    public boolean indexExists(String indexName) throws IOException {
        GetIndexRequest getIndexRequest = new GetIndexRequest();
        getIndexRequest.indices(indexName);
        return client.indices().exists(getIndexRequest);
    }

    /**
     * Creates an index with an explicit mapping for the fields written by
     * {@link #log2ES(boolean, String, String, String)}.
     *
     * @param indexName name of the index to create
     * @return {@code true} if the cluster acknowledged the creation
     * @throws IOException if the request fails
     */
    public boolean createIndex(String indexName) throws IOException {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);

        Map<String, Object> properties = new HashMap<>();

        // NOTE(review): log2ES never writes an "id" field and uses a UUID string as the document
        // _id, so this integer mapping looks unused - confirm before relying on it.
        properties.put("id", storedField("integer"));

        properties.put("app", storedField("text"));
        properties.put("operation_result", storedField("text"));
        properties.put("message", storedField("text"));

        Map<String, Object> logTime = storedField("date");
        // Must agree with the pattern log2ES uses to format the timestamp.
        logTime.put("format", LOG_TIME_PATTERN);
        properties.put("log_time", logTime);

        Map<String, Object> type = new HashMap<>();
        // dynamic=false: fields not declared under "properties" are not added to the mapping.
        type.put("dynamic", false);
        type.put("properties", properties);

        Map<String, Object> mappings = new HashMap<>();
        mappings.put(DOC_TYPE, type);
        createIndexRequest.mapping(DOC_TYPE, mappings);

        CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
        return createIndexResponse.isAcknowledged();
    }

    /**
     * Deletes an index.
     *
     * @param indexName name of the index to delete
     * @return {@code true} if the cluster acknowledged the deletion
     * @throws IOException if the request fails
     */
    public boolean deleteIndex(String indexName) throws IOException {
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
        DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest);
        return deleteIndexResponse.isAcknowledged();
    }

    /** Builds the mapping entry for one field of the given type with {@code store} enabled. */
    private static Map<String, Object> storedField(String fieldType) {
        Map<String, Object> field = new HashMap<>();
        field.put("type", fieldType);
        field.put("store", "true");
        return field;
    }
}

View Code

测试代码:

创建索引:

Java ElasticSearch 操作Java ElasticSearch 操作

package com.sux.demo;

import com.sux.demo.utils.Log2ESUtil;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Demo entry point that creates the test index if it does not exist yet.
 */
public class TestES_CreateIndex {
    private static final Logger log = LoggerFactory.getLogger(TestES_CreateIndex.class);

    private static final Log2ESUtil log2ESUtil = new Log2ESUtil();

    private static final String indexName = "sux-test";

    /**
     * Configures log4j, opens the Elasticsearch client, creates the index and always closes the
     * client again. Any failure is logged rather than propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        try {
            PropertyConfigurator.configure("src/main/resources/log4j.properties");

            log2ESUtil.initES();

            createIndex();
        } catch (Exception e) {
            log.error("TestES_CreateIndex 出错", e);
        } finally {
            // Release the client even when index creation throws.
            log2ESUtil.closeES();
        }
    }

    /**
     * Creates {@code indexName} unless it already exists and logs the outcome.
     *
     * @throws IOException if either Elasticsearch request fails
     */
    private static void createIndex() throws IOException {
        if (log2ESUtil.indexExists(indexName)) {
            // Goes through the logger like every other message of this demo.
            log.info("索引{}已存在,不需要创建", indexName);
            return;
        }

        if (log2ESUtil.createIndex(indexName)) {
            log.info("创建索引{}成功!", indexName);
        } else {
            log.info("创建索引{}失败!", indexName);
        }
    }
}

View Code

删除索引:

Java ElasticSearch 操作Java ElasticSearch 操作

package com.sux.demo;

import com.sux.demo.utils.Log2ESUtil;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Demo entry point that deletes the test index if it exists.
 */
public class TestES_DeleteIndex {
    private static final Logger log = LoggerFactory.getLogger(TestES_DeleteIndex.class);

    private static final Log2ESUtil log2ESUtil = new Log2ESUtil();

    private static final String indexName = "sux-test";

    /**
     * Configures log4j, opens the Elasticsearch client, deletes the index when present and always
     * closes the client again. Any failure is logged rather than propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        try {
            PropertyConfigurator.configure("src/main/resources/log4j.properties");

            log2ESUtil.initES();

            if (!log2ESUtil.indexExists(indexName)) {
                log.info("索引{}不存在,不需要删除!", indexName);
            } else if (log2ESUtil.deleteIndex(indexName)) {
                log.info("删除索引{}成功!", indexName);
            } else {
                log.info("删除索引{}失败!", indexName);
            }
        } catch (Exception e) {
            log.error("TestES_DeleteIndex 出错", e);
        } finally {
            // Release the client even when one of the index requests throws.
            log2ESUtil.closeES();
        }
    }

}

View Code

单线程数据写入:

Java ElasticSearch 操作Java ElasticSearch 操作

package com.sux.demo;

import com.sux.demo.utils.Log2ESUtil;
import com.sux.demo.utils.Speed;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Demo entry point that writes a batch of documents from a single thread and prints the
 * measured throughput.
 */
public class TestES_SingleInsert {
    // Was created with TestES_Insert.class, which attributed this class's output to the wrong logger.
    private static final Logger log = LoggerFactory.getLogger(TestES_SingleInsert.class);

    private static final Log2ESUtil log2ESUtil = new Log2ESUtil();

    private static final String indexName = "sux-test";

    private static final String app = "sux-test";

    /**
     * Configures log4j, opens the Elasticsearch client, inserts 200 documents sequentially,
     * prints the insert rate and always closes the client again. Any failure is logged rather
     * than propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        try {
            PropertyConfigurator.configure("src/main/resources/log4j.properties");

            log2ESUtil.initES();

            long startTime = System.currentTimeMillis();

            int n = 200;
            for (int i = 1; i <= n; i++) {
                log2ESUtil.log2ES(true, indexName, app, "单线程插入数据" + i);
                if (i % 50 == 0) {
                    log.info("count={}", i);
                }
                Speed.addCount();
            }

            long endTime = System.currentTimeMillis();

            // Documents per second; the elapsed time is converted from milliseconds.
            double elapsedSeconds = (endTime - startTime) / 1000.0;
            double speed = Speed.getCount() / elapsedSeconds;
            System.out.println(" 数据插入速度:" + (int) speed + " 条/秒");
        } catch (Exception e) {
            log.error("TestES_SingleInsert 出错", e);
        } finally {
            // Release the client even when the insert loop throws.
            log2ESUtil.closeES();
        }
    }
}

View Code

多线程数据写入:

Java ElasticSearch 操作Java ElasticSearch 操作

package com.sux.demo;

import com.sux.demo.utils.Log2ESUtil;
import com.sux.demo.utils.Speed;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Demo entry point that writes documents from a fixed pool of 50 threads and prints the
 * measured throughput.
 */
public class TestES_Insert {
    private static final Logger log = LoggerFactory.getLogger(TestES_Insert.class);

    private static final Log2ESUtil log2ESUtil = new Log2ESUtil();

    private static final ThreadPoolExecutor threadPool =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(50);

    /**
     * Configures log4j, opens the Elasticsearch client, submits 10000 insert tasks, waits for
     * all of them, prints the insert rate and always releases the pool and the client. Any
     * failure is logged rather than propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        try {
            PropertyConfigurator.configure("src/main/resources/log4j.properties");

            log2ESUtil.initES();

            long startTime = System.currentTimeMillis();

            int n = 10000;
            CountDownLatch countDownLatch = new CountDownLatch(n);
            for (int i = 1; i <= n; i++) {
                threadPool.submit(new ESInsertRunnable(countDownLatch, log2ESUtil, i, "多线程插入数据"));
            }

            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                log.error("TestES_Insert 等待写入完成时被中断", e);
            }

            long endTime = System.currentTimeMillis();

            // Documents per second; the elapsed time is converted from milliseconds.
            double elapsedSeconds = (endTime - startTime) / 1000.0;
            double speed = Speed.getCount() / elapsedSeconds;
            System.out.println(" 数据插入速度:" + (int) speed + " 条/秒");
        } catch (Exception e) {
            log.error("TestES_Insert 出错", e);
        } finally {
            // Without shutdown() the pool's non-daemon worker threads keep the JVM alive.
            threadPool.shutdown();
            log2ESUtil.closeES();
        }
    }
}

/**
 * Task that writes one document through {@link Log2ESUtil} and then counts down the shared
 * latch so the submitting thread can tell when all tasks have finished.
 */
class ESInsertRunnable implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ESInsertRunnable.class);

    private static final String indexName = "sux-test";

    private static final String app = "sux-test";

    private final CountDownLatch countDownLatch;

    private final Log2ESUtil log2ESUtil;

    private final int num;

    private final String msg;

    /**
     * @param countDownLatch latch decremented exactly once when this task ends
     * @param log2ESUtil     helper used to write the document
     * @param num            sequence number appended to the message
     * @param msg            message prefix
     */
    public ESInsertRunnable(CountDownLatch countDownLatch, Log2ESUtil log2ESUtil, int num, String msg) {
        this.countDownLatch = countDownLatch;
        this.log2ESUtil = log2ESUtil;
        this.num = num;
        this.msg = msg;
    }

    /**
     * Writes the document, logs progress when the remaining count is a multiple of 500, and
     * bumps the shared speed counter. Exceptions are logged, not rethrown.
     */
    @Override
    public void run() {
        try {
            log2ESUtil.log2ES(true, indexName, app, msg + num);

            // Read once so the value that is logged is the value that was tested.
            long remaining = countDownLatch.getCount();
            if (remaining % 500 == 0) {
                log.info("count={}", remaining);
            }
            Speed.addCount();
        } catch (Exception e) {
            log.error("TestES_Insert 异常", e);
        } finally {
            // Always count down, otherwise one failed task would block await() forever.
            countDownLatch.countDown();
        }
    }
}

View Code

实际测试性能(使用现网es集群测试):

单线程:20条/秒

线程池(50个线程):500条/秒

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月14日 0

暂无评论

推荐阅读
TEZNKK3IfmPf