Fabrice Triboix

Fabrice is a cloud architect and software developer with 20+ years of experience at companies including Cisco, Samsung, Philips, Alcatel, and Sagem.

Elasticsearch is a powerful software solution designed to quickly search information in a vast amount of data. Combined with Logstash and Kibana, this forms the informally named "ELK stack" and is often used to collect, temporarily store, analyze, and visualize log data. A few other pieces of software are usually needed as well, such as Filebeat to send the logs from the server to Logstash, and Elastalert to generate alerts based on the result of some analysis run on the data stored in Elasticsearch.

The ELK Stack is Powerful, But…

My experience managing logs with ELK has been quite mixed. On the one hand, it is very powerful and the range of its capabilities is impressive. On the other hand, it is tricky to set up and can be a headache to maintain.

The fact is that Elasticsearch is very good in general and can be used in a wide variety of scenarios; it can even be used as a search engine! But since it is not specialized for managing log data, it requires more configuration work to customize its behavior for the specific needs of managing such data.

Setting up an ELK cluster proved quite tricky and required me to fiddle with a number of parameters before I finally got it up and running. Then came the work of configuring it. In my case, I had five different pieces of software to configure (Filebeat, Logstash, Elasticsearch, Kibana, and Elastalert). This can be a quite tedious job, as I had to read through the documentation and debug whichever element of the chain was not talking to the next one. Even after you finally get your cluster up and running, you still need to perform routine maintenance operations on it: patching, upgrading the OS packages, checking CPU, RAM, and disk usage, making minor adjustments as required, etc.

My whole ELK stack stopped working after a Logstash update. A closer inspection revealed that, for some reason, the ELK developers had decided to change a keyword in their configuration file and pluralize it. That was the last straw, and I decided to look for a better solution (at least, a better solution for my particular needs).

I wanted to store the logs generated by Apache and by various PHP and Node applications, and to parse them for patterns indicative of bugs in the software. The solution I found was the following:

  • Install the CloudWatch Agent on the target.
  • Configure the CloudWatch Agent to ship the logs to CloudWatch Logs (see the sketch after this list).
  • Trigger the invocation of Lambda functions to process the logs.
  • Have the Lambda functions post messages to a Slack channel if a pattern is found.
  • Where possible, apply a filter to the CloudWatch log groups to avoid calling the Lambda functions for every single log entry (which could drive up costs very quickly).
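
As a concrete illustration of the second step, here is a minimal sketch of a CloudWatch Agent configuration for shipping Apache logs, written out by a small Python helper. The file paths and log group names are illustrative assumptions; note also that the log groups in the template later in this article get CloudFormation-generated names unless you add an explicit LogGroupName property.

import json

# Hypothetical paths and log group names -- adjust for your own servers.
AGENT_CONFIG = {
    "logs": {
        "logs_collected": {
            "files": {
                "collect_list": [
                    {
                        "file_path": "/var/log/apache2/access.log",
                        "log_group_name": "apache-access",
                        "log_stream_name": "{instance_id}",
                    },
                    {
                        "file_path": "/var/log/apache2/error.log",
                        "log_group_name": "apache-error",
                        "log_stream_name": "{instance_id}",
                    },
                ]
            }
        }
    }
}

if __name__ == "__main__":
    # Default config location used by the CloudWatch Agent on Linux.
    path = "/opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json"
    with open(path, "w") as f:
        json.dump(AGENT_CONFIG, f, indent=2)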

And, at a high level, that’s it! A 100% serverless solution that just works without any maintenance and scales well with no extra effort. The advantages of this serverless solution over a cluster of servers are numerous:

  • In essence, all the routine maintenance operations you would regularly perform on cluster servers are now taken care of by the cloud provider. Any underlying server is patched, upgraded, and maintained for you without you even being aware of it.
  • You no longer need to monitor your cluster, and you can delegate all scaling issues to the cloud provider. Indeed, a serverless setup like the one described above will scale automatically with nothing for you to do!
  • The solution described above requires less configuration, and it is quite unlikely that the cloud provider will introduce breaking changes to the configuration format.
  • Finally, it is quite easy to write a few CloudFormation templates to put everything in place as infrastructure-as-code; doing the same to set up a whole ELK cluster would require a lot of work.

Configuring Slack Alerts

So now let’s get into the details! Let’s examine what a CloudFormation template for such a setup looks like, complete with Slack webhooks for alerting engineers. We need to configure all the Slack settings first, so let’s dive into it.

AWSTemplateFormatVersion: 2010-09-09

Description: Setup log processing

Parameters:
  SlackWebhookHost:
    Type: String
    Description: Host name for Slack web hooks
    Default: hooks.slack.com

  SlackWebhookPath:
    Type: String
    Description: Path part of the Slack webhook URL
    Default: /services/YOUR/SLACK/WEBHOOK

You will need to set up your Slack workspace for this; check out this WebHooks for Slack guide for additional info.

Once you’ve created your Slack app and configured an incoming hook, its URL becomes a parameter of your CloudFormation stack.
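
Before wiring the webhook into the stack, it can be handy to sanity-check it from your own machine. Here is a minimal sketch using the same http.client approach as the Lambda functions below; the webhook path is a placeholder for the one Slack generates for your app.

import json
from http.client import HTTPSConnection

# Placeholder webhook path -- use the one generated by your Slack app.
SLACK_HOST = "hooks.slack.com"
SLACK_PATH = "/services/YOUR/SLACK/WEBHOOK"

cnx = HTTPSConnection(SLACK_HOST, timeout=5)
cnx.request("POST", SLACK_PATH, json.dumps({"text": "Webhook test"}))
resp = cnx.getresponse()
# Slack answers 200 with body "ok" when the message was accepted.
print(resp.status, resp.read())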

Resources:
  ApacheAccessLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 100  # Or whatever is good for you

  ApacheErrorLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 100  # Or whatever is good for you

Here we created two log groups: one for the Apache access logs, the other for the Apache error logs.

I did not configure any lifecycle mechanism for the log data here, as that is beyond the scope of this article. In practice, you would probably want a shorter retention window and design S3 lifecycle policies to move the logs to Glacier after a certain amount of time.
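
If you do want to archive older logs, one option (a sketch, not part of the stack above) is to export a log group to S3 with a one-off boto3 task and then let an S3 lifecycle rule transition the objects to Glacier. The log group and bucket names below are hypothetical; the bucket must grant CloudWatch Logs write access in its bucket policy.

import time
import boto3

logs = boto3.client("logs")

now_ms = int(time.time() * 1000)
week_ms = 7 * 24 * 3600 * 1000

# Export entries between two and one weeks old to the archive bucket.
logs.create_export_task(
    logGroupName="apache-access",          # hypothetical log group name
    fromTime=now_ms - 2 * week_ms,
    to=now_ms - week_ms,
    destination="my-log-archive-bucket",   # hypothetical bucket name
    destinationPrefix="apache-access",
)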

Lambda Function to Process Access Logs

Now let’s implement the Lambda function that will process the Apache access logs.

BasicLambdaExecutionRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: 2012-10-17
      Statement:
        - Effect: Allow
          Principal:
            Service: lambda.amazonaws.com
          Action: sts:AssumeRole
    ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

Here we created an IAM role that will be attached to the Lambda functions, to allow them to perform their duties. In effect, the AWSLambdaBasicExecutionRole is (despite its name) an IAM policy provided by AWS. It just allows the Lambda function to create its log group and log streams within that group, and then to send its own logs to CloudWatch Logs.

ProcessApacheAccessLogFunction:
  Type: AWS::Lambda::Function
  Properties:
    Handler: index.handler
    Role: !GetAtt BasicLambdaExecutionRole.Arn
    Runtime: python3.7
    Timeout: 10
    Environment:
      Variables:
        SLACK_WEBHOOK_HOST: !Ref SlackWebhookHost
        SLACK_WEBHOOK_PATH: !Ref SlackWebhookPath
    Code:
      ZipFile: |
        import base64
        import gzip
        import json
        import os
        from http.client import HTTPSConnection

        def handler(event, context):
            tmp = event['awslogs']['data']
            # `awslogs.data` is base64-encoded gzip'ed JSON
            tmp = base64.b64decode(tmp)
            tmp = gzip.decompress(tmp)
            tmp = json.loads(tmp)
            events = tmp['logEvents']
            for event in events:
                raw_log = event['message']
                log = json.loads(raw_log)
                if log['status'][0] == "5":
                    # This is a 5XX status code
                    print(f"Received an Apache access log with a 5XX status code: {raw_log}")
                    slack_host = os.getenv('SLACK_WEBHOOK_HOST')
                    slack_path = os.getenv('SLACK_WEBHOOK_PATH')
                    print(f"Sending Slack post to: host={slack_host}, path={slack_path}, content={raw_log}")
                    cnx = HTTPSConnection(slack_host, timeout=5)
                    cnx.request("POST", slack_path, json.dumps({'text': raw_log}))
                    # It's important to read the response; if the cnx is closed
                    # too quickly, Slack might not post the msg
                    resp = cnx.getresponse()
                    resp_content = resp.read()
                    resp_code = resp.status
                    assert resp_code == 200

So here we defined a Lambda function to process Apache access logs. Please note that I am not using the common log format, which is the default in Apache. I configured the access log format like this (and you will notice that it essentially produces logs formatted as JSON, which makes processing further down the line much easier):

LogFormat "{\"vhost\": \"%v:%p\", \"client\": \"%a\", \"user\": \"%u\", \"timestamp\": \"%{%Y-%m-%dT%H:%M:%S}t\", \"request\": \"%r\", \"status\": \"%>s\", \"size\": \"%O\", \"referer\": \"%{Referer}i\", \"useragent\": \"%{User-Agent}i\"}" json

This Lambda function is written in Python 3. It receives the log lines sent from CloudWatch and can search for patterns. In the example above, it simply detects HTTP requests that resulted in a 5XX status code and posts a message to a Slack channel.

You can do anything you like in terms of pattern detection, and the fact that it is a real programming language (Python), as opposed to just regex patterns in a Logstash or Elastalert configuration file, gives you plenty of opportunities to implement complex pattern recognition.
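
To make that concrete, here is a hedged sketch of a slightly richer detector that could replace the single status-code check in the handler above: it flags bursts of 5XX responses coming from one client within a single batch of log events. The field names follow the JSON LogFormat shown earlier, but the threshold and the rule itself are just illustrations, not something from the original setup.

import json
from collections import Counter

# Illustrative threshold: how many 5XX responses from one client,
# within one batch of CloudWatch log events, count as suspicious.
BURST_THRESHOLD = 3

def find_5xx_bursts(log_events):
    """Return the clients that produced BURST_THRESHOLD or more 5XX responses."""
    counts = Counter()
    for event in log_events:
        log = json.loads(event["message"])
        if log["status"].startswith("5"):
            counts[log["client"]] += 1
    return [client for client, n in counts.items() if n >= BURST_THRESHOLD]

# Example with entries shaped like the JSON LogFormat above:
sample = [
    {"message": json.dumps({"client": "203.0.113.9", "status": "502",
                            "request": "GET / HTTP/1.1"})}
    for _ in range(3)
]
print(find_5xx_bursts(sample))  # ['203.0.113.9']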

Revision Control

A quick word on revision control: I found having the code inline in CloudFormation templates quite acceptable and convenient for small, utility Lambda functions such as these. Of course, for a big project involving many Lambda functions and layers, this would most probably be inconvenient, and you would need to use SAM.

ApacheAccessLogFunctionPermission:
  Type: AWS::Lambda::Permission
  Properties:
    FunctionName: !Ref ProcessApacheAccessLogFunction
    Action: lambda:InvokeFunction
    Principal: logs.amazonaws.com
    SourceArn: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"

The code above gives CloudWatch Logs permission to invoke the Lambda function. One word of caution: I found that using the SourceAccount property can lead to conflicts with the SourceArn.

Generally speaking, I would suggest not including it when the service invoking the Lambda function is in the same AWS account. The SourceArn will forbid other accounts from calling the Lambda function anyway.
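
For reference, the same permission can be granted outside CloudFormation with a boto3 call; this sketch mirrors the resource above (the function name, statement ID, and ARN are placeholders) and likewise omits SourceAccount.

import boto3

lambda_client = boto3.client("lambda")

# Placeholder names -- mirror of the ApacheAccessLogFunctionPermission resource.
lambda_client.add_permission(
    FunctionName="ProcessApacheAccessLogFunction",
    StatementId="cloudwatch-logs-invoke",
    Action="lambda:InvokeFunction",
    Principal="logs.amazonaws.com",
    SourceArn="arn:aws:logs:eu-west-1:123456789012:log-group:*",
)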

ApacheAccessLogSubscriptionFilter:
  Type: AWS::Logs::SubscriptionFilter
  DependsOn: ApacheAccessLogFunctionPermission
  Properties:
    LogGroupName: !Ref ApacheAccessLogGroup
    DestinationArn: !GetAtt ProcessApacheAccessLogFunction.Arn
    FilterPattern: "{$.status = 5*}"

The subscription filter resource is the link between CloudWatch Logs and Lambda. Here, logs sent to the ApacheAccessLogGroup will be forwarded to the Lambda function we defined above, but only those logs that pass the filter pattern. The filter pattern here expects some JSON as input (filter patterns start with ‘{’ and end with ‘}’) and will match the log entry only if it has a field status that starts with “5”.

This means that we call the Lambda function only if the HTTP status code returned by Apache is a 5XX code, which usually means something quite bad is going on. This ensures we don’t call the Lambda function too much and thereby avoid unnecessary costs.

More information on filter patterns can be found in the Amazon CloudWatch documentation. The CloudWatch filter patterns are quite good, although obviously not as powerful as Grok.
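
You can also try a pattern against sample messages before deploying. The boto3 helper below is named after metric filters, but metric filters and subscription filters share the same pattern syntax, so it works as a quick check; the sample entries are illustrative.

import json
import boto3

logs = boto3.client("logs")

# Sample entries shaped like the JSON access LogFormat above.
samples = [
    json.dumps({"status": "200", "request": "GET / HTTP/1.1"}),
    json.dumps({"status": "503", "request": "GET /api HTTP/1.1"}),
]

# Metric filters and subscription filters use the same pattern syntax.
result = logs.test_metric_filter(
    filterPattern="{$.status = 5*}",
    logEventMessages=samples,
)
print(result["matches"])  # only the 503 entry should match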

Note the DependsOn field, which ensures CloudWatch Logs has permission to call the Lambda function before the subscription is created. This is just a cherry on the cake; it is most probably unnecessary, as in a real-case scenario Apache would likely not receive requests for at least a few seconds anyway (e.g., the time to link the EC2 instance with a load balancer and get the load balancer to recognize the EC2 instance as healthy).

Lambda Function to Process Error Logs

Now let’s have a look at the Lambda function that will process the Apache error logs.

ProcessApacheErrorLogFunction:
  Type: AWS::Lambda::Function
  Properties:
    Handler: index.handler
    Role: !GetAtt BasicLambdaExecutionRole.Arn
    Runtime: python3.7
    Timeout: 10
    Environment:
      Variables:
        SLACK_WEBHOOK_HOST: !Ref SlackWebhookHost
        SLACK_WEBHOOK_PATH: !Ref SlackWebhookPath
    Code:
      ZipFile: |
        import base64
        import gzip
        import json
        import os
        from http.client import HTTPSConnection

        def handler(event, context):
            tmp = event['awslogs']['data']
            # `awslogs.data` is base64-encoded gzip'ed JSON
            tmp = base64.b64decode(tmp)
            tmp = gzip.decompress(tmp)
            tmp = json.loads(tmp)
            events = tmp['logEvents']
            for event in events:
                raw_log = event['message']
                log = json.loads(raw_log)
                if log['level'] in ["error", "crit", "alert", "emerg"]:
                    # This is a serious error message
                    msg = log['msg']
                    if msg.startswith("PHP Notice") or msg.startswith("PHP Warning"):
                        print(f"Ignoring PHP notices and warnings: {raw_log}")
                    else:
                        print(f"Received a serious Apache error log: {raw_log}")
                        slack_host = os.getenv('SLACK_WEBHOOK_HOST')
                        slack_path = os.getenv('SLACK_WEBHOOK_PATH')
                        print(f"Sending Slack post to: host={slack_host}, path={slack_path}, content={raw_log}")
                        cnx = HTTPSConnection(slack_host, timeout=5)
                        cnx.request("POST", slack_path, json.dumps({'text': raw_log}))
                        # It's important to read the response; if the cnx is closed
                        # too quickly, Slack might not post the msg
                        resp = cnx.getresponse()
                        resp_content = resp.read()
                        resp_code = resp.status
                        assert resp_code == 200

This second Lambda function processes the Apache error logs and will post a message to Slack only when a serious error is encountered. In this case, PHP notice and warning messages are not considered serious enough to trigger an alert.

Again, this function expects the Apache error log to be JSON formatted. So here is the error log format string I have been using:

ErrorLogFormat "{\"vhost\": \"%v\", \"timestamp\": \"%{cu}t\", \"module\": \"%-m\", \"level\": \"%l\", \"pid\": \"%-P\", \"tid\": \"%-T\", \"oserror\": \"%-E\", \"client\": \"%-a\", \"msg\": \"%M\"}"

ApacheErrorLogFunctionPermission:
  Type: AWS::Lambda::Permission
  Properties:
    FunctionName: !Ref ProcessApacheErrorLogFunction
    Action: lambda:InvokeFunction
    Principal: logs.amazonaws.com
    SourceArn: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"
    SourceAccount: !Ref AWS::AccountId

This resource grants CloudWatch Logs permission to invoke the Lambda function.

ApacheErrorLogSubscriptionFilter:
  Type: AWS::Logs::SubscriptionFilter
  DependsOn: ApacheErrorLogFunctionPermission
  Properties:
    LogGroupName: !Ref ApacheErrorLogGroup
    DestinationArn: !GetAtt ProcessApacheErrorLogFunction.Arn
    FilterPattern: '{$.msg != "PHP Warning*" && $.msg != "PHP Notice*"}'

Finally, we link CloudWatch Logs with the Lambda function using a subscription filter for the Apache error log group. Note the filter pattern, which ensures that logs with a message starting with “PHP Warning” or “PHP Notice” do not trigger a call to the Lambda function.

Final Thoughts, Pricing, and Availability

One final word about cost: this solution is much cheaper than operating an ELK cluster. The logs stored in CloudWatch are priced at the same level as S3, and Lambda allows one million calls per month as part of its free tier. This would probably be enough for a website with moderate traffic (provided you used CloudWatch Logs filters as described above), especially if you coded it well and it doesn’t have too many errors!
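
As a back-of-the-envelope check, you can estimate how far the free tier goes; all of the numbers below are illustrative assumptions, not measurements.

# Illustrative assumptions: adjust to your own traffic and error rate.
requests_per_day = 200_000         # hypothetical site traffic
error_ratio = 0.001                # fraction of requests with a 5XX status
free_tier_invocations = 1_000_000  # Lambda free tier, per month

# With the subscription filter in place, only 5XX entries invoke Lambda.
invocations_per_month = requests_per_day * error_ratio * 30
print(f"{invocations_per_month:.0f} invocations/month "
      f"({invocations_per_month / free_tier_invocations:.2%} of the free tier)")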

Also, please note that Lambda functions allow a maximum of 1,000 concurrent invocations. At the time of writing, this is a hard limit in AWS that cannot be changed. However, you can expect a call to the functions above to last about 30-40ms. This should be fast enough to handle rather heavy traffic. If your workload is so intense that you hit this limit, you probably need a more complex solution based on Kinesis, which I might cover in a future article.


Understanding the basics

  • What is the ELK stack?

    ELK is an acronym for Elasticsearch-Logstash-Kibana. Additional software items are often needed, such as Beats (a collection of tools to send logs and metrics to Logstash) and Elastalert (which generates alerts based on Elasticsearch time-series data).

  • Is ELK stack free?

    The short answer is: yes. The various software items that make up the ELK stack come under various software licenses, but they generally all have a license that offers free usage without any support. It is up to you, however, to set up and maintain your ELK cluster.

  • How does the ELK stack work?

    The ELK stack is highly configurable, so there is no single way of making it work. For example, this would be the path for an Apache log entry: Filebeat reads the entry and sends it to Logstash, which parses it and sends it to Elasticsearch, which saves and indexes it. Kibana can then retrieve the data and display it.
