AWS Detection Engineering — Architecting Security Logging at Scale in AWS
A deep dive into designing, implementing, and optimizing security logging infrastructure for modern threat detection
Introduction
A while ago, I was called in to investigate a sophisticated breach at a mid-sized fintech company. What started as routine anomaly detection quickly revealed a months-long campaign involving credential harvesting, lateral movement, and data exfiltration. The attack itself was concerning, but what kept me awake at night was something else entirely: we were flying blind for most of it.
Their AWS environment generated hundreds of gigabytes of logs daily, yet critical security events were scattered across dozens of services, stored in incompatible formats, and often aged out before analysis. When we finally pieced together the timeline, it became clear that the attacker’s initial foothold (T1078 — Valid Accounts) could have been detected on day one if proper logging architecture had been in place.
This investigation reinforced a fundamental truth I’ve learned over seven years in cybersecurity: you can’t defend what you can’t see, and you can’t see what you don’t log properly.
Today, I want to share the lessons learned from that incident and dozens of others like it. We’ll build a comprehensive security logging architecture that can detect, contain, and investigate threats at cloud scale — all while keeping costs manageable and maintaining operational efficiency.
Why Security Logging is Your First Line of Defense
During that fintech investigation, I discovered the attacker had been present for 127 days. They moved laterally through seventeen EC2 instances, accessed three S3 buckets containing customer PII, and established persistence through multiple techniques including scheduled tasks (T1053.005) and account manipulation (T1098). Yet for 120 of those days, we had no visibility into their activities.
The reality is stark: without comprehensive logging, you’re not doing security — you’re playing security theater.
The Detection Gap
Consider this timeline from the investigation:
Day 1: Initial compromise via stolen credentials (T1078.004)
Day 15: Lateral movement to production systems (T1021.001)
Day 43: Data discovery and collection (T1083, T1119)
Day 89: Exfiltration begins (T1041)
Day 127: Discovery by external threat intel feed
Each of these phases generated log events that could have triggered alerts:
- Day 1: Unusual login patterns from new geographic locations
- Day 15: Abnormal process execution and network connections
- Day 43: Mass file enumeration and database queries
- Day 89: Large data transfers to external IPs
But those logs were either not collected, not correlated, or had already been purged by retention policies designed for cost optimization rather than security effectiveness.
Modern Threat Landscape Demands Modern Logging
Today’s attackers operate with patience and sophistication. They understand that most organizations have visibility gaps and exploit them ruthlessly. A solid logging architecture serves multiple critical functions:
1. Real-time threat detection — Identifying active threats as they unfold
2. Historical analysis — Understanding attack timelines and attribution
3. Compliance — Meeting regulatory requirements for audit trails
4. Operational intelligence — Supporting incident response and forensics
5. Threat hunting — Enabling proactive searches for unknown threats
Understanding AWS Logging Data Types
Before diving into architecture, let’s categorize the types of security-relevant data AWS generates. I’ve learned to think about logs in four distinct categories:
1. Administrative Logs
These capture management plane activities — the “who did what” of your AWS environment.
AWS CloudTrail is your primary source here. Every API call generates a detailed record:
{
"eventTime": "2024-01-15T10:23:45Z",
"eventName": "AssumeRole",
"eventSource": "sts.amazonaws.com",
"userIdentity": {
"type": "IAMUser",
"principalId": "AIDACKCEVSQ6C2EXAMPLE",
"arn": "arn:aws:iam::123456789012:user/security-analyst",
"accountId": "123456789012",
"userName": "security-analyst"
},
"sourceIPAddress": "203.0.113.12",
"userAgent": "aws-cli/2.0.55 Python/3.8.5",
"resources": [{
"accountId": "123456789012",
"type": "AWS::IAM::Role",
"ARN": "arn:aws:iam::123456789012:role/incident-response"
}]
}
This single log entry tells us exactly who assumed what role, when, and from where — critical for detecting privilege escalation (T1548) and unauthorized access attempts.
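To make this concrete, here is a minimal detection sketch over a parsed CloudTrail record. It assumes events arrive as Python dicts; the allow-listed admin network and the role keywords are illustrative placeholders, not universal values:

import ipaddress

# Hypothetical allow-list of networks admins normally operate from
KNOWN_ADMIN_CIDRS = [ipaddress.ip_network("198.51.100.0/24")]
SENSITIVE_ROLE_KEYWORDS = ("admin", "incident-response", "security")

def flag_suspicious_assume_role(event: dict) -> bool:
    """Return True when a sensitive role is assumed from an unknown network."""
    if event.get("eventName") != "AssumeRole":
        return False
    try:
        ip = ipaddress.ip_address(event.get("sourceIPAddress", ""))
    except ValueError:
        return False  # non-IP sources, e.g. calls made by AWS services
    from_known_network = any(ip in net for net in KNOWN_ADMIN_CIDRS)
    target_arns = [r.get("ARN", "").lower() for r in event.get("resources", [])]
    sensitive = any(k in arn for arn in target_arns for k in SENSITIVE_ROLE_KEYWORDS)
    return sensitive and not from_known_network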
2. Data Access Logs
These track access to your data stores and are crucial for detecting data exfiltration.
S3 Access Logs provide granular visibility into bucket operations (the raw format is space-delimited; it’s shown here parsed into JSON for readability):
{
"bucket_owner": "123456789012",
"bucket": "prod-customer-data",
"time": "[15/Jan/2024:10:24:02 +0000]",
"remote_ip": "203.0.113.12",
"requester": "arn:aws:iam::123456789012:user/data-analyst",
"request_id": "3E57427F33A59F07",
"operation": "REST.GET.OBJECT",
"key": "customers/pii/customer_database_export.csv",
"http_status": "200",
"bytes_sent": "2847691"
}
VPC Flow Logs reveal network-level data movement:
2 123456789012 eni-1a2b3c4d 203.0.113.12 10.0.1.15 443 32768 6 25 5000 1642248245 1642248304 ACCEPT OK
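Each of those space-delimited fields is recoverable programmatically. A minimal parsing sketch, assuming the default version-2 field order shown above:

# Field order for the default version-2 VPC Flow Log format
FLOW_FIELDS = [
    "version", "account_id", "interface_id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log_status",
]

def parse_flow_record(line: str) -> dict:
    """Parse one default-format VPC Flow Log line into a dict."""
    record = dict(zip(FLOW_FIELDS, line.split()))
    # Cast numeric fields so byte-volume analytics work downstream
    for field in ("srcport", "dstport", "packets", "bytes", "start", "end"):
        if record.get(field, "-").isdigit():
            record[field] = int(record[field])
    return record

record = parse_flow_record(
    "2 123456789012 eni-1a2b3c4d 203.0.113.12 10.0.1.15 443 32768 6 25 5000 "
    "1642248245 1642248304 ACCEPT OK"
)
assert record["action"] == "ACCEPT" and record["bytes"] == 5000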
3. Application Logs
These capture application-level events and user activities.
CloudWatch Logs from EC2 instances, Lambda functions, and containers provide deep visibility into application behavior. During my investigation, application logs revealed the attacker’s reconnaissance activities:
2024-01-15 10:25:33 INFO [RequestHandler] User: compromised-user accessed endpoint: /api/customers/list
2024-01-15 10:25:45 WARN [DatabaseHandler] Large result set returned: 15,847 records for query: SELECT * FROM customers
2024-01-15 10:26:12 INFO [RequestHandler] User: compromised-user accessed endpoint: /api/customers/export
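Even simple pattern matching over lines like these can surface mass enumeration. A sketch (the WARN-line format mirrors the sample above; the threshold is an assumption to tune against your own baselines):

import re

# Matches the "Large result set" WARN lines shown above
LARGE_RESULT = re.compile(r"Large result set returned: ([\d,]+) records")
RESULT_THRESHOLD = 10_000  # assumed cutoff; tune to your data volumes

def find_bulk_queries(log_lines):
    """Yield (record_count, line) for queries returning suspiciously many rows."""
    for line in log_lines:
        match = LARGE_RESULT.search(line)
        if match:
            count = int(match.group(1).replace(",", ""))
            if count > RESULT_THRESHOLD:
                yield count, line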
4. Security Service Logs
AWS security services generate their own specialized logs.
GuardDuty findings provide machine learning-driven threat detection:
{
"schemaVersion": "2.0",
"accountId": "123456789012",
"region": "us-east-1",
"partition": "aws",
"id": "1eb5b861-0a5c-4b5b-9a1e-1234567890ab",
"arn": "arn:aws:guardduty:us-east-1:123456789012:detector/12abc34d567e8f901234567890123456/finding/1eb5b861-0a5c-4b5b-9a1e-1234567890ab",
"type": "UnauthorizedAPI:EC2/TorIPCaller",
"service": {
"serviceName": "guardduty",
"detectorId": "12abc34d567e8f901234567890123456",
"action": {
"actionType": "AWS_API_CALL",
"awsApiCallAction": {
"api": "DescribeInstances",
"serviceName": "ec2.amazonaws.com",
"callerType": "Remote IP"
}
}
}
}
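Findings like this can also be pulled programmatically for correlation with your other log sources. A hedged boto3 sketch (the detector ID is a placeholder, and severity 7+ corresponds to HIGH and above):

import boto3

guardduty = boto3.client("guardduty")
DETECTOR_ID = "12abc34d567e8f901234567890123456"  # placeholder detector ID

def fetch_high_severity_findings(detector_id: str) -> list:
    """Pull HIGH/CRITICAL GuardDuty findings for downstream correlation."""
    finding_ids = guardduty.list_findings(
        DetectorId=detector_id,
        FindingCriteria={"Criterion": {"severity": {"Gte": 7}}},
    )["FindingIds"]
    if not finding_ids:
        return []
    # get_findings accepts up to 50 IDs per call; paginate for larger volumes
    return guardduty.get_findings(
        DetectorId=detector_id, FindingIds=finding_ids[:50]
    )["Findings"]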
AWS Log Sources for Security
Let me walk you through the critical log sources I configure in every AWS environment. Each serves a specific purpose in our overall detection strategy:
Core Infrastructure Logs
AWS CloudTrail — The foundation of AWS logging
- Purpose: API call auditing and compliance
- MITRE Coverage: T1078 (Valid Accounts), T1548 (Abuse Elevation Control), T1098 (Account Manipulation)
- Configuration: Multi-region, management and data events, immutable S3 storage
CloudTrail:
Type: AWS::CloudTrail::Trail
Properties:
TrailName: security-audit-trail
S3BucketName: !Ref SecurityLogsBucket
S3KeyPrefix: cloudtrail/
IncludeGlobalServiceEvents: true
IsMultiRegionTrail: true
EnableLogFileValidation: true
InsightSelectors:
- InsightType: ApiCallRateInsight
EventSelectors:
- ReadWriteType: All
IncludeManagementEvents: true
DataResources:
- Type: AWS::S3::Object
Values: ["arn:aws:s3:::prod-*/*"]
- Type: AWS::Lambda::Function
Values: ["arn:aws:lambda:*"]
VPC Flow Logs — Network visibility
- Purpose: Network traffic analysis and lateral movement detection
- MITRE Coverage: T1021 (Remote Services), T1041 (Exfiltration Over C2 Channel)
- Configuration: All interfaces, all traffic types, enriched metadata
VPCFlowLog:
Type: AWS::EC2::FlowLog
Properties:
ResourceType: VPC
ResourceId: !Ref ProductionVPC
TrafficType: ALL
LogDestinationType: cloud-watch-logs
LogGroupName: !Ref VPCFlowLogGroup
DeliverLogsPermissionArn: !GetAtt VPCFlowLogRole.Arn # IAM role (not shown) permitting delivery to CloudWatch Logs
LogFormat: '${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status} ${version} ${account-id} ${interface-id} ${vpc-id} ${subnet-id} ${instance-id} ${tcp-flags} ${type} ${pkt-srcaddr} ${pkt-dstaddr} ${region} ${az-id} ${sublocation-type} ${sublocation-id} ${pkt-src-aws-service} ${pkt-dst-aws-service} ${flow-direction} ${traffic-path}'
Data Protection Logs
S3 Access Logs and CloudTrail Data Events
- Purpose: Data access monitoring and DLP
- MITRE Coverage: T1530 (Data from Cloud Storage Object), T1119 (Automated Collection)
S3BucketLogging:
Type: AWS::S3::Bucket
Properties:
BucketName: prod-customer-data
LoggingConfiguration:
DestinationBucketName: !Ref SecurityLogsBucket
LogFilePrefix: s3-access-logs/
NotificationConfiguration:
  LambdaConfigurations:
    - Event: s3:ObjectCreated:*
      Function: !GetAtt LogProcessorFunction.Arn # S3 notifies Lambda/SNS/SQS targets; CloudWatch Logs is not a direct notification target
Application & Container Logs
CloudWatch Logs from EC2, ECS, EKS, and Lambda
- Purpose: Application-level threat detection
- MITRE Coverage: T1059 (Command and Scripting Interpreter), T1083 (File and Directory Discovery)
LogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/ec2/security-critical-apps
RetentionInDays: 90
LogGroupClass: STANDARD
LogStream:
Type: AWS::Logs::LogStream
Properties:
LogGroupName: !Ref LogGroup
LogStreamName: application-security-events
Security Service Logs
AWS GuardDuty, Security Hub, CloudWatch Events
- Purpose: Automated threat detection and SIEM integration
- MITRE Coverage: Multi-tactic coverage through ML-driven detection
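Both GuardDuty and Security Hub publish findings to EventBridge, which makes routing them into the alerting pipeline straightforward. A sketch using boto3 (the topic ARN is a placeholder, and the permissions allowing EventBridge to publish to SNS are assumed to exist):

import json
import boto3

events = boto3.client("events")
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:security-critical-alerts"  # placeholder

# Route HIGH/CRITICAL GuardDuty findings (severity 7+) to the alerting topic
events.put_rule(
    Name="guardduty-high-severity",
    EventPattern=json.dumps({
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {"severity": [{"numeric": [">=", 7]}]},
    }),
    State="ENABLED",
)
events.put_targets(
    Rule="guardduty-high-severity",
    Targets=[{"Id": "sns-security-alerts", "Arn": SNS_TOPIC_ARN}],
)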
Architectural Design: Building for Scale and Security
Based on my experience with environments ranging from startups to Fortune 500 companies, I’ve developed a reference architecture that balances security effectiveness with operational practicality.
High-Level Architecture Overview
System Components Deep Dive
Log Collectors
The collector layer is where logs enter our architecture. I’ve learned that having multiple collection methods is crucial for resilience. Here are the tools I recommend based on different use cases:
Tool Options:
- AWS CloudWatch Agent — Native AWS integration, lightweight
- Filebeat — Elastic Stack component, excellent for file-based logs
- Fluentd — Ruby-based, extensive plugin ecosystem
- Vector — Rust-based, high-performance log router
- rsyslog — Traditional syslog daemon with modern features
- Fluent Bit — Lightweight alternative to Fluentd
Primary Collector: CloudWatch Logs
CloudWatchAgent:
Type: AWS::SSM::Document
Properties:
DocumentType: Command
Content:
schemaVersion: "2.2"
description: "Configure CloudWatch Agent for Security Logging"
parameters:
config:
type: String
description: "CloudWatch Agent Configuration"
default: |
{
"agent": {
"metrics_collection_interval": 60,
"run_as_user": "cwagent"
},
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/var/log/auth.log",
"log_group_name": "/aws/ec2/security/auth",
"log_stream_name": "{instance_id}",
"retention_in_days": 30
},
{
"file_path": "/var/log/secure",
"log_group_name": "/aws/ec2/security/secure",
"log_stream_name": "{instance_id}",
"retention_in_days": 30
},
{
"file_path": "/var/log/audit/audit.log",
"log_group_name": "/aws/ec2/security/audit",
"log_stream_name": "{instance_id}",
"retention_in_days": 90
}
]
}
}
}
}
Alternative Collector: Filebeat
For environments with mixed infrastructure or when you need more flexibility than CloudWatch Agent provides:
# filebeat.yml - Security-focused configuration
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/auth.log
- /var/log/secure
- /var/log/audit/audit.log
fields:
log_type: security
environment: production
fields_under_root: false
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
- /var/log/apache2/access.log
fields:
log_type: web_access
json.keys_under_root: true
json.overwrite_keys: true
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
output.logstash:
hosts: ["logstash-cluster.security.internal:5044"]
loadbalance: true
# Alternative: ship to Kafka/MSK instead of Logstash. Filebeat has no
# built-in Kinesis output, and only one output may be enabled at a time.
#output.kafka:
#  hosts: ["b-1.security-logs-cluster.kafka.us-east-1.amazonaws.com:9092"]
#  topic: "security-logs-%{[fields.log_type]}"
#  compression: gzip
#  required_acks: -1
Secondary Collector: Direct S3 Integration
For high-volume logs like VPC Flow Logs, direct S3 integration reduces costs and latency:
FlowLogS3Integration:
Type: AWS::EC2::FlowLog
Properties:
ResourceType: VPC
ResourceId: !Ref ProductionVPC
TrafficType: ALL
LogDestinationType: s3
LogDestination: !Sub "arn:aws:s3:::${SecurityLogsBucket}/vpc-flow-logs/"
LogFormat: '${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action}'
MaxAggregationInterval: 60
Log Aggregators
The aggregation layer normalizes and enriches logs before storage. I prefer Kinesis Data Firehose for this role because of its managed nature and built-in transformations.
Tool Options:
- Amazon Kinesis Data Firehose — Managed AWS service with built-in transformations
- Logstash — Part of Elastic Stack, powerful parsing and transformation
- Fluentd — Data collector with rich plugin ecosystem
- Vector — High-performance observability data pipeline
- Apache NiFi — Visual dataflow programming for complex routing
- Fluent Bit — Lightweight log processor and forwarder
SecurityLogsFirehose:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: security-logs-stream
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt SecurityLogsBucket.Arn
BufferingHints:
SizeInMBs: 128
IntervalInSeconds: 60
CompressionFormat: GZIP
Prefix: "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
ErrorOutputPrefix: "errors/"
ProcessingConfiguration:
Enabled: true
Processors:
- Type: Lambda
Parameters:
- ParameterName: LambdaArn
ParameterValue: !GetAtt LogEnrichmentFunction.Arn
CloudWatchLoggingOptions:
Enabled: true
LogGroupName: !Ref FirehoseLogGroup
The enrichment Lambda function adds critical context:
import json
import boto3
import base64
from datetime import datetime
def lambda_handler(event, context):
output = []
for record in event['records']:
# Decode the data
payload = base64.b64decode(record['data'])
data = json.loads(payload)
# Add enrichment fields
data['@timestamp'] = datetime.utcnow().isoformat()
data['source_account'] = context.invoked_function_arn.split(':')[4]
data['ingestion_time'] = datetime.utcnow().timestamp()
# Add MITRE ATT&CK mapping based on log type
if 'eventName' in data:
data['mitre_tactics'] = map_to_mitre(data['eventName'])
# Geo-enrich IP addresses (get_ip_geolocation is a placeholder lookup helper, not shown)
if 'sourceIPAddress' in data:
data['geo_info'] = get_ip_geolocation(data['sourceIPAddress'])
# Re-encode
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(
json.dumps(data).encode('utf-8')
).decode('utf-8')
}
output.append(output_record)
return {'records': output}
def map_to_mitre(event_name):
"""Map AWS API calls to MITRE ATT&CK tactics"""
mitre_mapping = {
'AssumeRole': ['TA0004'], # Privilege Escalation
'CreateUser': ['TA0003'], # Persistence
'PutBucketPolicy': ['TA0005'], # Defense Evasion
'DescribeInstances': ['TA0007'], # Discovery
'GetObject': ['TA0009'], # Collection
}
return mitre_mapping.get(event_name, [])
Alternative Aggregator: Logstash
For more complex parsing and transformation requirements, Logstash provides powerful filtering capabilities:
# logstash-security.conf
input {
beats {
port => 5044
}
kinesis {
kinesis_stream_name => "security-logs-stream"
region => "us-east-1"
codec => "json"
}
}
filter {
# Parse CloudTrail logs
if [log_type] == "cloudtrail" {
json {
source => "message"
}
# Extract geolocation from source IP
geoip {
source => "[sourceIPAddress]"
target => "geoip"
}
# Add MITRE ATT&CK mapping
if [eventName] {
mutate {
add_field => {
"mitre_tactics" => []
}
}
if [eventName] == "AssumeRole" {
mutate { add_field => { "[mitre_tactics][0]" => "TA0004" } }
}
if [eventName] == "CreateUser" {
mutate { add_field => { "[mitre_tactics][0]" => "TA0003" } }
}
}
}
# Parse VPC Flow Logs
if [log_type] == "vpc_flow" {
grok {
match => {
"message" => "%{DATA:version} %{DATA:account_id} %{DATA:interface_id} %{IP:srcaddr} %{IP:dstaddr} %{DATA:srcport} %{DATA:dstport} %{DATA:protocol} %{DATA:packets} %{DATA:bytes} %{DATA:windowstart} %{DATA:windowend} %{DATA:action} %{DATA:flowlogstatus}"
}
}
# Convert fields to appropriate types
mutate {
convert => {
"srcport" => "integer"
"dstport" => "integer"
"packets" => "integer"
"bytes" => "integer"
}
}
}
# Enrich with threat intelligence
if [srcaddr] {
elasticsearch {
hosts => ["threat-intel-cluster.security.internal:9200"]
index => "threat-intel"
query => "ip:%{srcaddr}"
fields => { "reputation" => "threat_reputation" }
}
}
# Add timestamp
date {
match => [ "eventTime", "ISO8601" ]
}
}
output {
# Primary output to Elasticsearch
elasticsearch {
hosts => ["security-logs-cluster.security.internal:9200"]
index => "security-logs-%{+YYYY.MM.dd}"
template_name => "security-logs"
template => "/etc/logstash/templates/security-logs.json"
}
# Secondary output to S3 for long-term storage
s3 {
region => "us-east-1"
bucket => "security-logs-archive"
prefix => "year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/"
codec => "json_lines"
time_file => 1
size_file => 104857600 # size_file is in bytes (~100 MB per object)
}
# Real-time alerting to Kinesis
kinesis {
stream_name => "security-alerts-stream"
region => "us-east-1"
randomized_partition_key => true
}
}
Log Broker
For real-time processing and fan-out to multiple consumers, I implement Amazon MSK (Managed Streaming for Kafka):
Tool Options:
- Amazon MSK (Managed Streaming for Kafka) — Fully managed Apache Kafka service
- Apache Kafka — Self-managed high-throughput distributed streaming platform
- Amazon Kinesis Data Streams — Real-time data streaming service
- Redis Streams — Lightweight streaming with Redis
- Apache Pulsar — Cloud-native distributed messaging and streaming
- RabbitMQ — Message broker with advanced routing capabilities
MSKCluster:
Type: AWS::MSK::Cluster
Properties:
ClusterName: security-logs-cluster
KafkaVersion: 2.8.1
NumberOfBrokerNodes: 3
BrokerNodeGroupInfo:
InstanceType: kafka.m5.large
ClientSubnets:
- !Ref PrivateSubnet1
- !Ref PrivateSubnet2
- !Ref PrivateSubnet3
SecurityGroups:
- !Ref MSKSecurityGroup
StorageInfo:
EBSStorageInfo:
VolumeSize: 100
ConfigurationInfo:
Arn: !Ref MSKConfiguration
Revision: 1
EncryptionInfo:
EncryptionInTransit:
ClientBroker: TLS
InCluster: true
Alternative Broker: Self-Managed Apache Kafka
For organizations requiring more control or hybrid cloud deployments:
# kafka-security-cluster.properties
broker.id=1
listeners=SASL_SSL://0.0.0.0:9093
advertised.listeners=SASL_SSL://kafka-1.security.internal:9093
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Security Configuration
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# SSL Configuration
ssl.keystore.location=/opt/kafka/config/kafka.server.keystore.jks
ssl.keystore.password=${SSL_KEYSTORE_PASSWORD}
ssl.key.password=${SSL_KEY_PASSWORD}
ssl.truststore.location=/opt/kafka/config/kafka.server.truststore.jks
ssl.truststore.password=${SSL_TRUSTSTORE_PASSWORD}
# Topic Configuration for Security Logs
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
compression.type=gzip
# Retention for security logs: 168 hours = 7 days for real-time processing
log.retention.hours=168
# 1GB segments
log.segment.bytes=1073741824
log.cleanup.policy=delete
# JVM settings for security workloads. Set these in the broker's startup
# environment (e.g., kafka-server-start.sh), not in server.properties:
# export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
# export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
Kafka Topic Configuration for Security Events:
# Create security topics with appropriate partitioning
# (replace the bootstrap address with your cluster's endpoint)
kafka-topics.sh --create \
--bootstrap-server kafka-1.security.internal:9093 \
--topic security-events-critical \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config compression.type=gzip \
--config min.insync.replicas=2
kafka-topics.sh --create \
--bootstrap-server kafka-1.security.internal:9093 \
--topic security-events-high \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=259200000 \
--config compression.type=gzip
kafka-topics.sh --create \
--bootstrap-server kafka-1.security.internal:9093 \
--topic security-events-medium \
--partitions 3 \
--replication-factor 3 \
--config retention.ms=86400000 \
--config compression.type=gzip
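On the producer side, routing events to these topics by severity keeps each consumer group simple. A minimal sketch using the kafka-python library (broker address and credentials are placeholders):

import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers=["kafka-1.security.internal:9093"],  # placeholder broker
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="log-producer",
    sasl_plain_password="CHANGE_ME",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    compression_type="gzip",
    acks="all",  # honor min.insync.replicas=2 for durability
)

SEVERITY_TOPICS = {
    "critical": "security-events-critical",
    "high": "security-events-high",
}

def publish_event(event: dict) -> None:
    """Send an event to the topic matching its severity tier."""
    topic = SEVERITY_TOPICS.get(event.get("severity"), "security-events-medium")
    # Key by source IP so per-host ordering is preserved within a partition
    key = (event.get("source_ip") or "unknown").encode("utf-8")
    producer.send(topic, key=key, value=event)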
Alerting System
The alerting layer transforms log events into actionable security notifications, enabling rapid response to threats. I’ve learned that effective alerting requires both automated detection and intelligent noise reduction.
Tool Options:
- Amazon SNS — Simple notification service for basic alerting
- PagerDuty — Incident management with escalation policies
- Slack/Microsoft Teams — Collaborative alerting for security teams
- Splunk SOAR — Security orchestration and automated response
- Elasticsearch Watcher — Built-in alerting for Elasticsearch clusters
- Prometheus Alertmanager — Alert handling for metrics-based systems
- n8n — Low-code workflow automation for custom alerting and integrations
Primary Alerting: AWS SNS with Lambda Processing
# CloudFormation for intelligent alerting system
SecurityAlertingSystem:
Type: AWS::SNS::Topic
Properties:
TopicName: security-critical-alerts
KmsMasterKeyId: !Ref SecurityAlertsKMSKey
Subscription:
- Protocol: lambda
Endpoint: !GetAtt AlertProcessorFunction.Arn
- Protocol: email
Endpoint: soc-team@company.com
AlertProcessorFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: security-alert-processor
Runtime: python3.9
Handler: index.lambda_handler
Timeout: 30
Environment:
Variables:
SLACK_WEBHOOK_URL: !Ref SlackWebhookURL
PAGERDUTY_INTEGRATION_KEY: !Ref PagerDutyKey
THREAT_INTEL_API: !Ref ThreatIntelAPI
Code:
ZipFile: |
import json
import boto3
import requests  # not bundled in the Lambda runtime; package it or attach a layer
import hashlib
from datetime import datetime
def lambda_handler(event, context):
"""
Intelligent alert processing and enrichment
"""
for record in event['Records']:
message = json.loads(record['Sns']['Message'])
# Extract alert details
alert_type = message.get('alert_type', 'unknown')
severity = message.get('severity', 'medium')
source_ip = message.get('source_ip')
user_identity = message.get('user_identity', {})
# Enrich with threat intelligence
enriched_alert = enrich_alert(message)
# Apply alert fatigue reduction
if should_suppress_alert(enriched_alert):
continue
# Route based on severity and type
route_alert(enriched_alert)
return {'statusCode': 200}
def enrich_alert(alert):
"""Add context to security alerts"""
# Add geolocation
if alert.get('source_ip'):
geo_info = get_geolocation(alert['source_ip'])
alert['geo_location'] = geo_info
# Check threat intelligence
if alert.get('source_ip'):
threat_intel = check_threat_intel(alert['source_ip'])
alert['threat_intelligence'] = threat_intel
# Add user context
if alert.get('user_identity', {}).get('userName'):
user_context = get_user_context(alert['user_identity']['userName'])
alert['user_context'] = user_context
return alert
def should_suppress_alert(alert):
"""Implement alert fatigue reduction logic"""
# Suppress duplicate alerts within time window
alert_hash = generate_alert_hash(alert)
# Check DynamoDB for recent similar alerts
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('security-alert-dedup')
response = table.get_item(Key={'alert_hash': alert_hash})
if 'Item' in response:
# Alert seen recently, suppress
return True
# Store alert hash for deduplication
table.put_item(
Item={
'alert_hash': alert_hash,
'timestamp': int(datetime.utcnow().timestamp()),
'ttl': int(datetime.utcnow().timestamp()) + 3600 # 1 hour TTL
}
)
return False
def route_alert(alert):
"""Route alerts based on severity and type"""
severity = alert.get('severity', 'medium')
if severity == 'critical':
# Page on-call engineer
send_pagerduty_alert(alert)
send_slack_alert(alert, urgent=True)
elif severity == 'high':
# Slack notification to security team
send_slack_alert(alert, urgent=False)
else:
# Email notification for medium/low
send_email_alert(alert)
# Alert deduplication table
AlertDeduplicationTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: security-alert-dedup
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: alert_hash
AttributeType: S
KeySchema:
- AttributeName: alert_hash
KeyType: HASH
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
Alternative Alerting: Elasticsearch Watcher
For Elasticsearch-based logging architectures, Watcher provides sophisticated alerting capabilities:
{
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": ["security-logs-*"],
"body": {
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": "now-1m"
}
}
},
{
"terms": {
"eventName": [
"AssumeRole",
"CreateUser",
"AttachUserPolicy",
"PutBucketPolicy"
]
}
}
],
"should": [
{
"term": {
"threat_intelligence.reputation": "malicious"
}
},
{
  "geo_distance": {
    "distance": "100km",
    "geoip.location": {
      "lat": 35,
      "lon": -95
    }
  }
}
],
"minimum_should_match": 1
}
},
"aggregations": {
"by_user": {
"terms": {
"field": "userIdentity.userName.keyword",
"size": 10
},
"aggregations": {
"by_source_ip": {
"terms": {
"field": "sourceIPAddress.keyword",
"size": 5
}
}
}
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 5
}
}
},
"actions": {
"send_slack_alert": {
"webhook": {
"scheme": "https",
"host": "hooks.slack.com",
"port": 443,
"method": "post",
"path": "/services/YOUR/SLACK/WEBHOOK",
"params": {},
"headers": {
"Content-type": "application/json"
},
"body": """
{
"channel": "#security-alerts",
"username": "SecurityBot",
"text": "🚨 CRITICAL: Suspicious privilege escalation detected",
"attachments": [
{
"color": "danger",
"fields": [
{
"title": "Event Count",
"value": "{{ctx.payload.hits.total}}",
"short": true
},
{
"title": "Time Range",
"value": "Last 1 minute",
"short": true
},
{
"title": "Top Users",
"value": "{{#ctx.payload.aggregations.by_user.buckets}}{{key}} ({{doc_count}})\n{{/ctx.payload.aggregations.by_user.buckets}}",
"short": false
}
]
}
]
}
"""
}
},
"create_jira_ticket": {
"webhook": {
"scheme": "https",
"host": "company.atlassian.net",
"port": 443,
"method": "post",
"path": "/rest/api/2/issue",
"params": {},
"headers": {
"Authorization": "Basic {{ctx.metadata.jira_auth}}",
"Content-type": "application/json"
},
"body": """
{
"fields": {
"project": {"key": "SEC"},
"summary": "Automated Security Alert: Privilege Escalation Detected",
"description": "Alert triggered by Elasticsearch Watcher\n\nEvent Count: {{ctx.payload.hits.total}}\nTime Range: {{ctx.execution_time}}\n\nInvestigation required for potential security incident.",
"issuetype": {"name": "Bug"},
"priority": {"name": "Critical"},
"labels": ["security", "automated", "privilege-escalation"]
}
}
"""
}
}
}
}
Advanced Alerting: SOAR Integration
For enterprise environments requiring automated response capabilities:
# soar_security_playbook.py
import boto3
import requests
import json
from typing import Dict, List, Any
class SecurityPlaybook:
def __init__(self):
self.ec2 = boto3.client('ec2')
self.iam = boto3.client('iam')
self.guardduty = boto3.client('guardduty')
def handle_compromised_instance(self, alert: Dict[str, Any]) -> Dict[str, Any]:
"""
Automated response to compromised EC2 instance
MITRE: T1078, T1021 (Valid Accounts, Remote Services)
"""
instance_id = alert.get('instance_id')
if not instance_id:
return {'status': 'error', 'message': 'No instance ID provided'}
actions_taken = []
# 1. Isolate the instance
isolation_result = self.isolate_instance(instance_id)
actions_taken.append(isolation_result)
# 2. Create forensic snapshot
snapshot_result = self.create_forensic_snapshot(instance_id)
actions_taken.append(snapshot_result)
# 3. Collect evidence
evidence_result = self.collect_instance_evidence(instance_id)
actions_taken.append(evidence_result)
# 4. Notify incident response team
notification_result = self.notify_incident_team(alert, actions_taken)
return {
'status': 'success',
'actions_taken': actions_taken,
'incident_id': notification_result.get('incident_id')
}
def isolate_instance(self, instance_id: str) -> Dict[str, Any]:
"""Create forensic security group and apply to instance"""
try:
# Create isolation security group
vpc_response = self.ec2.describe_instances(InstanceIds=[instance_id])
vpc_id = vpc_response['Reservations'][0]['Instances'][0]['VpcId']
isolation_sg = self.ec2.create_security_group(
GroupName=f'forensic-isolation-{instance_id}',
Description=f'Forensic isolation for {instance_id}',
VpcId=vpc_id
)
# Apply isolation security group
self.ec2.modify_instance_attribute(
InstanceId=instance_id,
Groups=[isolation_sg['GroupId']]
)
return {
'action': 'isolate_instance',
'status': 'success',
'security_group_id': isolation_sg['GroupId']
}
except Exception as e:
return {
'action': 'isolate_instance',
'status': 'error',
'error': str(e)
}
def create_forensic_snapshot(self, instance_id: str) -> Dict[str, Any]:
"""Create EBS snapshots for forensic analysis"""
try:
# Get instance volumes
response = self.ec2.describe_instances(InstanceIds=[instance_id])
instance = response['Reservations'][0]['Instances'][0]
snapshots = []
for bdm in instance.get('BlockDeviceMappings', []):
volume_id = bdm['Ebs']['VolumeId']
snapshot = self.ec2.create_snapshot(
VolumeId=volume_id,
Description=f'Forensic snapshot of {volume_id} from {instance_id}'
)
# Tag snapshot for tracking
self.ec2.create_tags(
Resources=[snapshot['SnapshotId']],
Tags=[
{'Key': 'Purpose', 'Value': 'Forensic'},
{'Key': 'SourceInstance', 'Value': instance_id},
{'Key': 'CreatedBy', 'Value': 'SecurityPlaybook'},
{'Key': 'Incident', 'Value': f'auto-{instance_id}'}
]
)
snapshots.append(snapshot['SnapshotId'])
return {
'action': 'create_forensic_snapshot',
'status': 'success',
'snapshots': snapshots
}
except Exception as e:
return {
'action': 'create_forensic_snapshot',
'status': 'error',
'error': str(e)
}
def notify_incident_team(self, alert: Dict[str, Any], actions: List[Dict]) -> Dict[str, Any]:
"""Create incident ticket and notify team"""
try:
# Create ServiceNow incident (example)
incident_data = {
'short_description': f"Automated Security Response: {alert.get('alert_type', 'Unknown')}",
'description': f"""
Security alert triggered automated response:
Alert Details:
- Type: {alert.get('alert_type')}
- Severity: {alert.get('severity')}
- Source IP: {alert.get('source_ip')}
- Instance: {alert.get('instance_id')}
- Time: {alert.get('timestamp')}
Automated Actions Taken:
{json.dumps(actions, indent=2)}
Incident requires manual investigation and response.
""",
'urgency': '1' if alert.get('severity') == 'critical' else '2',
'impact': '2',
'category': 'Security',
'subcategory': 'Intrusion'
}
# Submit to incident management system
incident_response = requests.post(
'https://company.service-now.com/api/now/table/incident',
headers={
'Authorization': 'Bearer YOUR_TOKEN',
'Content-Type': 'application/json'
},
json=incident_data
)
return {
'action': 'notify_incident_team',
'status': 'success',
'incident_id': incident_response.json().get('result', {}).get('number')
}
except Exception as e:
return {
'action': 'notify_incident_team',
'status': 'error',
'error': str(e)
}
# Lambda handler for SOAR integration
def lambda_handler(event, context):
playbook = SecurityPlaybook()
for record in event['Records']:
alert = json.loads(record['body'])
alert_type = alert.get('alert_type')
if alert_type == 'compromised_instance':
result = playbook.handle_compromised_instance(alert)
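# handle_credential_compromise and handle_data_exfiltration are additional playbook methods assumed to exist (not shown)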
elif alert_type == 'credential_compromise':
result = playbook.handle_credential_compromise(alert)
elif alert_type == 'data_exfiltration':
result = playbook.handle_data_exfiltration(alert)
else:
result = {'status': 'no_playbook', 'alert_type': alert_type}
print(f"Playbook execution result: {json.dumps(result)}")
return {'statusCode': 200}
Storage Strategy
I implement a tiered storage approach that balances performance with cost:
- Hot Tier (0–30 days): S3 Standard for active investigation
- Warm Tier (31–90 days): S3 Standard-IA for recent historical analysis
- Cold Tier (91–2555 days): S3 Glacier Flexible Retrieval for compliance
- Archive Tier (7+ years): S3 Glacier Deep Archive for long-term retention
Tool Options by Use Case:
Search & Analytics:
- Amazon OpenSearch — Managed Elasticsearch with security analytics
- Elasticsearch — Self-managed search and analytics engine
- Splunk — Enterprise SIEM and log analysis platform
- Apache Solr — Open-source enterprise search platform
- Metabase — Open-source BI tool for interactive dashboards and ad-hoc queries on security data
Object Storage:
- Amazon S3 — Scalable object storage with lifecycle policies
- MinIO — S3-compatible object storage for hybrid deployments
- Azure Blob Storage — Microsoft’s object storage solution
- Google Cloud Storage — Google’s object storage service
Data Warehousing:
- Amazon Redshift — Managed data warehouse for complex analytics
- Snowflake — Cloud data platform with security features
- Google BigQuery — Serverless data warehouse for large-scale analysis
- Apache Iceberg — Open table format for analytics
SecurityLogsBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub "${AWS::StackName}-security-logs-${AWS::AccountId}"
LifecycleConfiguration:
Rules:
- Id: SecurityLogsLifecycle
Status: Enabled
Transitions:
- TransitionInDays: 30
StorageClass: STANDARD_IA
- TransitionInDays: 90
StorageClass: GLACIER
- TransitionInDays: 2555 # 7 years
StorageClass: DEEP_ARCHIVE
VersioningConfiguration:
Status: Enabled
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
NotificationConfiguration:
LambdaConfigurations:
- Event: s3:ObjectCreated:*
Function: !GetAtt LogProcessorFunction.Arn
Filter:
S3Key:
Rules:
- Name: prefix
Value: security-logs/
Alternative Storage: Elasticsearch Cluster
For organizations requiring real-time search and complex analytics:
# elasticsearch-security-cluster.yml
cluster.name: security-logs-cluster
node.name: security-node-${NODE_ID}
node.roles: [ master, data, ingest ]
# Network configuration
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300
# Discovery configuration for multi-node cluster
discovery.seed_hosts:
- security-node-1.internal
- security-node-2.internal
- security-node-3.internal
cluster.initial_master_nodes:
- security-node-1
- security-node-2
- security-node-3
# Security configuration
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: true
xpack.security.authc.api_key.enabled: true
# Index lifecycle management for security logs
xpack.ilm.enabled: true
# Memory and performance settings for security workloads
bootstrap.memory_lock: true
indices.memory.index_buffer_size: 20%
indices.memory.min_index_buffer_size: 48mb
# Security-specific index templates
indices.lifecycle.rollover.only_if_has_documents: false
Index Lifecycle Policy for Security Logs:
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "10GB",
"max_age": "1d"
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": {
"number_of_shards": 1
},
"forcemerge": {
"max_num_segments": 1
},
"allocate": {
"number_of_replicas": 0
},
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"allocate": {
"include": {
"box_type": "cold"
}
},
"set_priority": {
"priority": 0
}
}
},
"delete": {
"min_age": "365d"
}
}
}
}
Cross Region & Cross Account
At enterprise scale, a single-region approach won’t suffice. Here’s my multi-region, multi-account architecture:
# Cross-region replication for disaster recovery
ReplicationConfiguration:
Type: AWS::S3::Bucket
Properties:
  VersioningConfiguration:
    Status: Enabled # replication requires versioning on the source bucket
  ReplicationConfiguration:
Role: !GetAtt ReplicationRole.Arn
Rules:
- Id: ReplicateSecurityLogs
Status: Enabled
Prefix: security-logs/
Destination:
Bucket: !Sub "arn:aws:s3:::${BackupBucketName}"
StorageClass: STANDARD_IA
# Cross-account log consolidation
CrossAccountPolicy:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref SecurityLogsBucket
PolicyDocument:
Statement:
- Sid: AllowCrossAccountLogDelivery
Effect: Allow
Principal:
AWS:
- "arn:aws:iam::111122223333:root" # Production account
- "arn:aws:iam::444455556666:root" # Development account
Action:
- s3:PutObject
- s3:GetBucketAcl
Resource:
  - !Sub "arn:aws:s3:::${SecurityLogsBucket}/*"
  - !Sub "arn:aws:s3:::${SecurityLogsBucket}"
Condition:
StringEquals:
's3:x-amz-acl': 'bucket-owner-full-control'
Security Logging Strategy
After analyzing hundreds of incidents, I’ve developed a strategic approach to logging that focuses on detection efficacy rather than volume.
Priority-Based Log Collection
Critical (Real-time processing):
- Authentication events (CloudTrail IAM actions)
- Privilege escalation attempts
- Data access to sensitive resources
- Network connections to known bad IPs
High (Near real-time, < 5 minutes):
- Administrative actions
- Security group changes
- S3 bucket policy modifications
- GuardDuty findings
Medium (Batch processing, < 1 hour):
- VPC Flow Logs
- Application logs
- Database audit logs
- DNS queries
Low (Daily processing):
- Configuration snapshots
- Billing logs
- Resource inventory
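A sketch of how this prioritization might be encoded as a routing step (the predicates below are illustrative, mirroring the tiers above):

# Illustrative event-to-tier rules; extend these to match your environment
PRIORITY_RULES = [
    ("critical", lambda e: e.get("eventSource") == "iam.amazonaws.com"
        or e.get("threat_intelligence", {}).get("reputation") == "malicious"),
    ("high", lambda e: e.get("eventName") in {
        "AuthorizeSecurityGroupIngress", "PutBucketPolicy"}),
    ("medium", lambda e: e.get("log_type") in {"vpc_flow", "application", "dns"}),
]

def classify_priority(event: dict) -> str:
    """Return the processing tier for a log event; default to daily batch."""
    for tier, predicate in PRIORITY_RULES:
        if predicate(event):
            return tier
    return "low"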
Detection Rule Framework
I organize detection rules around the MITRE ATT&CK framework. Here’s an example rule for detecting credential dumping (T1003):
-- Detect potential credential dumping via suspicious process execution
SELECT
timestamp,
instance_id,
user_name,
process_name,
command_line,
parent_process,
COUNT(*) as event_count
FROM aws_cloudwatch_logs
WHERE
timestamp >= NOW() - INTERVAL '1 HOUR'
AND (
process_name LIKE '%mimikatz%' OR
process_name LIKE '%procdump%' OR
command_line LIKE '%lsass%' OR
command_line LIKE '%sam%' OR
command_line LIKE '%ntds.dit%'
)
AND NOT (
-- Exclude known good processes
user_name = 'backup-service' AND
process_name = 'ntdsutil.exe'
)
GROUP BY timestamp, instance_id, user_name, process_name, command_line, parent_process
HAVING COUNT(*) >= 3
ORDER BY timestamp DESC;
Lateral Movement Detection (T1021.001 — Remote Desktop Protocol):
-- Detect unusual RDP connections indicating lateral movement
WITH rdp_sessions AS (
SELECT
source_ip,
destination_ip,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY source_ip
ORDER BY timestamp
) as session_sequence
FROM vpc_flow_logs
WHERE
destination_port = 3389
AND action = 'ACCEPT'
AND timestamp >= NOW() - INTERVAL '24 HOURS'
),
lateral_movement AS (
SELECT
r1.source_ip,
r1.destination_ip as first_target,
r2.destination_ip as second_target,
r1.timestamp as first_connection,
r2.timestamp as second_connection,
TIMESTAMPDIFF(MINUTE, r1.timestamp, r2.timestamp) as time_diff
FROM rdp_sessions r1
JOIN rdp_sessions r2 ON r1.source_ip = r2.source_ip
WHERE
r1.session_sequence = 1
AND r2.session_sequence = 2
AND r1.destination_ip != r2.destination_ip
AND TIMESTAMPDIFF(MINUTE, r1.timestamp, r2.timestamp) <= 60
)
SELECT
source_ip,
first_target,
second_target,
first_connection,
second_connection,
time_diff
FROM lateral_movement
ORDER BY first_connection DESC;
Data Exfiltration Detection (T1041 — Exfiltration Over C2 Channel):
-- Detect large data transfers to external IPs
WITH baseline_traffic AS (
SELECT
source_ip,
AVG(bytes) as avg_bytes,
STDDEV(bytes) as stddev_bytes
FROM vpc_flow_logs
WHERE
timestamp >= NOW() - INTERVAL '30 DAYS'
AND timestamp < NOW() - INTERVAL '1 DAY'
GROUP BY source_ip
),
recent_traffic AS (
SELECT
source_ip,
destination_ip,
SUM(bytes) as total_bytes,
COUNT(*) as connection_count,
timestamp
FROM vpc_flow_logs
WHERE
timestamp >= NOW() - INTERVAL '1 HOUR'
AND action = 'ACCEPT'
-- External IPs only
AND NOT (
destination_ip LIKE '10.%' OR
destination_ip LIKE '172.16.%' OR
destination_ip LIKE '172.17.%' OR
destination_ip LIKE '172.18.%' OR
destination_ip LIKE '172.19.%' OR
destination_ip LIKE '172.20.%' OR
destination_ip LIKE '172.21.%' OR
destination_ip LIKE '172.22.%' OR
destination_ip LIKE '172.23.%' OR
destination_ip LIKE '172.24.%' OR
destination_ip LIKE '172.25.%' OR
destination_ip LIKE '172.26.%' OR
destination_ip LIKE '172.27.%' OR
destination_ip LIKE '172.28.%' OR
destination_ip LIKE '172.29.%' OR
destination_ip LIKE '172.30.%' OR
destination_ip LIKE '172.31.%' OR
destination_ip LIKE '192.168.%'
)
GROUP BY source_ip, destination_ip, timestamp
)
SELECT
rt.source_ip,
rt.destination_ip,
rt.total_bytes,
rt.connection_count,
rt.timestamp,
bt.avg_bytes,
(rt.total_bytes - bt.avg_bytes) / NULLIF(bt.stddev_bytes, 0) as z_score
FROM recent_traffic rt
LEFT JOIN baseline_traffic bt ON rt.source_ip = bt.source_ip
WHERE
rt.total_bytes > 100000000 -- 100MB threshold
AND (
bt.avg_bytes IS NULL OR -- New source IP
(rt.total_bytes - bt.avg_bytes) / NULLIF(bt.stddev_bytes, 0) > 3 -- 3 sigma deviation
)
ORDER BY rt.total_bytes DESC;
Cost Optimization in Storage
During my time managing security logging for a large e-commerce platform, we learned that uncontrolled log growth could quickly consume entire security budgets. Here’s how we reduced costs by 60% while improving detection capabilities:
Intelligent Log Sampling
Not all logs need to be stored at 100% volume. I implement sampling based on risk and value:
import random
import json
from datetime import datetime
def intelligent_sampling(log_event):
"""
Implement risk-based sampling to reduce storage costs
"""
event_type = log_event.get('eventName', '')
source_ip = log_event.get('sourceIPAddress', '')
user_agent = log_event.get('userAgent', '')
# Always keep high-risk events
high_risk_events = [
'AssumeRole', 'CreateUser', 'AttachUserPolicy',
'PutBucketPolicy', 'CreateAccessKey', 'DeleteTrail'
]
if event_type in high_risk_events:
return True
# Keep all events from suspicious IPs
if is_suspicious_ip(source_ip):
return True
# Sample based on event frequency
sampling_rates = {
'DescribeInstances': 0.1, # Keep 10%
'ListBuckets': 0.05, # Keep 5%
'GetObject': 0.01, # Keep 1%
'default': 0.2 # Keep 20% of other events
}
rate = sampling_rates.get(event_type, sampling_rates['default'])
return random.random() < rate
def is_suspicious_ip(ip_address):
    """Check if IP falls in known-bad ranges (stand-in for a threat intel API lookup)"""
    import ipaddress
    suspicious_networks = [
        '203.0.113.0/24',  # Documentation range used for examples
        '198.51.100.0/24'  # Another documentation range
    ]
    ip = ipaddress.ip_address(ip_address)
    return any(ip in ipaddress.ip_network(network) for network in suspicious_networks)
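In practice this filter runs inside the Firehose transformation Lambda. A sketch of applying it to a record batch (decoding mirrors the enrichment function shown earlier; Firehose discards records whose result is "Dropped"):

import base64
import json

def sampling_handler(event, context):
    """Drop low-value records before they reach storage."""
    output = []
    for record in event["records"]:
        data = json.loads(base64.b64decode(record["data"]))
        result = "Ok" if intelligent_sampling(data) else "Dropped"
        output.append({
            "recordId": record["recordId"],
            "result": result,
            "data": record["data"],
        })
    return {"records": output}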
Log Compression and Deduplication
Implement compression at multiple levels:
# S3 bucket with compression
LogsBucketCompressed:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub "${AWS::StackName}-compressed-logs"
LifecycleConfiguration:
Rules:
- Id: CompressAndTransition
Status: Enabled
Filter:
Prefix: "raw-logs/"
Transitions:
- TransitionInDays: 1
StorageClass: STANDARD_IA
NoncurrentVersionTransitions:
- TransitionInDays: 30
StorageClass: GLACIER
# Lambda function for log deduplication
DeduplicationFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: log-deduplication
Runtime: python3.9
Handler: index.lambda_handler
Code:
ZipFile: |
import json
import gzip
import hashlib
from collections import defaultdict
def lambda_handler(event, context):
deduped_logs = defaultdict(int)
for record in event['Records']:
# Process each log record
log_data = json.loads(record['body'])
# Create hash of relevant fields (exclude timestamp)
relevant_fields = {
'eventName': log_data.get('eventName'),
'sourceIPAddress': log_data.get('sourceIPAddress'),
'userIdentity': log_data.get('userIdentity', {}).get('userName')
}
event_hash = hashlib.md5(
json.dumps(relevant_fields, sort_keys=True).encode()
).hexdigest()
deduped_logs[event_hash] += 1
# Store deduplicated logs with counts
return {
'statusCode': 200,
'body': json.dumps(dict(deduped_logs))
}
Cost Monitoring and Alerting
# CloudWatch alarm for storage costs
StorageCostAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: SecurityLogsStorageCostAlarm
AlarmDescription: Monitor security logs storage costs
MetricName: EstimatedCharges
Namespace: AWS/Billing
Statistic: Maximum
Period: 86400 # 24 hours
EvaluationPeriods: 1
Threshold: 1000 # $1000/day
ComparisonOperator: GreaterThanThreshold
Dimensions:
- Name: Currency
Value: USD
- Name: ServiceName
Value: AmazonS3
AlarmActions:
- !Ref SecurityCostNotification
# SNS topic for cost notifications
SecurityCostNotification:
Type: AWS::SNS::Topic
Properties:
TopicName: security-logging-cost-alerts
Subscription:
- Protocol: email
Endpoint: security-team@company.com
Lessons in Logging: What I’ve Learned
After seven years and hundreds of investigations, here are the critical lessons that separate effective security logging from mere compliance theater:
Lesson 1: Log What Matters, Not Everything
Early in my career, I made the mistake of thinking “more logs = better security.” This led to a sprawling logging environment that was completely unmanageable. The signal-to-noise ratio was so poor that real threats were drowning in benign events.
What changed my approach: During a critical incident, we had logs showing an attacker’s presence, but our SIEM was generating ~500 alerts per day. The real alerts were buried in false positives.
Key insight: Focus on logs that directly support your detection use cases. If you can’t articulate how a log type supports threat detection, incident response, or compliance requirements, don’t collect it.
Lesson 2: Context is King
Raw logs are just data points. The magic happens when you correlate them with context:
-- Example: Enriching authentication logs with user context
SELECT
ct.eventTime,
ct.sourceIPAddress,
ct.userIdentity.userName,
ct.userIdentity.arn,
gd.finding_type,
threat_intel.reputation,
user_baseline.normal_locations,
CASE
WHEN ct.sourceIPAddress NOT IN (user_baseline.normal_locations)
THEN 'anomalous_location'
ELSE 'normal_location'
END as location_assessment
FROM cloudtrail_events ct
LEFT JOIN guardduty_findings gd ON ct.sourceIPAddress = gd.remote_ip
LEFT JOIN threat_intelligence threat_intel ON ct.sourceIPAddress = threat_intel.ip
LEFT JOIN user_behavioral_baseline user_baseline ON ct.userIdentity.userName = user_baseline.username
WHERE
ct.eventName IN ('ConsoleLogin', 'AssumeRole')
AND ct.eventTime >= NOW() - INTERVAL '1 HOUR'
ORDER BY ct.eventTime DESC;
Lesson 3: Automate Everything (But Verify Everything Too)
Automation is essential for handling log volume, but I’ve learned to build verification into every automated process:
class LogQualityMonitor:
def __init__(self):
self.expected_log_sources = [
'cloudtrail', 'vpc-flow-logs', 'guardduty',
'application-logs', 's3-access-logs'
]
self.quality_thresholds = {
'missing_fields_percentage': 5.0,
'duplicate_events_percentage': 2.0,
'late_arrival_percentage': 10.0
}
def check_log_quality(self, time_window_minutes=60):
"""Monitor log quality and alert on issues"""
quality_report = {}
for source in self.expected_log_sources:
metrics = self.analyze_log_source(source, time_window_minutes)
quality_report[source] = metrics
# Check for quality issues
if metrics['missing_fields_pct'] > self.quality_thresholds['missing_fields_percentage']:
self.alert_quality_issue(source, 'missing_fields', metrics)
if metrics['expected_volume'] == 0:
self.alert_missing_logs(source)
return quality_report
def analyze_log_source(self, source, time_window):
"""Analyze individual log source quality"""
# Implementation would check CloudWatch Logs
# for volume, field completeness, timing, etc.
pass
Lesson 4: Plan for Scale from Day One
I’ve seen too many organizations hit logging “walls” where their architecture couldn’t scale with growth. Build for 10x your current volume:
# Auto-scaling Kinesis configuration
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
Name: security-logs-stream
StreamModeDetails:
StreamMode: ON_DEMAND # Automatically scales
RetentionPeriodHours: 168 # 7 days for replay capability
# Lambda with reserved concurrency for log processing
LogProcessorFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: security-log-processor
Runtime: python3.9
ReservedConcurrentExecutions: 100 # Prevent resource exhaustion
DeadLetterConfig:
  TargetArn: !GetAtt FailedLogsDLQ.Arn
Environment:
Variables:
BATCH_SIZE: 100
MAX_RETRIES: 3
Lesson 5: Test Your Logging (Like Your Backups)
Logging systems fail silently. I implement continuous testing:
import time
import boto3
import json
from datetime import datetime, timedelta
class LoggingHealthChecker:
def __init__(self):
self.cloudtrail = boto3.client('cloudtrail')
self.logs = boto3.client('logs')
def test_end_to_end_logging(self):
"""Test that logs flow from source to destination"""
# Generate a test event
test_event_id = f"test-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
# Create a test S3 bucket operation
s3 = boto3.client('s3')
try:
s3.head_bucket(Bucket=f'non-existent-bucket-{test_event_id}')
except Exception:
    pass # Expected to fail, but the call still generates a CloudTrail record
# Wait and check if the event appears in logs
time.sleep(30)
# Query CloudWatch Logs for our test event
response = self.logs.filter_log_events(
logGroupName='/aws/cloudtrail',
startTime=int((datetime.utcnow() - timedelta(minutes=5)).timestamp() * 1000),
filterPattern=f'{{ $.eventName = "HeadBucket" && $.errorCode = "NoSuchBucket" }}'
)
test_events = [
event for event in response['events']
if test_event_id in event['message']
]
if not test_events:
self.alert_logging_failure("CloudTrail test event not found")
return False
return True
Conclusion: Building Security That Scales
As I wrap up this deep dive into AWS security logging architecture, I want to return to that fintech investigation I mentioned at the beginning. The 127-day dwell time wasn’t just a security failure — it was an architecture failure.
We had all the pieces: CloudTrail was enabled, VPC Flow Logs were configured, GuardDuty was running. But these systems were operating in isolation, generating massive volumes of data that no human could meaningfully analyze. We had logs, but we didn’t have visibility.
The architecture I’ve outlined here isn’t just about collecting more data — it’s about collecting the right data, processing it intelligently, and ensuring it serves your actual security needs. Every component, from the initial collectors to the final storage tiers, should support your detection and response capabilities.
Key Takeaways for Implementation
- Start with Use Cases: Before configuring any log source, define exactly what threats you’re trying to detect. Map your logging strategy to MITRE ATT&CK techniques relevant to your environment.
- Build for Scale: Your logging architecture will need to handle 10x growth. Use managed services like Kinesis and MSK that can scale automatically.
- Optimize for Cost: Implement intelligent sampling, compression, and lifecycle policies from day one. Monitor costs as closely as you monitor security events.
- Test Continuously: Log quality degrades over time. Build automated testing into your architecture to catch failures before they impact investigations.
- Plan for the Long Term: Consider not just current storage needs, but long-term compliance requirements and the eventual need to search historical data during major incidents.
The Path Forward
Modern threat actors are patient, sophisticated, and persistent. They count on organizations having visibility gaps and exploit them ruthlessly. By implementing comprehensive, scalable security logging, we deny them the darkness they need to operate.
The architecture I’ve shared here has been battle-tested across organizations from startups to Fortune 500 companies. It’s not perfect — no architecture is — but it provides the visibility foundation that effective security programs require.
Your logging infrastructure is your security team’s eyes and ears. Invest in it accordingly.
This post represents my personal experiences and opinions. All examples use dummy data.
References
- AWS Security Logging Best Practices
- MITRE ATT&CK: https://attack.mitre.org/
- AWS Well-Architected Framework, Security Pillar: https://docs.aws.amazon.com/wellarchitected/latest/security-pillar/
- NIST Cybersecurity Framework: https://www.nist.gov/cyberframework
- s0cm0nkey’s Security Reference Guide, Security Logging: https://s0cm0nkey.gitbook.io/s0cm0nkeys-security-reference-guide/security-logging
- Marco Lancini, Security Logging in Cloud Environments (AWS): https://blog.marcolancini.it/2021/blog-security-logging-cloud-environments-aws/