Log search with Elasticsearch + Logstash, implemented in Java
To avoid exposing project logs, Kibana is not used for the display layer; instead the data is queried from Java code.
1. Environment preparation
1.1 Create a regular user
# Create the user
useradd queylog
# Set a password
passwd queylog
# Grant sudo rights: locate the sudoers file
whereis sudoers
# Make the file writable
chmod -v u+w /etc/sudoers
# Edit the file (add a line such as: queylog ALL=(ALL) ALL)
vi /etc/sudoers
# Revoke write permission again
chmod -v u-w /etc/sudoers
# The first time you use sudo you will see the usual notice:
# We trust you have received the usual lecture from the local System
# Administrator. It usually boils down to these three things:
# 1) Respect the privacy of others.
# 2) Think before you type.
# 3) With great power comes great responsibility.
The user is now created.
1.2 安裝jdk
su queylog cd /home/queylog #解壓jdk-8u191-linux-x64.tar.gz tar -zxvf jdk-8u191-linux-x64.tar.gz sudo mv jdk1.8.0_191 /opt/jdk1.8 #編輯/ect/profile vi /ect/profile export JAVA_HOME=/opt/jdk1.8 export JRE_HOME=$JAVA_HOME/jre export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH #刷新配置文件 source /ect/profile #查看jdk版本 java -verion
1.3 Firewall settings
# Allow a specific IP through the firewall
firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="172.16.110.55" accept'
# Reload the rules
firewall-cmd --reload
2. Install Elasticsearch
2.1 Elasticsearch configuration
Note: Elasticsearch must be started as a regular user; starting it as root fails with an error.
su queylog
cd /home/queylog
# Unpack elasticsearch-6.5.4.tar.gz
tar -zxvf elasticsearch-6.5.4.tar.gz
sudo mv elasticsearch-6.5.4 /opt/elasticsearch
# Edit the ES config file
vi /opt/elasticsearch/config/elasticsearch.yml
# Cluster name
cluster.name: elastic
# Bind address
network.host: 192.168.8.224
# HTTP port
http.port: 9200
# Switch to root
su root
# Append the following to /etc/security/limits.conf
vi /etc/security/limits.conf
* hard nofile 655360
* soft nofile 131072
* hard nproc 4096
* soft nproc 2048
# Append the following to /etc/sysctl.conf:
vi /etc/sysctl.conf
vm.max_map_count=655360
fs.file-max=655360
# Reload after saving:
sysctl -p
# Switch back to the regular user
su queylog
# Start elasticsearch
/opt/elasticsearch/bin/elasticsearch
# Test
curl http://192.168.8.224:9200
# The console prints
{
"name" : "L_dA6oi",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "eS7yP6fVTvC8KMhLutOz6w",
"version" : {
"number" : "6.5.4",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "d2ef93d",
"build_date" : "2018-12-17T21:17:40.758843Z",
"build_snapshot" : false,
"lucene_version" : "7.5.0",
"minimum_wire_compatibility_version" : "5.6.0",
"minimum_index_compatibility_version" : "5.0.0"
},
"tagline" : "You Know, for Search"
}
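Since Kibana is ruled out, it can be handy to verify the same response from Java as well. The sketch below pulls a string field (e.g. the version number) out of the root-endpoint JSON shown above; the class name is hypothetical and the plain string scan stands in for a real JSON library:

```java
public class EsVersionCheck {
    // Scan the JSON for "name" : "value" and return the value,
    // or null if the field is absent. A real client would use a JSON parser.
    static String field(String json, String name) {
        int i = json.indexOf("\"" + name + "\"");
        if (i < 0) return null;
        int colon = json.indexOf(':', i);
        int q1 = json.indexOf('"', colon + 1);
        int q2 = json.indexOf('"', q1 + 1);
        return json.substring(q1 + 1, q2);
    }

    public static void main(String[] args) {
        // Trimmed-down copy of the response printed above
        String body = "{ \"version\" : { \"number\" : \"6.5.4\", \"lucene_version\" : \"7.5.0\" } }";
        System.out.println(field(body, "number"));         // 6.5.4
        System.out.println(field(body, "lucene_version")); // 7.5.0
    }
}
```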
2.2 Manage Elasticsearch as a systemd service
# Switch to root
su root
# Write the service unit file
vi /usr/lib/systemd/system/elasticsearch.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target
[Service]
Environment=ES_HOME=/opt/elasticsearch
Environment=ES_PATH_CONF=/opt/elasticsearch/config
Environment=PID_DIR=/opt/elasticsearch/config
EnvironmentFile=/etc/sysconfig/elasticsearch
WorkingDirectory=/opt/elasticsearch
User=queylog
Group=queylog
ExecStart=/opt/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid
# StandardOutput is configured to redirect to journalctl since
# some error messages may be logged in standard output before
# elasticsearch logging system is initialized. Elasticsearch
# stores its logs in /var/log/elasticsearch and does not use
# journalctl by default. If you also want to enable journalctl
# logging, you can simply remove the "quiet" option from ExecStart.
StandardOutput=journal
StandardError=inherit
# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65536
# Specifies the maximum number of process
LimitNPROC=4096
# Specifies the maximum size of virtual memory
LimitAS=infinity
# Specifies the maximum file size
LimitFSIZE=infinity
# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0
# SIGTERM signal is used to stop the Java process
KillSignal=SIGTERM
# Send the signal only to the JVM rather than its control group
KillMode=process
# Java process is never killed
SendSIGKILL=no
# When a JVM receives a SIGTERM signal it exits with code 143
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target
vi /etc/sysconfig/elasticsearch
#############################
# Elasticsearch             #
#############################
# Elasticsearch home directory
ES_HOME=/opt/elasticsearch
# Elasticsearch Java path
JAVA_HOME=/opt/jdk1.8
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib
# Elasticsearch configuration directory
ES_PATH_CONF=/opt/elasticsearch/config
# Elasticsearch PID directory
PID_DIR=/opt/elasticsearch/config
#############################
# Elasticsearch Service #
#############################
# SysV init.d
# The number of seconds to wait before checking if elasticsearch started successfully as a daemon process
ES_STARTUP_SLEEP_TIME=5
################################
# Elasticsearch Properties #
################################
# Specifies the maximum file descriptor number that can be opened by this process
# When using systemd, this setting is ignored and the LimitNOFILE defined in
# /usr/lib/systemd/system/elasticsearch.service takes precedence
#MAX_OPEN_FILES=65536
# The maximum number of bytes of memory that may be locked into RAM
# Set to "unlimited" if you use the 'bootstrap.memory_lock: true' option
# in elasticsearch.yml.
# When using systemd, LimitMEMLOCK must be set in a unit file such as
# /etc/systemd/system/elasticsearch.service.d/override.conf.
#MAX_LOCKED_MEMORY=unlimited
# Maximum number of VMA(Virtual Memory Areas) a process can own
# When using systemd, this setting is ignored and the 'vm.max_map_count'
# property is set at boot time in /usr/lib/sysctl.d/elasticsearch.conf
#MAX_MAP_COUNT=262144
# Reload systemd units
systemctl daemon-reload
# Switch to the regular user
su queylog
# Start elasticsearch
sudo systemctl start elasticsearch
# Enable start on boot
sudo systemctl enable elasticsearch
3. Install Logstash
3.1 Logstash configuration
su queylog
cd /home/queylog
# Unpack logstash-6.5.4.tar.gz
tar -zxvf logstash-6.5.4.tar.gz
sudo mv logstash-6.5.4 /opt/logstash
# Edit the Logstash config file
vi /opt/logstash/config/logstash.yml
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: changeme
xpack.monitoring.elasticsearch.url: ["http://192.168.8.224:9200"]
# Create logstash.conf in the bin directory
vi /opt/logstash/bin/logstash.conf
input {
# Read from a file
file {
# Log file path
path => "/opt/tomcat/logs/catalina.out"
start_position => "beginning" # (end, beginning)
type=> "isp"
}
}
#filter {
# Define the data format; parse the log with grok (filter and extract fields as needed)
#grok {
# match => { "message" => "%{IPV4:clientIP}|%{GREEDYDATA:request}|%{NUMBER:duration}"}
#}
# Convert field types as needed
#mutate { convert => { "duration" => "integer" }}
#}
# Define the output
output {
elasticsearch {
hosts => "192.168.8.224:9200" # Elasticsearch HTTP port
index => "ind"
document_type => "isp"
}
}
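The commented-out grok pattern above splits pipe-delimited log lines into clientIP, request, and duration fields. As a rough Java illustration of what that pattern matches (hypothetical class; a simplified regex stands in for grok's IPV4, GREEDYDATA, and NUMBER patterns):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GrokSketch {
    // Rough Java equivalent of the grok pattern
    // %{IPV4:clientIP}|%{GREEDYDATA:request}|%{NUMBER:duration}
    static final Pattern LINE = Pattern.compile(
            "(\\d{1,3}(?:\\.\\d{1,3}){3})\\|(.*)\\|(\\d+(?:\\.\\d+)?)");

    public static void main(String[] args) {
        Matcher m = LINE.matcher("192.168.8.1|GET /isp/query|153");
        if (m.matches()) {
            System.out.println("clientIP=" + m.group(1)); // 192.168.8.1
            System.out.println("request=" + m.group(2));  // GET /isp/query
            System.out.println("duration=" + m.group(3)); // 153
        }
    }
}
```

The mutate/convert step in the config corresponds to parsing group 3 as an integer before indexing.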
# Give the queylog user ownership
sudo chown -R queylog:queylog /opt/logstash
# Start logstash
/opt/logstash/bin/logstash -f /opt/logstash/bin/logstash.conf
# Once Logstash is up, check that the ES index has been created
curl http://192.168.8.224:9200/_cat/indices
4. Java code
A previous post covered the exception thrown when Spring Boot integrates both ElasticSearch and Redis; after some research, this explanation of the cause seems the most sound.
Cause: another part of the program already uses Netty (here, the Redis client). When the ES transport client is instantiated, it tries to initialize Netty's available-processor count; because Netty was already initialized elsewhere and guards against being configured twice, the first instantiation fails with an IllegalStateException.
Solution
Add the following to the Spring Boot startup class:
System.setProperty("es.set.netty.runtime.available.processors", "false");
4.1 Add the pom dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
4.2 Update the configuration file
spring.data.elasticsearch.cluster-name=elastic
# The REST API uses port 9200; Java clients connect on port 9300
spring.data.elasticsearch.cluster-nodes=192.168.8.224:9300
4.3 The interface and its implementation classes
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
@Document(indexName = "ind", type = "isp")
public class Bean {
@Field
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "Bean{" +
"message='" + message + '\'' +
'}';
}
}
import java.util.Map;
public interface IElasticSearchService {
Map<String, Object> search(String keywords, Integer currentPage, Integer pageSize) throws Exception ;
//Escape special characters
default String escape( String s) {
StringBuilder sb = new StringBuilder();
for(int i = 0; i < s.length(); ++i) {
char c = s.charAt(i);
if (c == '\\' || c == '+' || c == '-' || c == '!' || c == '(' || c == ')' || c == ':' || c == '^' || c == '[' || c == ']' || c == '"' || c == '{' || c == '}' || c == '~' || c == '*' || c == '?' || c == '|' || c == '&' || c == '/') {
sb.append('\\');
}
sb.append(c);
}
return sb.toString();
}
}
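To illustrate what escape() produces, here is a self-contained sketch of the same logic (hypothetical EscapeDemo class; the character set mirrors the default method above, which backslash-escapes Lucene query-syntax metacharacters):

```java
public class EscapeDemo {
    // Same logic as the interface's default escape(): prefix each
    // Lucene query-syntax metacharacter with a backslash.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if ("\\+-!():^[]\"{}~*?|&/".indexOf(c) >= 0) {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(escape("error(500)+timeout*")); // error\(500\)\+timeout\*
    }
}
```

Without this step, a user searching for a literal "+" or "(" would trigger a query_string parse error in Elasticsearch.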
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* ElasticSearch service implementation
*/
@Service
public class ElasticSearchServiceImpl implements IElasticSearchService {
Logger log = LoggerFactory.getLogger(ElasticSearchServiceImpl.class);
@Autowired
ElasticsearchTemplate elasticsearchTemplate;
@Resource
HighlightResultHelper highlightResultHelper;
@Override
public Map<String, Object> search(String keywords, Integer currentPage, Integer pageSize) {
keywords= escape(keywords);
currentPage = Math.max(currentPage - 1, 0);
List<HighlightBuilder.Field> highlightFields = new ArrayList<>();
//Set up highlighting: matched keywords are highlighted in the results
HighlightBuilder.Field message = new HighlightBuilder.Field("message").fragmentOffset(80000).numOfFragments(0).requireFieldMatch(false).preTags("<span style='color:red'>").postTags("</span>");
highlightFields.add(message);
HighlightBuilder.Field[] highlightFieldsAry = highlightFields.toArray(new HighlightBuilder
.Field[highlightFields.size()]);
//創(chuàng)建查詢構造器
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
//Filter: search with field weights; when the query is not empty, weight by keyword, summary, and other attributes
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
queryBuilder.withPageable(PageRequest.of(currentPage, pageSize));
if (!MyStringUtils.isEmpty(keywords)){
boolQueryBuilder.must(QueryBuilders.queryStringQuery(keywords).field("message"));
}
queryBuilder.withQuery(boolQueryBuilder);
queryBuilder.withHighlightFields(highlightFieldsAry);
log.info("Query: {}", queryBuilder.build().getQuery().toString());
//Run the query
AggregatedPage<Bean> result = elasticsearchTemplate.queryForPage(queryBuilder.build(), Bean
.class,highlightResultHelper);
//Parse the results
long total = result.getTotalElements();
int totalPage = result.getTotalPages();
List<Bean> blogList = result.getContent();
Map<String, Object> map = new HashMap<>();
map.put("total", total);
map.put("totalPage", totalPage);
map.put("pageSize", pageSize);
map.put("currentPage", currentPage + 1);
map.put("blogList", blogList);
return map;
}
}
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.beanutils.PropertyUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.SearchResultMapper;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.aggregation.impl.AggregatedPageImpl;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
/**
* ElasticSearch highlight result mapping
*/
@Component
public class HighlightResultHelper implements SearchResultMapper {
Logger log = LoggerFactory.getLogger(HighlightResultHelper.class);
@Override
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
List<T> results = new ArrayList<>();
for (SearchHit hit : response.getHits()) {
if (hit != null) {
T result = null;
if (StringUtils.hasText(hit.getSourceAsString())) {
result = JSONObject.parseObject(hit.getSourceAsString(), clazz);
}
// Map highlighted fields
for (HighlightField field : hit.getHighlightFields().values()) {
try {
PropertyUtils.setProperty(result, field.getName(), concat(field.fragments()));
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
log.error("Failed to set highlight field: {}", e.getMessage(), e);
}
}
results.add(result);
}
}
return new AggregatedPageImpl<T>(results, pageable, response.getHits().getTotalHits(), response
.getAggregations(), response.getScrollId());
}
public <T> T mapSearchHit(SearchHit searchHit, Class<T> clazz) {
T result = null;
if (StringUtils.hasText(searchHit.getSourceAsString())) {
result = JSONObject.parseObject(searchHit.getSourceAsString(), clazz);
}
// Apply each highlighted fragment to the mapped object
for (HighlightField field : searchHit.getHighlightFields().values()) {
try {
PropertyUtils.setProperty(result, field.getName(), concat(field.fragments()));
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
log.error("Failed to set highlight field: {}", e.getMessage(), e);
}
}
return result;
}
private String concat(Text[] texts) {
StringBuffer sb = new StringBuffer();
for (Text text : texts) {
sb.append(text.toString());
}
return sb.toString();
}
}
import com.alibaba.fastjson.JSON;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CbeiIspApplication.class)
public class ElasticSearchServiceTest {
private static Logger logger = LoggerFactory.getLogger(ElasticSearchServiceTest.class);
@Autowired
private IElasticSearchService elasticSearchService;
@Test
public void getLog(){
try {
Map<String, Object> search = elasticSearchService.search("Exception", 1, 10);
logger.info( JSON.toJSONString(search));
} catch (Exception e) {
e.printStackTrace();
}
}
}
That wraps up today's content. This article is only a brief introduction to using Elasticsearch and Logstash; if anything here is off, feel free to point it out in the comments.

