Cloud Security Posture Management
Introduction
Cloud Security Posture Management (CSPM) continuously monitors cloud environments for misconfigurations, compliance violations, and security risks. As cloud infrastructure grows in complexity, manual security reviews stop scaling. CSPM automates the detection and remediation of configuration issues, which are a frequently cited cause of cloud data breaches.
Core CSPM Capabilities
CSPM tools provide automated discovery, assessment, and remediation across cloud services.
Multi-Cloud Visibility
import boto3
from botocore.exceptions import ClientError
from google.cloud import resource_manager
from azure.mgmt.resource import ResourceManagementClient


class CSPMScanner:
    def __init__(self):
        self.aws = boto3.client('config')
        self.gcp = resource_manager.Client()
        self.findings = []

    def scan_aws(self, account_id):
        """Evaluate AWS account against security benchmarks."""
        # Check S3 public access
        s3 = boto3.client('s3')
        buckets = s3.list_buckets()['Buckets']

        for bucket in buckets:
            try:
                response = s3.get_public_access_block(Bucket=bucket['Name'])
            except ClientError as err:
                # Only a missing configuration is a finding. Other errors
                # (for example AccessDenied) are re-raised, not misreported.
                code = err.response['Error']['Code']
                if code != 'NoSuchPublicAccessBlockConfiguration':
                    raise
                self.findings.append({
                    'severity': 'CRITICAL',
                    'service': 'S3',
                    'resource': bucket['Name'],
                    'issue': 'Public access block not configured',
                    'framework': 'CIS AWS 2.1'
                })
                continue

            block_config = response['PublicAccessBlockConfiguration']
            if not all([
                block_config['BlockPublicAcls'],
                block_config['BlockPublicPolicy'],
                block_config['IgnorePublicAcls'],
                block_config['RestrictPublicBuckets'],
            ]):
                self.findings.append({
                    'severity': 'HIGH',
                    'service': 'S3',
                    'resource': bucket['Name'],
                    'issue': 'Public access not fully blocked',
                    'framework': 'CIS AWS 2.1'
                })
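The bucket check above boils down to a rule over the `PublicAccessBlockConfiguration` dictionary, so it can be pulled out into a pure function and tested without AWS credentials. The following is a minimal sketch of that idea; the function name `missing_public_access_blocks` and the sample dictionaries are illustrative assumptions, not part of boto3.

```python
# The four settings returned in PublicAccessBlockConfiguration.
REQUIRED_SETTINGS = (
    "BlockPublicAcls",
    "IgnorePublicAcls",
    "BlockPublicPolicy",
    "RestrictPublicBuckets",
)


def missing_public_access_blocks(block_config):
    """Return the settings that are absent or disabled, in a fixed order."""
    return [name for name in REQUIRED_SETTINGS if not block_config.get(name, False)]


# Hypothetical sample data, for illustration only.
fully_blocked = dict.fromkeys(REQUIRED_SETTINGS, True)
partially_blocked = {**fully_blocked, "BlockPublicPolicy": False}

print(missing_public_access_blocks(fully_blocked))
print(missing_public_access_blocks(partially_blocked))
```

Returning the names of the disabled settings, rather than a single boolean, lets a finding say exactly which control is off.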
Automated Checks
cspm_checks:
  aws:
    - id: "CIS-1.1"
      title: "Avoid root user usage"
      severity: critical
    - id: "CIS-2.1"
      title: "S3 buckets should have public access blocked"
      severity: high
    - id: "CIS-3.1"
      title: "Ensure CloudTrail is enabled"
      severity: high
    - id: "CIS-4.1"
      title: "Ensure no security groups allow SSH from 0.0.0.0/0"
      severity: critical
  gcp:
    - id: "CIS-1.1"
      title: "Ensure IAM users are managed centrally"
      severity: high
    - id: "CIS-2.2"
      title: "Ensure default network does not exist"
      severity: medium
    - id: "CIS-3.3"
      title: "Ensure VPC flow logging is enabled"
      severity: medium
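A check catalog like the one above is typically filtered by provider and severity before a scan runs. The sketch below shows one way to do that, assuming the catalog has already been loaded into plain Python data; `SEVERITY_ORDER`, `CSPM_CHECKS`, and `checks_at_or_above` are hypothetical names, and only a subset of the catalog is mirrored.

```python
# Lower rank means more severe.
SEVERITY_ORDER = {"critical": 0, "high": 1, "medium": 2, "low": 3}

# A subset of the catalog above, expressed as plain Python data.
CSPM_CHECKS = {
    "aws": [
        {"id": "CIS-2.1", "severity": "high"},
        {"id": "CIS-1.1", "severity": "critical"},
    ],
    "gcp": [
        {"id": "CIS-2.2", "severity": "medium"},
    ],
}


def checks_at_or_above(catalog, provider, minimum):
    """Return check IDs for a provider, most severe first, at or above `minimum`."""
    threshold = SEVERITY_ORDER[minimum]
    selected = [
        check for check in catalog.get(provider, [])
        if SEVERITY_ORDER[check["severity"]] <= threshold
    ]
    selected.sort(key=lambda check: SEVERITY_ORDER[check["severity"]])
    return [check["id"] for check in selected]


print(checks_at_or_above(CSPM_CHECKS, "aws", "high"))
```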
Compliance Monitoring
CSPM maps security findings to compliance frameworks, enabling automated audit evidence collection.
class ComplianceMapper:
    frameworks = {
        'CIS': 'CIS Benchmarks',
        'SOC2': 'SOC 2 Trust Services Criteria',
        'PCI-DSS': 'Payment Card Industry Data Security Standard',
        'HIPAA': 'Health Insurance Portability and Accountability Act',
        'GDPR': 'General Data Protection Regulation',
    }

    # Issue strings emitted by the S3 public access check
    s3_public_access_issues = (
        'Public access not fully blocked',
        'Public access block not configured',
    )

    def map_finding(self, finding):
        """Map a security finding to applicable compliance frameworks."""
        mapped_to = []

        # Example mapping
        if finding['issue'] in self.s3_public_access_issues:
            mapped_to.extend([
                ('CIS', '2.1'),
                ('SOC2', 'CC6.1'),
                ('PCI-DSS', '1.2.1'),
                ('HIPAA', '164.312(a)(1)'),
            ])

        return mapped_to
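For audit evidence collection, it is often more useful to invert the mapping and list the affected resources under each control. The sketch below assumes findings are dictionaries with `issue` and `resource` keys, as in the scanner earlier; `CONTROL_MAP` and `evidence_by_framework` are hypothetical names, and the control IDs are copied from the example mapping above rather than validated against the frameworks themselves.

```python
from collections import defaultdict

# Hypothetical lookup table: issue text -> (framework, control) pairs.
CONTROL_MAP = {
    "Public access not fully blocked": [("CIS", "2.1"), ("SOC2", "CC6.1")],
    "Public access block not configured": [("CIS", "2.1"), ("PCI-DSS", "1.2.1")],
}


def evidence_by_framework(findings, control_map=CONTROL_MAP):
    """Group affected resources under each (framework, control) pair."""
    evidence = defaultdict(list)
    for finding in findings:
        # Findings with no known mapping are skipped, not treated as errors.
        for framework, control in control_map.get(finding["issue"], []):
            evidence[(framework, control)].append(finding["resource"])
    return dict(evidence)


# Illustrative findings only.
sample_findings = [
    {"issue": "Public access not fully blocked", "resource": "logs-bucket"},
    {"issue": "Public access block not configured", "resource": "assets-bucket"},
    {"issue": "Unmapped issue", "resource": "other"},
]
print(evidence_by_framework(sample_findings))
```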
Drift Detection
CSPM continuously monitors for configuration drift, which occurs when cloud resources deviate from the security baseline.
import hashlib
import json
from datetime import datetime, timezone


class DriftDetector:
    def __init__(self):
        self.baseline = {}

    @staticmethod
    def _hash_config(config):
        """Hash a configuration with sorted keys so key order does not matter."""
        return hashlib.sha256(
            json.dumps(config, sort_keys=True).encode()
        ).hexdigest()

    def capture_baseline(self, resources):
        """Create a hash-based baseline of resource configurations."""
        for resource in resources:
            self.baseline[resource['id']] = self._hash_config(resource['config'])

    def detect_drift(self, current_state):
        """Compare current state against baseline."""
        drifts = []

        for resource in current_state:
            resource_id = resource['id']
            current_hash = self._hash_config(resource['config'])
            baseline_hash = self.baseline.get(resource_id)

            # Resources without a baseline entry are skipped, not reported
            if baseline_hash and baseline_hash != current_hash:
                drifts.append({
                    'resource': resource_id,
                    'type': resource['type'],
                    'baseline': baseline_hash[:8],
                    'current': current_hash[:8],
                    'detected_at': datetime.now(timezone.utc).isoformat()
                })

        return drifts
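A hash comparison shows that a resource changed, but not which setting changed. If the baseline keeps the configuration itself rather than only its hash, a field-level diff becomes possible. The following is a minimal sketch under that assumption; `diff_config` is a hypothetical helper, and it assumes configurations are nested dictionaries with string keys.

```python
_MISSING = object()  # Distinguishes "key absent" from "key set to None".


def diff_config(baseline, current, prefix=""):
    """Return sorted dotted paths whose values differ between two configs."""
    changed = []
    for key in sorted(set(baseline) | set(current)):
        path = f"{prefix}{key}"
        old = baseline.get(key, _MISSING)
        new = current.get(key, _MISSING)
        if isinstance(old, dict) and isinstance(new, dict):
            # Recurse so nested settings are reported individually.
            changed.extend(diff_config(old, new, prefix=f"{path}."))
        elif old != new:
            changed.append(path)
    return changed


# Illustrative configurations only.
baseline_cfg = {"encryption": {"enabled": True, "kms_key": "alias/a"}, "versioning": True}
current_cfg = {"encryption": {"enabled": False, "kms_key": "alias/a"}, "versioning": True, "public": True}

print(diff_config(baseline_cfg, current_cfg))
```

Lists are compared as whole values in this sketch, so a reordered list is reported as a change.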
Remediation Automation
CSPM tools can automatically remediate common misconfigurations.
import boto3


class AutoRemediator:
    # SSH, RDP, MySQL, PostgreSQL
    SENSITIVE_PORTS = (22, 3389, 3306, 5432)

    def remediate_open_security_group(self, group_id, region):
        """Revoke ingress rules that expose sensitive ports to 0.0.0.0/0."""
        ec2 = boto3.client('ec2', region_name=region)

        # Get current rules
        sg = ec2.describe_security_groups(GroupIds=[group_id])['SecurityGroups'][0]

        removed_ports = []
        for permission in sg['IpPermissions']:
            # Only rules whose range starts on a sensitive port are matched
            if permission.get('FromPort') not in self.SENSITIVE_PORTS:
                continue
            for ip_range in permission.get('IpRanges', []):
                if ip_range['CidrIp'] == '0.0.0.0/0':
                    # Remove overly permissive rule
                    ec2.revoke_security_group_ingress(
                        GroupId=group_id,
                        IpPermissions=[{
                            'IpProtocol': permission['IpProtocol'],
                            'FromPort': permission['FromPort'],
                            'ToPort': permission['ToPort'],
                            'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
                        }]
                    )
                    removed_ports.append(permission['FromPort'])

        # Report every revoked rule instead of stopping at the first match
        if removed_ports:
            return {'action': 'removed_rule', 'ports': removed_ports}
        return {'action': 'none'}
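Before revoking anything, it can help to run the same rule as a read-only check that also catches port ranges and all-traffic rules. The sketch below works on a dictionary shaped like one entry of the `SecurityGroups` list returned by `describe_security_groups`; `exposed_sensitive_ports` is a hypothetical helper, it makes no AWS calls, and it covers only IPv4 ranges.

```python
SENSITIVE_PORTS = (22, 3389, 3306, 5432)  # SSH, RDP, MySQL, PostgreSQL


def exposed_sensitive_ports(security_group):
    """Return sensitive ports reachable from 0.0.0.0/0, without changing anything."""
    exposed = set()
    for permission in security_group.get("IpPermissions", []):
        ranges = permission.get("IpRanges", [])
        if not any(r.get("CidrIp") == "0.0.0.0/0" for r in ranges):
            continue
        protocol = permission.get("IpProtocol")
        if protocol == "-1":
            # "-1" means all protocols and ports; no FromPort/ToPort is present.
            low, high = 0, 65535
        elif protocol in ("tcp", "udp"):
            low, high = permission["FromPort"], permission["ToPort"]
        else:
            continue  # e.g. icmp, where FromPort/ToPort are not port numbers
        exposed.update(p for p in SENSITIVE_PORTS if low <= p <= high)
    return sorted(exposed)


# Illustrative security group, not real data.
sample_group = {"IpPermissions": [
    {"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22, "IpRanges": [{"CidrIp": "0.0.0.0/0"}]},
    {"IpProtocol": "tcp", "FromPort": 3000, "ToPort": 4000, "IpRanges": [{"CidrIp": "0.0.0.0/0"}]},
    {"IpProtocol": "tcp", "FromPort": 5432, "ToPort": 5432, "IpRanges": [{"CidrIp": "10.0.0.0/8"}]},
]}
print(exposed_sensitive_ports(sample_group))
```

A dry run like this can feed an approval step, so that broad rules such as a 3000-4000 range are reviewed by a person before being revoked wholesale.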
CSPM Integration Pipeline
cspm_pipeline:
  schedule: continuous

  collectors:
    - source: aws_config
      interval: 15_minutes
    - source: gcp_asset_inventory
      interval: 15_minutes
    - source: azure_policy
      interval: 15_minutes

  evaluation:
    frameworks:
      - cis_v1.5
      - soc2
      - pci_dss_v4.0

  alerting:
    critical: pagerduty_immediate
    high: slack_15_minutes
    medium: jira_daily
    low: monthly_report

  remediation:
    auto_remediate:
      - open_ssh_from_internet
      - unencrypted_s3_bucket
      - disabled_cloudtrail
    approval_required:
      - iam_policy_changes
      - cross_account_access
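The `alerting` section of the pipeline above amounts to a lookup from severity to channel. A minimal sketch of that routing step follows, assuming findings carry a `severity` and a `resource` key as in the scanner earlier. `ALERT_ROUTES` mirrors the configuration values, while `route_findings` and the choice to send unknown severities to the critical channel are assumptions made for illustration.

```python
# Mirrors the alerting section of the pipeline configuration.
ALERT_ROUTES = {
    "critical": "pagerduty_immediate",
    "high": "slack_15_minutes",
    "medium": "jira_daily",
    "low": "monthly_report",
}


def route_findings(findings, routes=ALERT_ROUTES):
    """Group finding resources by the alert channel for their severity."""
    routed = {}
    for finding in findings:
        severity = finding["severity"].lower()
        # Assumption: an unrecognized severity goes to the critical channel
        # so that it is never silently dropped.
        channel = routes.get(severity, routes["critical"])
        routed.setdefault(channel, []).append(finding["resource"])
    return routed


# Illustrative findings only.
sample = [
    {"severity": "CRITICAL", "resource": "bucket-a"},
    {"severity": "HIGH", "resource": "bucket-b"},
    {"severity": "HIGH", "resource": "bucket-c"},
]
print(route_findings(sample))
```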
Conclusion
CSPM is essential for maintaining secure cloud configurations at scale. Deploy CSPM tools that continuously scan across all cloud providers, map findings to compliance frameworks, detect configuration drift, and automate remediation where safe. Integrate CSPM findings into your existing SIEM and incident response workflows for complete visibility.