使用 CloudTrail 识别 Amazon S3 请求
在 Amazon S3 中,可以使用 Amazon CloudTrail 事件日志识别请求。Amazon CloudTrail 是识别 Amazon S3 请求的首选方法,但是如果您使用的是 Amazon S3 服务器访问日志,请参阅使用 Amazon S3 访问日志确定请求。
识别 CloudTrail 日志中向 Amazon S3 发出的请求
由 CloudTrail 记录的事件作为使用 GZipp 进行压缩的 JSON 对象存储在您的 S3 存储桶中。要高效地查找请求,您应使用 Amazon Athena 等服务来为 CloudTrail 日志建立索引并查询。有关 CloudTrail 和 Athena 的更多信息,请参阅 Amazon Athena 用户指南中的查询 Amazon CloudTrail 日志。
将 Athena 与 CloudTrail 日志结合使用
在设置 CloudTrail 以将事件传输到存储桶后,您应开始看到对象进入您在 Amazon S3 控制台上的目标存储桶。其格式如下所示:
s3://
myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/Region/yyyy/mm/dd
例 — 使用 Athena 查询 CloudTrail 事件日志以查找特定请求
找到您的 CloudTrail 事件日志:
s3://myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/us-east-2/2019/04/14
利用 CloudTrail 事件日志,您现在可以创建 Athena 数据库和表来查询它们,如下所示:
从 https://console.aws.amazon.com/athena/
打开 Athena 控制台。 -
将 Amazon Web Services 区域 更改为与您的 CloudTrail 目标 S3 存储桶所在的相同区域。
-
在查询窗口中,为 CloudTrail 事件创建 Athena 数据库。
CREATE DATABASE s3_cloudtrail_events_db -
使用以下查询为您在该存储桶中的所有 CloudTrail 事件创建一个表。请务必将存储桶名称从
更改为您的存储桶的名称。另请提供您的存储桶中使用的CloudTrail_myawsexamplebucket1CloudTrail。AWS_account_IDCREATE EXTERNAL TABLE s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket1_table( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING> > >, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< ARN:STRING, accountId:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING ) ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/'; -
测试 Athena 以确保该查询有用。
SELECT * FROM s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket1_table WHERE eventsource='s3.amazonaws.com' LIMIT 2;
使用 CloudTrail 识别 Amazon S3 签名版本 2 请求
您可以使用 CloudTrail 事件日志来确定已用于在 Amazon S3 中签署请求的 API 签名版本。此功能非常重要,因为对 Signature Version 2 的支持将会关闭(弃用)。之后,Amazon S3 将不再接受使用 Signature Version 2 的请求,并且所有请求必须使用 Signature Version 4 进行签署。
我们强烈建议您使用 CloudTrail 来帮助确定您的任何工作流是否正在使用 Signature Version 2 进行签署。请通过将您的库和代码升级为使用 Signature Version 4 进行纠正,以防对您的业务产生任何影响。
有关更多信息,请参阅 Amazon 讨论论坛中的公告:适用于 Amazon S3 的 Amazon CloudTrail 为增强安全性审计添加了新字段
注意
Amazon S3 的 CloudTrail 事件在请求详细信息中的键名称“additionalEventData”之下包含签名版本。要在为 Amazon S3 中的对象发出的请求(如 GET、PUT 和 DELETE)上查找签名版本,您必须启用 CloudTrail 数据事件(默认情况下已将其关闭)。
Amazon CloudTrail 是确定签名版本 2 请求的首选方法。如果您使用的是 Amazon S3 服务器访问日志,请参阅 使用 Amazon S3 访问日志确定签名版本 2 请求。
用于识别 Amazon S3 签名版本 2 请求的 Athena 查询示例
例 — 选择所有签名版本 2 事件,并仅输出 EventTime、S3 Action、Request_Parameters、Region、SourceIP 和 UserAgent
在以下 Athena 查询中,将 <s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket1_table> 替换为您的 Athena 详细信息并根据需要增加或去除限制。
SELECT EventTime, EventName as S3_Action, requestParameters as Request_Parameters, awsregion as AWS_Region, sourceipaddress as Source_IP, useragent as User_Agent FROM s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket1_table WHERE eventsource='s3.amazonaws.com' AND json_extract_scalar(additionalEventData, '$.SignatureVersion')='SigV2' LIMIT 10;
例 — 选择发送签名版本 2 流量的所有请求者
SELECT useridentity.arn, Count(requestid) as RequestCount FROM s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket1_table WHERE eventsource='s3.amazonaws.com' and json_extract_scalar(additionalEventData, '$.SignatureVersion')='SigV2' Group by useridentity.arn
对签名版本 2 数据进行分区
如果您有大量要查询的数据,则可以通过创建分区表来减少 Athena 的成本和运行时。
为此,创建一个包含分区的新表,如下所示。
CREATE EXTERNAL TABLE s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket1_table_partitioned( eventversion STRING, userIdentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionIssuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING> > >, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestParameters STRING, responseElements STRING, additionalEventData STRING, requestId STRING, eventId STRING, resources ARRAY<STRUCT<ARN:STRING,accountId: STRING,type:STRING>>, eventType STRING, apiVersion STRING, readOnly STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcEndpointId STRING ) PARTITIONED BY (region string, year string, month string, day string) ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/';
然后,单独创建分区。您无法从尚未创建的日期获取结果。
ALTER TABLE s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket1_table_partitioned ADD PARTITION (region= 'us-east-1', year= '2019', month= '02', day= '19') LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/us-east-1/2019/02/19/' PARTITION (region= 'us-west-1', year= '2019', month= '02', day= '19') LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/us-west-1/2019/02/19/' PARTITION (region= 'us-west-2', year= '2019', month= '02', day= '19') LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/us-west-2/2019/02/19/' PARTITION (region= 'ap-southeast-1', year= '2019', month= '02', day= '19') LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/ap-southeast-1/2019/02/19/' PARTITION (region= 'ap-southeast-2', year= '2019', month= '02', day= '19') LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/ap-southeast-2/2019/02/19/' PARTITION (region= 'ap-northeast-1', year= '2019', month= '02', day= '19') LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/ap-northeast-1/2019/02/19/' PARTITION (region= 'eu-west-1', year= '2019', month= '02', day= '19') LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/eu-west-1/2019/02/19/' PARTITION (region= 'sa-east-1', year= '2019', month= '02', day= '19') LOCATION 's3://myawsexamplebucket1/AWSLogs/111122223333/CloudTrail/sa-east-1/2019/02/19/';
然后,您可以根据这些分区发出请求,并且无需加载完整的存储桶。
SELECT useridentity.arn, Count(requestid) AS RequestCount FROM s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket1_table_partitioned WHERE eventsource='s3.amazonaws.com' AND json_extract_scalar(additionalEventData, '$.SignatureVersion')='SigV2' AND region='us-east-1' AND year='2019' AND month='02' AND day='19' Group by useridentity.arn
使用 CloudTrail 识别对 S3 对象的访问
您可以使用 Amazon CloudTrail 事件日志确定对于诸如 GetObject、DeleteObject 和 PutObject 等数据事件的 Amazon S3 对象访问请求,并发现有关这些请求的进一步信息。
以下示例说明了如何从 Amazon CloudTrail 事件日志获取对于 Amazon S3 的所有 PUT 对象请求。
用于识别 Amazon S3 对象访问请求的 Athena 查询示例
在以下 Athena 查询示例中,将 <s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket1_table> 替换为您的 Athena 详细信息,并根据需要修改日期范围。
例 — 选择具有 PUT 对象访问请求的所有事件,并仅输出 EventTime、EventSource、SourceIP、UserAgent、BucketName、Object 和 UserARN
SELECT eventTime, eventName, eventSource, sourceIpAddress, userAgent, json_extract_scalar(requestParameters, '$.bucketName') as bucketName, json_extract_scalar(requestParameters, '$.key') as object, userIdentity.arn as userArn FROM s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket_table WHERE eventName = 'PutObject' AND eventTime BETWEEN '2019-07-05T00:00:00Z' and '2019-07-06T00:00:00Z'
例 — 选择具有 GET 对象访问请求的所有事件,并仅输出 EventTime、EventSource、SourceIP、UserAgent、BucketName、Object 和 UserARN
SELECT eventTime, eventName, eventSource, sourceIpAddress, userAgent, json_extract_scalar(requestParameters, '$.bucketName') as bucketName, json_extract_scalar(requestParameters, '$.key') as object, userIdentity.arn as userArn FROM s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket_table WHERE eventName = 'GetObject' AND eventTime BETWEEN '2019-07-05T00:00:00Z' and '2019-07-06T00:00:00Z'
例 – 选择在特定时间段内向存储桶发出的所有匿名请求者事件,并仅输出 EventTime、EventName、EventSource、SourceIP、UserAgent、BucketName、UserARN 和 AccountID
SELECT eventTime, eventName, eventSource, sourceIpAddress, userAgent, json_extract_scalar(requestParameters, '$.bucketName') as bucketName, userIdentity.arn as userArn, userIdentity.accountId FROM s3_cloudtrail_events_db.cloudtrail_myawsexamplebucket_table WHERE userIdentity.accountId = 'anonymous' AND eventTime BETWEEN '2019-07-05T00:00:00Z' and '2019-07-06T00:00:00Z'
注意
-
也可以使用这些查询示例进行安全监控。您可以查看意外或未经授权的 IP 地址/请求者的
PutObject或GetObject调用结果,以及确定向存储桶发出的任何匿名请求。 -
此查询仅从启用了日志记录的时间检索信息。
如果您使用的是 Amazon S3 服务器访问日志,请参阅 使用 Amazon S3 访问日志确定对象访问请求。