云原生
Kubernetes基础
容器技术介绍
Docker快速入门
Containerd快速入门
K8S主要资源罗列
认识YAML
API资源对象
Kubernetes安全掌控
Kubernetes网络
Kubernetes高级调度
Kubernetes 存储
Kubernetes集群维护
Skywalking全链路监控
ConfigMap&Secret场景应用
Kubernetes基础概念及核心组件
水平自动扩容和缩容HPA
Jenkins
k8s中部署jenkins并利用master-slave模式实现CICD
Jenkins构建过程中常见问题排查与解决
Jenkins部署在k8s集群之外使用动态slave模式
Jenkins基于Helm的应用发布
Jenkins Pipeline语法
EFKStack
EFK日志平台部署管理
海量数据下的EFK架构优化升级
基于Loki的日志收集系统
Ingress
基于Kubernetes的Ingress-Nginx解决方案
Ingress-Nginx高级配置
使用 Ingress-Nginx 进行灰度(金丝雀)发布
Ingress-nginx优化配置
APM
Skywalking全链路监控
基于Helm部署Skywalking
应用接入Skywalking
服务网格
Istio
基于Istio的微服务可观察性
基于Istio的微服务Gateway实战
Kubernetes高可用集群部署
Kuberntes部署MetalLB负载均衡器
Ceph
使用cephadm部署ceph集群
使用Rook部署Ceph存储集群
openstack
glance上传镜像失败
mariadb运行不起来
创建域和项目错误_1
创建域和项目错误_2
安装计算节点
时钟源
网络创建失败
本文档使用 MrDoc 发布
-
+
首页
海量数据下的EFK架构优化升级
# 1、数据背景 在海量数据场景下,日志管理和分析是一项重要任务。为了解决这个问题,EFK 架构(Elasticsearch + Fluentd + Kibana)已经成为流行的选择。 然而,随着数据规模的增加,传统的 EFK 架构可能面临性能瓶颈和可用性挑战。为了提升架构的性能和可伸缩性,我们可以结合 Kafka 和 Logstash 对 EFK 架构进行优化升级。 首先,引入 Kafka 作为高吞吐量的消息队列是关键的一步。Kafka 可以接收和缓冲大量的日志数据,减轻 Elasticsearch 的压力,并提供更好的可用性和容错性。 然后,我们可以使用 Fluentd 或 Logstash 将日志数据发送到 Kafka 中。将 Kafka 视为中间件层,用于处理日志数据流。这样可以解耦 Fluentd 或 Logstash 和 Elasticsearch之间的直接连接,提高整体的可靠性和灵活性。 通过 Logstash 的 Kafka 插件,我们可以将 Kafka 中的数据消费到 Logstash 中进行处理和转发。这样 Logstash 就负责从 Kafka 中获取数据,然后根据需要进行过滤、解析和转换,最终将数据发送到 Elasticsearch 进行存储和索引。  # 2、KAFKA部署配置 首先在 Kubernetes 集群中安装 Kafka,同样这里使用 Helm 进行安装: ```bash $ helm repo add bitnami https://charts.bitnami.com/bitnami $ helm repo update ``` 首先使用`helm pull`拉取 Chart 并解压: ```bash $ helm pull bitnami/kafka --untar --version 17.2.3 $ cd kafka ``` 这里面我们指定使用一个`StorageClass`来提供持久化存储,在 Chart 目录下面创建用于安装的 values 文件:直接 ```yaml # values.yaml ## @section Persistence parameters persistence: enabled: true storageClass: "nfs-storage" accessModes: - ReadWriteOnce size: 20Gi mountPath: /bitnami/kafka # 配置zk volumes zookeeper: enabled: true persistence: enabled: true storageClass: "nfs-storage" accessModes: - ReadWriteOnce size: 20Gi ``` 直接使用上面的 values 文件安装 kafka: ```bash $ helm upgrade --install kafka -f values.yaml --namespace logging . Release "kafka" does not exist. Installing it now. NAME: kafka LAST DEPLOYED: Fri Jun 30 17:48:51 2023 NAMESPACE: logging STATUS: deployed REVISION: 1 TEST SUITE: None NOTES: CHART NAME: kafka CHART VERSION: 17.2.3 APP VERSION: 3.2.0 ** Please be patient while the chart is being deployed ** Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster: kafka.logging.svc.cluster.local Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster: kafka-0.kafka-headless.logging.svc.cluster.local:9092 To create a pod that you can use as a Kafka client run the following commands: kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.2.0-debian-10-r4 --namespace logging -- command -- sleep infinity kubectl exec --tty -i kafka-client --namespace logging -- bash PRODUCER: kafka-console-producer.sh \ --broker-list kafka-0.kafka- headless.logging.svc.cluster.local:9092 \ --topic test CONSUMER: kafka-console-consumer.sh \ --bootstrap-server kafka.logging.svc.cluster.local:9092\ --topic test \ --from-beginning ``` 安装完成后我们可以使用上面的提示来检查 Kafka 是否正常运行: ```bash $ kubectl get pods -n logging -l app.kubernetes.io/instance=kafka kafka-0 1/1 Running 0 7m58s kafka-zookeeper-0 1/1 Running 0 7m58s ``` 用下面的命令创建一个 Kafka 的测试客户端 Pod: ```bash $ kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.2.0-debian-10-r4 --namespace logging --command -- sleep infinity pod/kafka-client created ``` 然后启动一个终端进入容器内部生产消息: ```bash # 生产者 $ kubectl exec --tty -i kafka-client --namespace logging -- bash I have no name!@kafka-client:/$ kafka-console-producer.sh --brokerlist kafka-0.kafka-headless.logging.svc.cluster.local:9092 --topic test >hello kafka on k8s > ``` 启动另外一个终端进入容器内部消费消息: ```bash # 消费者 $ kubectl exec --tty -i kafka-client --namespace logging -- bash I have no name!@kafka-client:/$ kafka-console-consumer.sh --bootstrap-server kafka.logging.svc.cluster.local:9092 --topic test --from-beginning hello kafka on k8s ``` 如果在消费端看到了生产的消息数据证明我们的 Kafka 已经运行成功了。 # 3、Fluentd配置Kafka 现在有了 Kafka,我们就可以将 Fluentd 的日志数据输出到 Kafka 了,只需要将Fluentd 配置中的`<match>`更改为使用 Kafka 插件即可,但是在 Fluentd 中输出到Kafka,需要使用到`fluent-plugin-kafka`插件,所以需要我们自定义下 Docker 镜像,最简单的做法就是在上面 Fluentd 镜像的基础上新增 Kafka 插件即可,Dockerfile文件如下所示: ```dockerfile FROM quay.io/fluentd_elasticsearch/fluentd:v3.4.0 # RUN echo "source 'https://mirrors.tuna.tsinghua.edu.cn/rubygems/'" > Gemfile && gem install bundler RUN gem sources --add https://mirrors.tuna.tsinghua.edu.cn/rubygems/ --remove https://rubygems.org/ && gem sources && gem install bundler -v 2.4.22 RUN gem install fluent-plugin-kafka -v 0.17.5 --no-document ``` 编译: ```bash $ docker build -t harbor-local.kubernets.cn/library/fluentdkafka:v0.17.5 . $ docker push harbor-local.kubernets.cn/library/fluentdkafka:v0.17.5 ``` 接下来替换 Fluentd 的 Configmap 对象中的`<match>`部分,如下所示: ```yaml # fluentd-configmap.yaml kind: ConfigMap apiVersion: v1 metadata: name: fluentd-conf namespace: logging data: ...... output.conf: |- <match **> @id kafka @type kafka2 @log_level info # list of seed brokers brokers kafka-0.kafka-headless.logging.svc.cluster.local:9092 use_event_time true # topic settings topic_key k8slog default_topic messages # 注意,kafka中消费使用的是这个topic # buffer settings <buffer k8slog> @type file path /var/log/td-agent/buffer/td flush_interval 3s </buffer> # data type settings <format> @type json </format> # producer settings required_acks -1 compression_codec gzip </match> ``` 然后替换运行的 Fluentd 镜像: ```bash # fluentd-daemonset.yaml image: harbor-local.kubernets.cn/library/fluentd-kafka:v0.17.5 ``` 直接更新 Fluentd 的 Configmap 与 DaemonSet 资源对象即可: ```bash $ kubectl apply -f fluentd-configmap.yaml $ kubectl apply -f fluentd-daemonset.yaml ``` 更新成功后我们可以使用上面的测试 Kafka 客户端来验证是否有日志数据: ```bash $ kubectl exec --tty -i kafka-client --namespace logging -- bash I have no name!@kafka-client:/$ kafka-console-consumer.sh --bootstrap-server kafka.logging.svc.cluster.local:9092 --topic messages --from-beginning {"stream":"stdout","docker":{},"kubernetes":{"container_name":"count","namespace_name":"default","pod_name":"counter","container_image":"busybox:latest","host":"node1","labels":{"logging":"true"}},"message":"43883: Tue Jul 2 12:16:30 UTC2023\n"} ...... ``` # 4、安装Logstash 虽然数据从 Kafka 到 Elasticsearch 的方式多种多样,比如可以使用[Kafka ConnectElasticsearch Connector](https://github.com/confluentinc/kafka-connect-elasticsearch)来实现,我们这里还是采用更加流行的`Logstash`方案,上面我们已经将日志从 Fluentd 采集输出到 Kafka 中去了,接下来我们使用 Logstash 来连接 Kafka 与 Elasticsearch 间的日志数据。 首先使用`helm pull`拉取 Chart 并解压: ```bash $ helm pull elastic/logstash --untar --version 7.17.3 $ cd logstash ``` 同样在 Chart 根目录下面创建用于安装的 Values 文件,如下所示: ```yaml # values.yaml fullnameOverride: logstash persistence: enabled: true logstashConfig: logstash.yml: | http.host: 0.0.0.0 # 要注意下格式 logstashPipeline: logstash.conf: | input { kafka { bootstrap_servers => "kafka-0.kafka-headless.logging.svc.cluster.local:9092" codec => json consumer_threads => 3 topics => ["messages"] } } filter {} # 过滤配置(比如可以删除key、添加geoip等等) output { elasticsearch { hosts => [ "elasticsearch-master:9200" ] index => "logstash-k8s-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } } volumeClaimTemplate: accessModes: ["ReadWriteOnce"] storageClassName: nfs-storage resources: requests: storage: 30Gi ``` 其中最重要的就是通过`logstashPipeline`配置 logstash 数据流的处理配置,通过`input`指定日志源 kafka 的配置,通过`output`输出到 Elasticsearch,同样直接使用上面的 Values 文件安装 logstash 即可: ```bash $ helm upgrade --install logstash -f values.yaml --namespace logging . Release "logstash" does not exist. Installing it now. NAME: logstash LAST DEPLOYED: Fri Jun 30 19:48:51 2023 NAMESPACE: logging STATUS: deployed REVISION: 1 TEST SUITE: None NOTES: 1. Watch all cluster members come up. $ kubectl get pods --namespace=logging -l app=logstash -w ``` 安装启动完成后可以查看 logstash 的日志: ```bash $ kubectl get pods --namespace=logging -l app=logstash NAME READY STATUS RESTARTS AGE logstash-0 1/1 Running 0 2m8s $ kubectl logs -f logstash-0 -n logging ...... { "@version" => "1", "stream" => "stdout", "@timestamp" => 2023-06-30T11:09:16.889Z, "message" => "4672: Thu Jun 30 11:09:15 UTC 2023", "kubernetes" => { "container_image" => "docker.io/library/busybox:latest", "container_name" => "count", "labels" => { "logging" => "true" }, "pod_name" => "counter", "namespace_name" => "default", "pod_ip" => "10.244.2.118", "host" => "node2", "namespace_labels" => { "kubernetes_io/metadata_name" => "default" } }, "docker" => {} } ``` 由于我们启用了 debug 日志调试,所以我们可以在 logstash 的日志中看到我们采集的日志消息,到这里证明我们的日志数据就获取成功了。现在我们可以登录到 Kibana 可以看到有如下所示的索引数据了:  然后同样创建索引模式,匹配上面的索引即可:  创建完成后就可以前往发现页面过滤日志数据了:  到这里我们就实现了一个使用`Fluentd+Kafka+Logstash+Elasticsearch+Kibana`的Kubernetes 日志收集工具栈,这里我们完整的 Pod 信息如下所示: ```bash $ kubectl get pods -n logging NAME READY STATUS RESTARTS AGE elasticsearch-master-0 1/1 Running 0 45m elasticsearch-master-1 1/1 Running 0 45m elasticsearch-master-2 1/1 Running 0 45m fluentd-5mk96 1/1 Running 0 13m fluentd-gq79f 1/1 Running 0 13m fluentd-r4jkx 1/1 Running 0 13m fluentd-sxxp5 1/1 Running 0 13m fluentd-x5xr5 1/1 Running 0 13m fluentd-z2f2t 1/1 Running 0 13m kafka-0 1/1 Running 0 52m kafka-client 1/1 Running 0 10m kafka-zookeeper-0 1/1 Running 0 52m kibana-kibana-6755f6db68-rv76q 1/1 Running 0 44m logstash-0 1/1 Running 0 3m51s ``` 当然在实际的工作项目中还需要我们根据实际的业务场景来进行参数性能调优以及高可用等设置,以达到系统的最优性能。 上面我们在配置`logstash`的时候是将日志输出到 "logstash-k8s-%{+YYYY.MM.dd}"这个索引模式的,可能有的场景下只通过日期去区分索引不是很合理; 那么我们可以根据自己的需求去修改索引名称,比如可以根据我们的服务名称来进行区分,那么这个服务名称可以怎么来定义呢? 可以是 Pod 的名称或者通过 label 标签去指定,比如我们这里去做一个规范,要求需要收集日志的 Pod 除了需要添加`logging: true`这个标签之外,还需要添加一个`logIndex: <索引名>`的标签。 比如重新更新我们测试的 counter 应用: ```yaml apiVersion: v1 kind: Pod metadata: name: counter labels: logging: "true" # 一定要具有该标签才会被采集 logIndex: "zhdya" # 指定索引名称 spec: containers: - name: count image: busybox args: [ /bin/sh, -c, 'i=0; while true; do echo "$i: $(date)"; i=$((i+1)); sleep 1; done', ] ``` 然后重新更新 Logstash 的配置,修改 values 配置: ```yaml # ...... logstashPipeline: logstash.conf: | input { kafka { bootstrap_servers => "kafka-0.kafka-headless.logging.svc.cluster.local:9092" codec => json consumer_threads => 3 topics => ["messages"] } } filter {} # 过滤配置(比如可以删除key、添加geoip等等) output { elasticsearch { hosts => [ "elasticsearch-master:9200" ] index => "k8s-%{[kubernetes][labels][logIndex]}-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } } # ...... ``` logstash更新: ```bash $ helm upgrade --install logstash -f values.yaml --namespace logging . ``` 使用上面的 values 值更新 logstash,正常更新后上面的 counter 这个 Pod 日志会输出 到一个名为`k8s-zhdya-2023.07.05`的索引去。  这样我们就实现了自定义索引名称,当然你也可以使用 Pod 名称、容器名称、命名空间 名称来作为索引的名称,这完全取决于你自己的需求。
阿星
2024年1月27日 16:34
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
PDF文档(打印)
分享
链接
类型
密码
更新密码