Fabrice is a cloud architect and software developer with 20+ years of experience at Cisco, Samsung, Philips, Alcatel, and Sagem.
Elasticsearch is a powerful software solution designed to quickly search for information in vast amounts of data. Combined with Logstash and Kibana, it forms the informally named "ELK stack," which is often used to collect, temporarily store, analyze, and visualize log data. Some other software is usually needed as well, such as Filebeat to send the logs from the server to Logstash, and Elastalert to generate alerts based on the results of analyses run against the data stored in Elasticsearch.
My experience with managing logs using ELK has been decidedly mixed. On the one hand, it is very powerful and its range of features is impressive. On the other hand, it is tricky to set up and can be cumbersome to maintain.
The fact is that Elasticsearch is very good in general and can be used in a wide variety of scenarios; it can even be used as a search engine! But since it is not specialized for managing log data, it takes extra configuration work to tailor its behavior to the specific needs of managing such data.
Setting up an ELK cluster proved quite tricky, and required me to fiddle with a number of parameters before finally getting it up and running. Then came the work of configuring it. In my case, I had five different pieces of software to configure (Filebeat, Logstash, Elasticsearch, Kibana, and Elastalert). This can be quite a tedious job, as I had to read through the documentation and debug whichever element in the chain was not talking to the next. Even after you finally get your cluster up and running, you still need to perform routine maintenance operations on it: patching, upgrading the OS packages, checking CPU, RAM, and disk usage, making minor adjustments as required, and so on.
My entire ELK stack stopped working after a Logstash update. Upon closer examination, it turned out that, for some reason, the ELK developers had decided to change a keyword in the configuration file and pluralize it. That was the last straw, and I decided to look for a better solution (at least, a better solution for my particular needs).
I wanted to store logs generated by Apache and various PHP and Node applications, and to parse them looking for patterns indicative of a defect in the software. The solution I found was the following:
And, at a high level, that’s it! A 100% serverless solution that works fine without any maintenance and scales well without any extra effort. The advantages of such a serverless solution over a cluster of servers are many:
So now let’s get into the details! Let’s examine what a CloudFormation template for such a setup looks like, complete with Slack webhooks for alerting engineers. We need to configure all the Slack settings first, so let’s dive into that.
AWSTemplateFormatVersion: 2010-09-09
Description: Setup log processing
Parameters:
  SlackWebhookHost:
    Type: String
    Description: Host name for Slack web hooks
    Default: hooks.slack.com
  SlackWebhookPath:
    Type: String
    Description: Path part of the Slack webhook URL
    Default: /services/YOUR/SLACK/WEBHOOK
For this, you will need to set up your Slack workspace; see this WebHooks for Slack guide for additional info.
Once you have created your Slack app and configured an incoming hook, the hook’s URL will become a parameter of your CloudFormation stack.
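Before wiring the webhook into the stack, it can help to verify it works outside of Lambda. Here is a minimal sketch using only the Python standard library; the path below is the same placeholder as in the template's default, so substitute your real webhook path:

```python
import json
from http.client import HTTPSConnection

def build_slack_payload(text):
    """Build the JSON body that Slack incoming webhooks expect."""
    return json.dumps({"text": text})

def post_to_slack(host, path, text, timeout=5):
    """POST a message to a Slack incoming webhook; return the HTTP status."""
    cnx = HTTPSConnection(host, timeout=timeout)
    cnx.request("POST", path, build_slack_payload(text))
    resp = cnx.getresponse()
    resp.read()  # drain the response before the connection is closed
    return resp.status

# Example call (placeholder path, as in the template's default):
# post_to_slack("hooks.slack.com", "/services/YOUR/SLACK/WEBHOOK", "test message")
```

A status of 200 means Slack accepted the message; anything else usually indicates a wrong path or a malformed payload.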
Resources:
  ApacheAccessLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 100  # Or whatever is good for you
  ApacheErrorLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 100  # Or whatever is good for you
Here we created two log groups: one for the Apache access logs, the other for the Apache error logs.
I did not configure any lifecycle mechanism for the log data because it is beyond the scope of this article. In practice, you would probably want to shorten the retention window and design S3 lifecycle policies to move the logs to Glacier after some time.
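For reference, here is a hedged sketch of what such a lifecycle policy could look like, assuming the logs were first exported to a hypothetical S3 bucket (the resource name and the day counts are illustrative, not from the article's stack):

```yaml
  # Hypothetical: an S3 bucket for exported logs whose objects transition
  # to Glacier after 90 days and are deleted after a year.
  ExportedLogsBucket:
    Type: AWS::S3::Bucket
    Properties:
      LifecycleConfiguration:
        Rules:
          - Id: ArchiveThenExpire
            Status: Enabled
            Transitions:
              - StorageClass: GLACIER
                TransitionInDays: 90
            ExpirationInDays: 365
```

Exporting from CloudWatch Logs to S3 itself would be a separate task (e.g., a scheduled export), which this sketch does not cover.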
Now let’s implement the Lambda function that will process the Apache access logs.
  BasicLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Here we create an IAM role that will be attached to the Lambda functions, to allow them to perform their duties. In effect, the AWSLambdaBasicExecutionRole is (despite its name) an IAM policy provided by AWS. It merely allows the Lambda function to create its log group and log streams within that group, and then to send its own logs to CloudWatch Logs.
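At the time of writing, the policy document behind AWSLambdaBasicExecutionRole boils down to the following (reproduced here for reference; check the IAM console for the current version):

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}
```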
  ProcessApacheAccessLogFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt BasicLambdaExecutionRole.Arn
      Runtime: python3.7
      Timeout: 10
      Environment:
        Variables:
          SLACK_WEBHOOK_HOST: !Ref SlackWebhookHost
          SLACK_WEBHOOK_PATH: !Ref SlackWebhookPath
      Code:
        ZipFile: |
          import base64
          import gzip
          import json
          import os
          from http.client import HTTPSConnection

          def handler(event, context):
              tmp = event['awslogs']['data']
              # `awslogs.data` is base64-encoded gzip'ed JSON
              tmp = base64.b64decode(tmp)
              tmp = gzip.decompress(tmp)
              tmp = json.loads(tmp)
              events = tmp['logEvents']
              for event in events:
                  raw_log = event['message']
                  log = json.loads(raw_log)
                  if log['status'][0] == "5":
                      # This is a 5XX status code
                      print(f"Received an Apache access log with a 5XX status code: {raw_log}")
                      slack_host = os.getenv('SLACK_WEBHOOK_HOST')
                      slack_path = os.getenv('SLACK_WEBHOOK_PATH')
                      print(f"Sending a Slack post to: host={slack_host}, path={slack_path}, content={raw_log}")
                      cnx = HTTPSConnection(slack_host, timeout=5)
                      cnx.request("POST", slack_path, json.dumps({'text': raw_log}))
                      # It's important to read the response; if the cnx is closed too quickly, Slack might not post the msg
                      resp = cnx.getresponse()
                      resp_content = resp.read()
                      resp_code = resp.status
                      assert resp_code == 200
So here we define a Lambda function to process the Apache access logs. Note that I am not using the common log format, which is Apache’s default. I configured the access log format like this (you will notice that it effectively generates logs formatted as JSON, which makes further processing much easier):
LogFormat "{\"vhost\": \"%v:%p\", \"client\": \"%a\", \"user\": \"%u\", \"timestamp\": \"%{%Y-%m-%dT%H:%M:%S}t\", \"request\": \"%r\", \"status\": \"%>s\", \"size\": \"%O\", \"referer\": \"%{Referer}i\", \"useragent\": \"%{User-Agent}i\"}" json
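Because each line produced by this format is valid JSON, a log entry round-trips through `json.loads` with no Grok-style parsing. A quick sketch with an illustrative sample line (the field values are made up):

```python
import json

# A sample line as emitted by the LogFormat above (values are illustrative)
sample = ('{"vhost": "example.com:443", "client": "203.0.113.7", '
          '"user": "-", "timestamp": "2020-01-15T12:34:56", '
          '"request": "GET /api/orders HTTP/1.1", "status": "503", '
          '"size": "512", "referer": "-", "useragent": "curl/7.58.0"}')

log = json.loads(sample)

# The same check the Lambda function performs: first digit of the status code
is_server_error = log["status"][0] == "5"
print(is_server_error)  # True for this 503 response
```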
This Lambda function is written in Python 3. It receives the log lines sent from CloudWatch and can search them for patterns. In the example above, it simply detects HTTP requests that resulted in a 5XX status code and posts a message to a Slack channel.
You can do anything you like in terms of pattern detection, and the fact that it is a real programming language (Python), as opposed to mere regex patterns in a Logstash or Elastalert configuration file, gives you many opportunities to implement complex pattern recognition.
A quick word about revision control: I found that inlining the code in CloudFormation templates is quite acceptable and convenient for small utility Lambda functions such as these. Of course, for a bigger project involving many Lambda functions and layers, this would most probably be inconvenient, and you would need to use SAM.
  ApacheAccessLogFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref ProcessApacheAccessLogFunction
      Action: lambda:InvokeFunction
      Principal: logs.amazonaws.com
      SourceArn: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*
The code above grants permission to CloudWatch Logs to call the Lambda function. One word of caution: I found that using the SourceAccount property can lead to conflicts with the SourceArn. Generally speaking, I would suggest not including it when the service calling the Lambda function is in the same AWS account. The SourceArn will forbid other accounts from calling the Lambda function anyway.
  ApacheAccessLogSubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    DependsOn: ApacheAccessLogFunctionPermission
    Properties:
      LogGroupName: !Ref ApacheAccessLogGroup
      DestinationArn: !GetAtt ProcessApacheAccessLogFunction.Arn
      FilterPattern: "{$.status = 5*}"
The subscription filter resource is the link between CloudWatch Logs and Lambda. Here, logs sent to the ApacheAccessLogGroup will be forwarded to the Lambda function we defined above, but only those logs that pass the filter pattern. The filter pattern expects some JSON as input (filter patterns start with '{' and end with '}'), and will match the log entry only if it has a field named status which starts with "5".
This means we call the Lambda function only if the HTTP status code returned by Apache is a 5XX code, which usually means something quite bad is going on. This ensures we don’t call the Lambda function too often, and thus avoid unnecessary costs.
More information on filter patterns can be found in the Amazon CloudWatch documentation. CloudWatch’s filter patterns are quite good, although obviously not as powerful as Grok.
Note the DependsOn field, which ensures CloudWatch Logs can actually call the Lambda function before the subscription is created. This is just a cherry on the cake; it’s most probably unnecessary, as in a real-case scenario Apache would probably not receive requests until a few seconds later anyway (e.g., the time it takes to link the EC2 instance with the load balancer and have the load balancer recognize the EC2 instance’s status as healthy).
Now let’s have a look at the Lambda function that processes the Apache error logs.
  ProcessApacheErrorLogFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt BasicLambdaExecutionRole.Arn
      Runtime: python3.7
      Timeout: 10
      Environment:
        Variables:
          SLACK_WEBHOOK_HOST: !Ref SlackWebhookHost
          SLACK_WEBHOOK_PATH: !Ref SlackWebhookPath
      Code:
        ZipFile: |
          import base64
          import gzip
          import json
          import os
          from http.client import HTTPSConnection

          def handler(event, context):
              tmp = event['awslogs']['data']
              # `awslogs.data` is base64-encoded gzip'ed JSON
              tmp = base64.b64decode(tmp)
              tmp = gzip.decompress(tmp)
              tmp = json.loads(tmp)
              events = tmp['logEvents']
              for event in events:
                  raw_log = event['message']
                  log = json.loads(raw_log)
                  if log['level'] in ["error", "crit", "alert", "emerg"]:
                      # This is a serious error message
                      msg = log['msg']
                      if msg.startswith("PHP Notice") or msg.startswith("PHP Warning"):
                          print(f"Ignoring PHP notices and warnings: {raw_log}")
                      else:
                          print(f"Received a serious Apache error log: {raw_log}")
                          slack_host = os.getenv('SLACK_WEBHOOK_HOST')
                          slack_path = os.getenv('SLACK_WEBHOOK_PATH')
                          print(f"Sending a Slack post to: host={slack_host}, path={slack_path}, content={raw_log}")
                          cnx = HTTPSConnection(slack_host, timeout=5)
                          cnx.request("POST", slack_path, json.dumps({'text': raw_log}))
                          # It's important to read the response; if the cnx is closed too quickly, Slack might not post the msg
                          resp = cnx.getresponse()
                          resp_content = resp.read()
                          resp_code = resp.status
                          assert resp_code == 200
This second Lambda function processes the Apache error logs and will post a message to Slack only when a serious error is encountered. In this case, PHP notice and warning messages are not considered serious enough to trigger an alert.
Again, this function expects the Apache error log to be JSON-formatted. So here is the error log format string I have been using:
ErrorLogFormat "{\"vhost\": \"%v\", \"timestamp\": \"%{cu}t\", \"module\": \"%-m\", \"level\": \"%l\", \"pid\": \"%-P\", \"tid\": \"%-T\", \"oserror\": \"%-E\", \"client\": \"%-a\", \"msg\": \"%M\"}"
  ApacheErrorLogFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref ProcessApacheErrorLogFunction
      Action: lambda:InvokeFunction
      Principal: logs.amazonaws.com
      SourceArn: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*
      SourceAccount: !Ref AWS::AccountId
This resource grants CloudWatch Logs permission to call the Lambda function.
  ApacheErrorLogSubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    DependsOn: ApacheErrorLogFunctionPermission
    Properties:
      LogGroupName: !Ref ApacheErrorLogGroup
      DestinationArn: !GetAtt ProcessApacheErrorLogFunction.Arn
      FilterPattern: '{$.msg != "PHP Warning*" && $.msg != "PHP Notice*"}'
Finally, we link CloudWatch Logs with the Lambda function using a subscription filter on the Apache error log group. Note the filter pattern, which ensures that logs with a message starting with "PHP Warning" or "PHP Notice" do not trigger a call to the Lambda function.
A final word about cost: this solution is much cheaper than operating an ELK cluster. The logs stored in CloudWatch are priced at the same level as S3, and Lambda allows one million calls per month as part of its free tier. This would probably be enough for a website with moderate traffic (provided you used CloudWatch filter patterns), especially if you coded it well and it doesn’t have too many errors!
Also, please note that Lambda functions support at most 1,000 concurrent calls. At the time of writing, this is a hard limit in AWS that cannot be changed. However, you can expect a call to the functions above to last for about 30-40 ms. This should be fast enough to handle rather heavy traffic. If your workload is so intense that you hit this limit, you probably need a more complex solution based on Kinesis, which I might cover in a future article.
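A quick back-of-envelope calculation shows how much headroom that leaves, assuming the ~35 ms average invocation duration mentioned above:

```python
# Rough throughput ceiling given the 1,000-concurrent-execution limit
# and an assumed average invocation duration of ~35 ms
concurrency_limit = 1000
avg_duration_s = 0.035  # 35 ms per call

max_invocations_per_second = concurrency_limit / avg_duration_s
print(f"{max_invocations_per_second:,.0f} invocations/second")  # ≈ 28,571
```

In other words, the concurrency limit only becomes a concern at tens of thousands of matching log events per second, well beyond "moderate traffic."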
ELK is an acronym for Elasticsearch-Logstash-Kibana. Additional software items are often needed, such as Beats (a collection of tools to send logs and metrics to Logstash) and Elastalert (which generates alerts based on time-series data in Elasticsearch).
The short answer is: yes. The various software items that make up the ELK stack come under various software licenses, but they generally all have licenses that offer free usage without any support. However, it is up to you to set up and maintain your ELK cluster.
The ELK stack is highly configurable, so there is no single way to make it work. For example, here is the path an Apache log entry would follow: Filebeat reads the entry and sends it to Logstash, which parses it and sends it to Elasticsearch, which saves and indexes it. Kibana can then retrieve the data and display it.