TSF Delivery Configuration
Delivery path
TSF → CKafka → Logstash → ES
A log record as delivered by TSF to Kafka:
{"cluster-id":"cls-*","application-id":"application-****","message":"{\"log_time\":\"2020-04-25 11:33:07.178\",\"logType\":\"app_java\",\"level\":\"INFO\",\"thread\":\"tsf-schedule-0-T3\",\"trace\":\"[,,,]\",\"skywalkingTraceId\":\"TID: N/A\",\"class_name\":\"*.*.rule.TsfCircuitBreakerConsulKVLoader\",\"method_name\":\"syncCircuitBreakerRule\",\"code_line\":63,\"message\":\"[TSF CIRCUIT BREAKER LOADER] TSF circuit breaker loader start NEW round, circuitBreakerRuleIndex: 8295189\"}","namespace-id":"namespace-***","instance-id":"**-**-*-online-***-5lddz","offset":86002,"type":"log","app-id":"*****","local-ip":"*.*.*.*","fields":{"topicname":"app-log"},"appgroup-id":"group-*","@version":"1","@timestamp":"2020-04-25T03:33:07.715Z"}
Install OpenJDK
yum install java-1.8.0-openjdk
Install Logstash
cd /usr/local
nohup wget https://artifacts.elastic.co/downloads/logstash/logstash-7.6.2.zip &
unzip logstash-7.6.2.zip
Configure logstash.conf:
1. Route logs from Kafka into separate ES indices keyed on the instance-id field, which makes it easy to create index patterns in Kibana later.
2. Use the presence of log_time in the message to tell normal logs apart from abnormal (non-JSON) output.
3. Tag abnormal output with an added level:EXCEPTION field so it can be queried and distinguished later.
[root@VM_10_41_centos config]# pwd
/usr/local/logstash-7.6.2/config
[root@VM_10_41_centos config]# cat logstash.conf
# Logstash configuration for the Kafka -> Logstash -> Elasticsearch pipeline.
input
{
    kafka
    {
        bootstrap_servers => "10.1.1.1:9092"
        topics => ["app-log"]
        group_id => "app-log-kafka"
        codec => json { charset => "UTF-8" }
    }
}
filter
{
    # Normal app logs carry a JSON message containing log_time;
    # parse it so its fields become top-level event fields.
    if "log_time" in [message] {
        json
        {
            source => "message"
        }
    }
    # Anything else (stack traces and other non-JSON output) gets tagged.
    else {
        mutate {
            add_field => { "level" => "EXCEPTION" }
        }
    }
}
output
{
    # Local file copy, useful for debugging the pipeline.
    file
    {
        path => "/usr/local/logstash-7.6.2/tmp/app.log"
        flush_interval => 0
    }
    # One ES index per instance per day.
    elasticsearch
    {
        hosts => ["http://10.1.1.2:9200"]
        index => "%{instance-id}-log-%{+YYYY.MM.dd}"
    }
}
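With the config in place, Logstash can be started from the install directory. A minimal sketch, following the nohup style of the download step above; creating the tmp directory for the file output up front is a precaution, not a documented requirement:

cd /usr/local/logstash-7.6.2
mkdir -p tmp
nohup bin/logstash -f config/logstash.conf > logstash.out 2>&1 &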
Delivery results
When the message field is not JSON, Logstash adds level:"EXCEPTION"; querying on that field gives the following result:
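A minimal sketch of the equivalent query against Elasticsearch directly (the index name here is hypothetical, following the %{instance-id}-log-%{+YYYY.MM.dd} pattern from the config):

GET /my-instance-log-2020.04.25/_search
{
  "query": {
    "match": { "level": "EXCEPTION" }
  }
}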
When message is valid JSON (detected by the presence of log_time), it is parsed into fields; the result looks like the following:
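For reference, an abridged sketch of the event after the json filter runs, based on the sample log above: the inner fields are promoted to the top level (the inner message overwrites the outer one), while outer fields such as instance-id are kept alongside, so queries like level:INFO or class_name:* work directly in Kibana:

{
  "instance-id": "**-**-*-online-***-5lddz",
  "log_time": "2020-04-25 11:33:07.178",
  "logType": "app_java",
  "level": "INFO",
  "thread": "tsf-schedule-0-T3",
  "class_name": "*.*.rule.TsfCircuitBreakerConsulKVLoader",
  "method_name": "syncCircuitBreakerRule",
  "code_line": 63,
  "message": "[TSF CIRCUIT BREAKER LOADER] TSF circuit breaker loader start NEW round, circuitBreakerRuleIndex: 8295189"
}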