SGO/app.py
Eduardo Figueroa a6b2cea31f
Some checks failed
CI / syntax-check (push) Has been cancelled
CI / security-scan (push) Has been cancelled
CI / container-lint (push) Has been cancelled
CI / container-build (push) Has been cancelled
Publish / publish (push) Has been cancelled
Migrate to Podman, Forgejo Actions; clean up cruft
Container:
- Dockerfile → Containerfile; drop gosu, entrypoint, PUID/PGID user-switching
- HOME=/config so Path.home()/.aws resolves to runtime-mounted credentials
- docker-compose.yml → compose.yml with userns_mode: keep-id for Podman rootless
- .dockerignore → .containerignore
- boto3 unpinned from 1.34.0 to >=1.34.0

CI:
- Remove Woodpecker (.woodpecker.yml, .woodpecker/)
- Add Forgejo Actions (.forgejo/workflows/ci.yml, publish.yml)
- CI: syntax check, security scan, container lint (hadolint), build test
- Publish: build and push to Quay.io on main push and version tags

Cleanup:
- Remove entrypoint.sh (no longer needed)
- Remove scripts/build-and-push.sh and PUBLISHING.md (superseded by CI)
- All docker → podman command references updated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-16 15:41:08 -07:00

1050 lines
38 KiB
Python
Executable file
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Flask web application for exploring AWS EC2 and Security Group exports
"""
from flask import Flask, render_template, request, jsonify, Response, stream_with_context
import sqlite3
import os
import re
import atexit
import signal
import sys
import boto3
import configparser
from pathlib import Path
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import queue
app = Flask(__name__)

# SQLite database lives next to the application source, under ./data/
DB_PATH = os.path.join(os.path.dirname(__file__), 'data', 'aws_export.db')

# In-memory cache of authenticated AWS sessions so refreshes can reuse them.
# Shape: {profile: {'session': boto3.Session, 'region': str,
#                   'timestamp': float, 'duration_seconds': int,
#                   'account_id': str, 'account_name': str}}
# NOTE: process-local; lost on restart and not shared across workers.
session_cache = {}
def regexp(pattern, value):
    """Custom REGEXP function for SQLite.

    SQLite evaluates ``X REGEXP Y`` by calling this as ``regexp(Y, X)``.

    Args:
        pattern: Regular expression (matched case-insensitively). May be
            None when the SQL operand is NULL.
        value: Column value to test. May be None or a non-string type.

    Returns:
        True if the pattern matches, False for NULL operands, invalid
        patterns, or non-string values.
    """
    # NULL on either side of REGEXP means "no match" — the original only
    # guarded `value`, so a NULL pattern raised TypeError inside SQLite
    # and aborted the whole query.
    if pattern is None or value is None:
        return False
    try:
        return re.search(pattern, value, re.IGNORECASE) is not None
    except (re.error, TypeError):
        # re.error: user-supplied pattern is malformed.
        # TypeError: column value is not str/bytes (e.g. numeric column).
        return False
def init_db():
    """Create the database schema idempotently — called once at startup.

    Tables:
        security_groups     one row per SG, with flattened tag columns
        ec2_instances       one row per instance, SG ids/names ';'-joined
        sg_rules            one row per ingress/egress rule
        refresh_timestamps  last import time per account
    """
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS security_groups (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            account_id TEXT,
            account_name TEXT,
            group_id TEXT UNIQUE,
            group_name TEXT,
            tag_name TEXT,
            tag_wave TEXT,
            tag_git_repo TEXT,
            tag_git_org TEXT,
            tag_git_file TEXT,
            tags_json TEXT,
            ingress_rule_count INTEGER,
            egress_rule_count INTEGER
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS ec2_instances (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            account_id TEXT,
            account_name TEXT,
            tag_name TEXT,
            instance_id TEXT UNIQUE,
            state TEXT,
            private_ip_address TEXT,
            security_groups_id_list TEXT,
            security_groups_name_list TEXT,
            tag_git_repo TEXT,
            tag_git_org TEXT,
            tag_git_file TEXT,
            tags_json TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS sg_rules (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            group_id TEXT,
            direction TEXT,
            protocol TEXT,
            port_range TEXT,
            source_type TEXT,
            source TEXT,
            description TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS refresh_timestamps (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            account_id TEXT,
            account_name TEXT,
            last_refresh TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(account_id)
        )
        """,
    )
    connection = sqlite3.connect(DB_PATH)
    try:
        for ddl in ddl_statements:
            connection.execute(ddl)
        connection.commit()
    finally:
        connection.close()
def get_db():
    """Open a SQLite connection with Row access and REGEXP support.

    Callers are responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    # Rows behave like dicts (row['col']) for easy JSON serialization.
    connection.row_factory = sqlite3.Row
    # Register the Python-side REGEXP implementation used by /api/search.
    connection.create_function("REGEXP", 2, regexp)
    return connection
# Initialize schema at module load so it runs under both the dev server
# (`python app.py`) and a WSGI container that only imports the module.
init_db()
@app.route('/')
def index():
    """Show explorer if the DB has data, otherwise show the import page.

    Any DB error (e.g. missing table) falls through to the import page.
    """
    try:
        conn = get_db()
        try:
            count = conn.execute("SELECT COUNT(*) FROM security_groups").fetchone()[0]
        finally:
            # BUG FIX: the original closed the connection only on the success
            # path, leaking it whenever the query raised.
            conn.close()
        if count > 0:
            return render_template('index.html')
    except Exception:
        pass
    return render_template('import.html')
@app.route('/explorer')
def explorer():
    """Main explorer interface.

    Always renders the explorer; the template itself shows an empty
    state when the database holds no data.
    """
    return render_template('index.html')
@app.route('/api/profiles')
def get_profiles():
    """List AWS profiles found in ~/.aws/config.

    Returns JSON {'profiles': [{'name': ..., 'has_mfa': bool}, ...]},
    404 when the config file is absent, 500 on parse errors.
    """
    try:
        config_path = Path.home() / '.aws' / 'config'
        if not config_path.exists():
            return jsonify({'error': f'AWS config file not found at {config_path}'}), 404
        parser = configparser.ConfigParser()
        parser.read(config_path)
        profiles = []
        for section in parser.sections():
            # AWS config names sections "profile <name>", except "default".
            if section == 'default':
                name = 'default'
            elif section.startswith('profile '):
                name = section.replace('profile ', '')
            else:
                continue
            profiles.append({
                'name': name,
                # mfa_serial present means the profile needs an MFA token.
                'has_mfa': parser.has_option(section, 'mfa_serial'),
            })
        # 'default' first, everything else case-insensitively by name.
        profiles.sort(key=lambda p: (p['name'] != 'default', p['name'].lower()))
        return jsonify({'profiles': profiles})
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
def send_progress(message, status='info'):
    """Format one Server-Sent Events frame carrying a progress update.

    Args:
        message: Human-readable progress text.
        status: One of 'info', 'success', 'warning', 'error', 'redirect',
            'complete' — interpreted by the front-end stream consumer.

    Returns:
        A complete SSE frame string ("data: <json>\\n\\n").
    """
    payload = json.dumps({'message': message, 'status': status})
    return "data: " + payload + "\n\n"
def get_account_info_inline(session):
    """Get AWS account ID and alias (inline version).

    Args:
        session: boto3 Session (or compatible object exposing .client()).

    Returns:
        (account_id, account_name) tuple. account_name is the first IAM
        account alias, falling back to the account ID when no alias
        exists or the alias lookup fails (e.g. missing IAM permissions).
    """
    sts = session.client('sts')
    identity = sts.get_caller_identity()
    account_id = identity['Account']
    try:
        iam = session.client('iam')
        aliases = iam.list_account_aliases()
        account_name = aliases['AccountAliases'][0] if aliases['AccountAliases'] else account_id
    except Exception:
        # BUG FIX: was a bare `except:`, which also swallows
        # KeyboardInterrupt/SystemExit. The alias lookup is best-effort,
        # so only ordinary exceptions should fall back to the account ID.
        account_name = account_id
    return account_id, account_name
def import_profile(profile, mfa_code, progress_queue):
    """Import data from a single AWS profile (runs in a worker thread).

    Authenticates using ~/.aws/config + ~/.aws/credentials, handling four
    cases: MFA only, MFA + assume-role, assume-role only, and plain keys.
    All progress is reported as (status, message) tuples on progress_queue.

    Args:
        profile: AWS profile name from ~/.aws/config.
        mfa_code: One-time MFA token ('' when the profile has no MFA).
        progress_queue: queue.Queue the caller drains for SSE streaming.

    Returns:
        dict with 'profile', 'security_groups', 'ec2_instances',
        'sg_rules' on success, or None on any failure (the failure reason
        is reported via progress_queue).
    """
    try:
        from import_from_aws import fetch_security_groups, fetch_ec2_instances
        progress_queue.put(('info', f"[{profile}] Starting authentication..."))
        # Read AWS config to get MFA serial / region / role settings.
        config_path = Path.home() / '.aws' / 'config'
        config = configparser.ConfigParser()
        config.read(config_path)
        # AWS config names sections "profile <name>", except "default".
        section_name = f'profile {profile}' if profile != 'default' else 'default'
        mfa_serial = None
        region = None
        source_profile = None
        role_arn = None
        duration_seconds = 3600  # Default to 1 hour
        if section_name in config:
            mfa_serial = config[section_name].get('mfa_serial')
            region = config[section_name].get('region', 'us-east-1')
            source_profile = config[section_name].get('source_profile')
            role_arn = config[section_name].get('role_arn')
            # Read duration_seconds from config, default to 3600 (1 hour)
            if config.has_option(section_name, 'duration_seconds'):
                try:
                    duration_seconds = int(config[section_name].get('duration_seconds'))
                    # Validate AWS session duration limits (15 min to 12 hours)
                    if duration_seconds < 900 or duration_seconds > 43200:
                        progress_queue.put(('warning', f"[{profile}] duration_seconds {duration_seconds} outside AWS limits (900-43200), using default 3600"))
                        duration_seconds = 3600
                except ValueError:
                    progress_queue.put(('warning', f"[{profile}] Invalid duration_seconds in config, using default 3600"))
                    duration_seconds = 3600
        # Debug output
        progress_queue.put(('info', f"[{profile}] Config: region={region}, mfa_serial={bool(mfa_serial)}, source_profile={source_profile}, role_arn={role_arn}, duration={duration_seconds}s"))
        # Read base (long-lived) credentials from ~/.aws/credentials
        creds_path = Path.home() / '.aws' / 'credentials'
        creds_config = configparser.ConfigParser()
        creds_config.read(creds_path)
        # Determine which credentials section to use
        # Priority: source_profile > profile name > default
        if source_profile and source_profile in creds_config:
            cred_section = source_profile
        elif profile in creds_config:
            cred_section = profile
        elif 'default' in creds_config:
            cred_section = 'default'
        else:
            progress_queue.put(('error', f"✗ [{profile}] Credentials not found in ~/.aws/credentials"))
            return None
        # NOTE(review): this check is unreachable — every branch above
        # guarantees cred_section is present in creds_config. Kept as-is.
        if cred_section not in creds_config:
            progress_queue.put(('error', f"✗ [{profile}] Credentials not found in ~/.aws/credentials"))
            return None
        base_access_key = creds_config[cred_section].get('aws_access_key_id')
        base_secret_key = creds_config[cred_section].get('aws_secret_access_key')
        if not base_access_key or not base_secret_key:
            progress_queue.put(('error', f"✗ [{profile}] Invalid credentials in ~/.aws/credentials"))
            return None
        # If MFA is configured and we have a code, use it
        if mfa_serial and mfa_code:
            progress_queue.put(('info', f"[{profile}] Using MFA authentication..."))
            # Create STS client with base credentials (no session)
            sts = boto3.client(
                'sts',
                aws_access_key_id=base_access_key,
                aws_secret_access_key=base_secret_key,
                region_name=region or 'us-east-1'
            )
            try:
                # Exchange long-lived keys + MFA token for temporary credentials
                response = sts.get_session_token(
                    DurationSeconds=duration_seconds,
                    SerialNumber=mfa_serial,
                    TokenCode=mfa_code
                )
                credentials = response['Credentials']
                progress_queue.put(('success', f"✓ [{profile}] MFA authentication successful"))
                # If there's a role to assume, assume it
                if role_arn:
                    progress_queue.put(('info', f"[{profile}] Assuming role {role_arn}..."))
                    # Create STS client with MFA session credentials
                    sts_with_mfa = boto3.client(
                        'sts',
                        aws_access_key_id=credentials['AccessKeyId'],
                        aws_secret_access_key=credentials['SecretAccessKey'],
                        aws_session_token=credentials['SessionToken'],
                        region_name=region or 'us-east-1'
                    )
                    try:
                        # Assume the role using the MFA-backed session
                        role_response = sts_with_mfa.assume_role(
                            RoleArn=role_arn,
                            RoleSessionName=f"{profile}-session"
                        )
                        role_credentials = role_response['Credentials']
                        session = boto3.Session(
                            aws_access_key_id=role_credentials['AccessKeyId'],
                            aws_secret_access_key=role_credentials['SecretAccessKey'],
                            aws_session_token=role_credentials['SessionToken'],
                            region_name=region or 'us-east-1'
                        )
                        progress_queue.put(('success', f"✓ [{profile}] Role assumption successful"))
                    except Exception as role_error:
                        progress_queue.put(('error', f"✗ [{profile}] Role assumption failed - {str(role_error)}"))
                        return None
                else:
                    # No role to assume, use MFA session directly
                    session = boto3.Session(
                        aws_access_key_id=credentials['AccessKeyId'],
                        aws_secret_access_key=credentials['SecretAccessKey'],
                        aws_session_token=credentials['SessionToken'],
                        region_name=region or 'us-east-1'
                    )
            except Exception as mfa_error:
                progress_queue.put(('error', f"✗ [{profile}] MFA authentication failed - {str(mfa_error)}"))
                return None
        else:
            # No MFA configured or no code provided
            if mfa_serial and not mfa_code:
                progress_queue.put(('error', f"✗ [{profile}] MFA code required but not provided"))
                return None
            progress_queue.put(('info', f"[{profile}] Using direct authentication (no MFA)..."))
            # If there's a role to assume (without MFA)
            if role_arn:
                progress_queue.put(('info', f"[{profile}] Assuming role {role_arn}..."))
                sts = boto3.client(
                    'sts',
                    aws_access_key_id=base_access_key,
                    aws_secret_access_key=base_secret_key,
                    region_name=region or 'us-east-1'
                )
                try:
                    role_response = sts.assume_role(
                        RoleArn=role_arn,
                        RoleSessionName=f"{profile}-session"
                    )
                    role_credentials = role_response['Credentials']
                    session = boto3.Session(
                        aws_access_key_id=role_credentials['AccessKeyId'],
                        aws_secret_access_key=role_credentials['SecretAccessKey'],
                        aws_session_token=role_credentials['SessionToken'],
                        region_name=region or 'us-east-1'
                    )
                    progress_queue.put(('success', f"✓ [{profile}] Role assumption successful"))
                except Exception as role_error:
                    progress_queue.put(('error', f"✗ [{profile}] Role assumption failed - {str(role_error)}"))
                    return None
            else:
                # No role, use base credentials directly
                session = boto3.Session(
                    aws_access_key_id=base_access_key,
                    aws_secret_access_key=base_secret_key,
                    region_name=region or 'us-east-1'
                )
        # Verify the session works before fetching anything.
        # NOTE(review): placement assumed common to all auth paths (the
        # scrape lost indentation) — confirm against the original file.
        try:
            sts = session.client('sts')
            sts.get_caller_identity()
            progress_queue.put(('success', f"✓ [{profile}] Authentication successful"))
        except Exception as e:
            progress_queue.put(('error', f"✗ [{profile}] Authentication failed - {str(e)}"))
            return None
        # Get account info
        account_id, account_name = get_account_info_inline(session)
        progress_queue.put(('info', f" [{profile}] Account: {account_name} ({account_id})"))
        # Cache the session credentials for reuse by /api/refresh-cached
        global session_cache
        session_cache[profile] = {
            'session': session,
            'region': region,
            'timestamp': time.time(),
            'duration_seconds': duration_seconds,
            'account_id': account_id,
            'account_name': account_name
        }
        # Fetch data
        progress_queue.put(('info', f" [{profile}] Fetching security groups..."))
        security_groups, sg_rules = fetch_security_groups(session, account_id, account_name)
        progress_queue.put(('success', f" ✓ [{profile}] Found {len(security_groups)} security groups with {len(sg_rules)} rules"))
        progress_queue.put(('info', f" [{profile}] Fetching EC2 instances..."))
        ec2_instances = fetch_ec2_instances(session, account_id, account_name)
        progress_queue.put(('success', f" ✓ [{profile}] Found {len(ec2_instances)} EC2 instances"))
        return {
            'profile': profile,
            'security_groups': security_groups,
            'ec2_instances': ec2_instances,
            'sg_rules': sg_rules
        }
    except Exception as e:
        progress_queue.put(('error', f"✗ [{profile}] Error - {str(e)}"))
        return None
@app.route('/api/import', methods=['POST'])
def import_data():
    """Import data from AWS with parallel execution and streaming progress.

    Expects JSON: {'profiles': [...], 'mfa_codes': {profile: code}}.
    Returns a text/event-stream response of SSE progress frames.
    """
    data = request.json
    selected_profiles = data.get('profiles', [])
    mfa_codes = data.get('mfa_codes', {})

    def generate():
        try:
            from import_from_aws import import_to_database
            # Guard: ThreadPoolExecutor(max_workers=0) raises ValueError,
            # so bail out cleanly when no profiles were selected.
            if not selected_profiles:
                yield send_progress("✗ No data imported", 'error')
                return
            yield send_progress(f"Starting parallel import from {len(selected_profiles)} profile(s)...", 'info')
            # Queue used by worker threads to report progress back to us.
            progress_queue = queue.Queue()
            all_security_groups = []
            all_ec2_instances = []
            all_sg_rules = []
            with ThreadPoolExecutor(max_workers=len(selected_profiles)) as executor:
                # Submit one import task per profile.
                futures = {}
                for profile in selected_profiles:
                    mfa_code = mfa_codes.get(profile, '')
                    future = executor.submit(import_profile, profile, mfa_code, progress_queue)
                    futures[future] = profile
                # Stream progress while waiting for the workers to finish.
                #
                # BUG FIX: the original iterated
                # `as_completed(futures, timeout=0.1)`, which raises
                # TimeoutError whenever no future completes within 0.1s —
                # i.e. on essentially every real AWS import — and the outer
                # `except` then aborted the whole request. It also deleted
                # from `futures` while as_completed was iterating it.
                # Instead, block briefly on the queue and poll Future.done().
                while futures:
                    try:
                        status, message = progress_queue.get(timeout=0.1)
                        yield send_progress(message, status)
                        continue  # drain messages eagerly before polling
                    except queue.Empty:
                        pass
                    for future in [f for f in futures if f.done()]:
                        result = future.result()  # worker catches its own errors
                        del futures[future]
                        if result:
                            all_security_groups.extend(result['security_groups'])
                            all_ec2_instances.extend(result['ec2_instances'])
                            all_sg_rules.extend(result['sg_rules'])
            # Drain any progress messages that arrived after the last poll.
            while not progress_queue.empty():
                status, message = progress_queue.get()
                yield send_progress(message, status)
            # Persist everything (replace mode wipes previous data).
            if all_security_groups or all_ec2_instances:
                yield send_progress("Importing to database...", 'info')
                import_to_database(DB_PATH, all_security_groups, all_ec2_instances, all_sg_rules, append=False)
                yield send_progress(f"✓ Import complete!", 'success')
                yield send_progress(f" Total Security Groups: {len(all_security_groups)}", 'success')
                yield send_progress(f" Total EC2 Instances: {len(all_ec2_instances)}", 'success')
                yield send_progress(f" Total SG Rules: {len(all_sg_rules)}", 'success')
                yield send_progress("Redirecting to explorer...", 'complete')
            else:
                yield send_progress("✗ No data imported", 'error')
        except Exception as e:
            yield send_progress(f"✗ Import failed: {str(e)}", 'error')

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
@app.route('/api/import-profile', methods=['POST'])
def import_single_profile():
    """Import data from a single AWS profile with streaming progress.

    Runs in append mode so data from other accounts is preserved.
    """
    payload = request.json
    profile = payload.get('profile')
    mfa_code = payload.get('mfa_code', '')

    def generate():
        try:
            from import_from_aws import import_to_database
            yield send_progress(f"Starting import from {profile}...", 'info')
            # Collects (status, message) tuples emitted by import_profile.
            progress_queue = queue.Queue()
            result = import_profile(profile, mfa_code, progress_queue)
            # Flush everything the worker reported.
            while True:
                try:
                    status, message = progress_queue.get_nowait()
                except queue.Empty:
                    break
                yield send_progress(message, status)
            if not result:
                yield send_progress(f"✗ Import failed for {profile}", 'error')
                return
            yield send_progress("Importing to database...", 'info')
            import_to_database(
                DB_PATH,
                result['security_groups'],
                result['ec2_instances'],
                result['sg_rules'],
                append=True  # Append mode for individual imports
            )
            yield send_progress(f"✓ Import complete for {profile}!", 'success')
            yield send_progress(f" Security Groups: {len(result['security_groups'])}", 'success')
            yield send_progress(f" EC2 Instances: {len(result['ec2_instances'])}", 'success')
            yield send_progress(f" SG Rules: {len(result['sg_rules'])}", 'success')
            yield send_progress("Done", 'complete')
        except Exception as e:
            yield send_progress(f"✗ Import failed: {str(e)}", 'error')

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
@app.route('/api/refresh-cached', methods=['POST'])
def refresh_cached():
    """Refresh data using cached AWS sessions (if still valid).

    Reuses sessions stored in session_cache by import_profile, avoiding a
    fresh MFA prompt. Streams SSE progress; emits a 'redirect' event when
    every session is expired so the UI can send the user back to re-auth.
    """
    global session_cache
    if not session_cache:
        # Nothing cached (e.g. fresh process) — tell the UI to re-authenticate.
        return jsonify({'error': 'No cached sessions', 'redirect': True})

    def generate():
        try:
            from import_from_aws import fetch_security_groups, fetch_ec2_instances, import_to_database
            # Check if cached sessions are still valid
            current_time = time.time()
            valid_profiles = []
            for profile, cache_data in session_cache.items():
                age_seconds = current_time - cache_data['timestamp']
                duration_seconds = cache_data.get('duration_seconds', 3600)
                # Use 5 minutes (300 seconds) safety margin so we don't start
                # a long fetch on credentials about to expire.
                max_age_seconds = duration_seconds - 300
                if age_seconds < max_age_seconds:
                    valid_profiles.append(profile)
                else:
                    age_minutes = age_seconds / 60
                    yield send_progress(f"[{profile}] Session expired ({age_minutes:.1f} min old)", 'error')
            if not valid_profiles:
                yield send_progress("All sessions expired. Please re-authenticate.", 'error')
                yield send_progress("REDIRECT", 'redirect')
                return
            yield send_progress(f"Refreshing data from {len(valid_profiles)} cached session(s)...", 'info')
            all_security_groups = []
            all_ec2_instances = []
            all_sg_rules = []
            # Sequential (not parallel) refresh: each profile reuses its
            # cached boto3 session.
            for profile in valid_profiles:
                cache_data = session_cache[profile]
                session = cache_data['session']
                account_id = cache_data['account_id']
                account_name = cache_data['account_name']
                try:
                    yield send_progress(f"[{profile}] Fetching security groups...", 'info')
                    security_groups, sg_rules = fetch_security_groups(session, account_id, account_name)
                    yield send_progress(f"✓ [{profile}] Found {len(security_groups)} security groups", 'success')
                    yield send_progress(f"[{profile}] Fetching EC2 instances...", 'info')
                    ec2_instances = fetch_ec2_instances(session, account_id, account_name)
                    yield send_progress(f"✓ [{profile}] Found {len(ec2_instances)} EC2 instances", 'success')
                    all_security_groups.extend(security_groups)
                    all_ec2_instances.extend(ec2_instances)
                    all_sg_rules.extend(sg_rules)
                except Exception as e:
                    error_msg = str(e)
                    # Token expiry mid-fetch aborts the whole refresh and
                    # forces re-authentication; other errors skip the profile.
                    if 'ExpiredToken' in error_msg or 'InvalidToken' in error_msg:
                        yield send_progress(f"✗ [{profile}] Session expired", 'error')
                        yield send_progress("REDIRECT", 'redirect')
                        return
                    else:
                        yield send_progress(f"✗ [{profile}] Error: {error_msg}", 'error')
            # Import to database (replace mode)
            if all_security_groups or all_ec2_instances:
                yield send_progress("Updating database...", 'info')
                import_to_database(DB_PATH, all_security_groups, all_ec2_instances, all_sg_rules, append=False)
                yield send_progress(f"✓ Refresh complete!", 'success')
                yield send_progress(f" Total Security Groups: {len(all_security_groups)}", 'success')
                yield send_progress(f" Total EC2 Instances: {len(all_ec2_instances)}", 'success')
                yield send_progress("COMPLETE", 'complete')
            else:
                yield send_progress("✗ No data refreshed", 'error')
        except Exception as e:
            yield send_progress(f"✗ Refresh failed: {str(e)}", 'error')

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
@app.route('/api/clear-db', methods=['POST'])
def clear_db():
    """Delete all rows from every data table (schema is kept)."""
    # Fixed table list — the f-string SQL below never sees user input.
    tables = ("security_groups", "ec2_instances", "sg_rules", "refresh_timestamps")
    try:
        conn = get_db()
        cursor = conn.cursor()
        for table in tables:
            cursor.execute(f"DELETE FROM {table}")
        conn.commit()
        conn.close()
        return jsonify({'success': True})
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@app.route('/api/tags')
def get_tags():
    """Return distinct tag values used to populate the filter dropdowns."""
    conn = get_db()
    # Migration-wave values exist only on security groups.
    wave_rows = conn.execute("""
        SELECT DISTINCT tag_wave FROM security_groups
        WHERE tag_wave IS NOT NULL AND tag_wave != ''
        ORDER BY tag_wave
    """).fetchall()
    # Git-repo tags can appear on either resource type, so UNION both tables.
    repo_rows = conn.execute("""
        SELECT DISTINCT tag_git_repo FROM security_groups
        WHERE tag_git_repo IS NOT NULL AND tag_git_repo != ''
        UNION
        SELECT DISTINCT tag_git_repo FROM ec2_instances
        WHERE tag_git_repo IS NOT NULL AND tag_git_repo != ''
        ORDER BY tag_git_repo
    """).fetchall()
    conn.close()
    return jsonify({
        'waves': [row['tag_wave'] for row in wave_rows],
        'repos': [row['tag_git_repo'] for row in repo_rows]
    })
@app.route('/api/search')
def search():
    """Search for EC2 instances or security groups.

    Query params:
        q: search text (substring, or regex when regex=true)
        type: 'all' | 'sg' | 'ec2'
        regex: 'true' to treat q as a case-insensitive regex
        wave: exact tag_wave filter (security groups only)
        repo: exact tag_git_repo filter
    """
    query = request.args.get('q', '').strip()
    search_type = request.args.get('type', 'all')
    use_regex = request.args.get('regex', 'false').lower() == 'true'
    filter_wave = request.args.get('wave', '').strip()
    filter_repo = request.args.get('repo', '').strip()
    conn = get_db()
    results = []
    try:
        # BUG FIX: the original validated the regex only in the SG branch,
        # so a bad pattern with type=ec2 silently returned zero results
        # (the SQLite REGEXP helper swallows re.error). Validate once,
        # up front, for every branch.
        if query and use_regex:
            try:
                re.compile(query)
            except re.error as e:
                conn.close()
                return jsonify({'error': f'Invalid regex pattern: {str(e)}', 'results': []})
        if search_type in ['all', 'sg']:
            # Build WHERE clause with tag filters. The clause fragments are
            # fixed strings; user input only flows in via bound parameters.
            where_clauses = []
            params = []
            if query:
                if use_regex:
                    where_clauses.append("(group_id REGEXP ? OR group_name REGEXP ? OR tag_name REGEXP ?)")
                    params.extend([query, query, query])
                else:
                    where_clauses.append("(group_id LIKE ? OR group_name LIKE ? OR tag_name LIKE ?)")
                    params.extend([f'%{query}%', f'%{query}%', f'%{query}%'])
            if filter_wave:
                where_clauses.append("tag_wave = ?")
                params.append(filter_wave)
            if filter_repo:
                where_clauses.append("tag_git_repo = ?")
                params.append(filter_repo)
            where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
            sg_results = conn.execute(f"""
                SELECT 'sg' as type, group_id as id, group_name as name, tag_name,
                       account_name, account_id, tag_wave, tag_git_repo, tag_git_org, tag_git_file,
                       ingress_rule_count
                FROM security_groups
                WHERE {where_sql}
                ORDER BY tag_name, group_name
                LIMIT 500
            """, params).fetchall()
            for row in sg_results:
                results.append(dict(row))
        if search_type in ['all', 'ec2']:
            # Build WHERE clause with tag filters (wave does not apply to EC2)
            where_clauses = []
            params = []
            if query:
                if use_regex:
                    where_clauses.append("(instance_id REGEXP ? OR tag_name REGEXP ? OR private_ip_address REGEXP ?)")
                    params.extend([query, query, query])
                else:
                    where_clauses.append("(instance_id LIKE ? OR tag_name LIKE ? OR private_ip_address LIKE ?)")
                    params.extend([f'%{query}%', f'%{query}%', f'%{query}%'])
            if filter_repo:
                where_clauses.append("tag_git_repo = ?")
                params.append(filter_repo)
            where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
            ec2_results = conn.execute(f"""
                SELECT 'ec2' as type, instance_id as id, tag_name as name, tag_name,
                       account_name, account_id, state, private_ip_address,
                       security_groups_id_list, security_groups_name_list, tag_git_repo,
                       tag_git_org, tag_git_file
                FROM ec2_instances
                WHERE {where_sql}
                ORDER BY tag_name
                LIMIT 500
            """, params).fetchall()
            for row in ec2_results:
                results.append(dict(row))
    except Exception as e:
        conn.close()
        return jsonify({'error': f'Search error: {str(e)}', 'results': []})
    conn.close()
    return jsonify({'results': results})
@app.route('/api/ec2/<instance_id>')
def get_ec2_details(instance_id):
    """Get detailed information about an EC2 instance and its security groups."""
    conn = get_db()
    ec2_row = conn.execute("""
        SELECT * FROM ec2_instances WHERE instance_id = ?
    """, (instance_id,)).fetchone()
    if ec2_row is None:
        conn.close()
        return jsonify({'error': 'EC2 instance not found'}), 404
    ec2_dict = dict(ec2_row)
    # Attached SG ids are stored as a ';'-joined string on the instance row.
    id_list = ec2_dict['security_groups_id_list']
    attached_groups = []
    for sg_id in (id_list.split(';') if id_list else []):
        if not sg_id:
            continue  # skip empty fragments from stray separators
        sg_row = conn.execute("""
            SELECT * FROM security_groups WHERE group_id = ?
        """, (sg_id,)).fetchone()
        if sg_row:
            attached_groups.append(dict(sg_row))
    conn.close()
    return jsonify({
        'ec2': ec2_dict,
        'security_groups': attached_groups
    })
@app.route('/api/sg/<group_id>')
def get_sg_details(group_id):
    """Get detailed information about a security group and attached EC2 instances.

    Returns 404 JSON when the group is unknown.
    """
    conn = get_db()
    sg = conn.execute("""
        SELECT * FROM security_groups WHERE group_id = ?
    """, (group_id,)).fetchone()
    if not sg:
        conn.close()
        return jsonify({'error': 'Security group not found'}), 404
    sg_dict = dict(sg)
    # LIKE '%<id>%' is only a coarse prefilter: it also matches instances
    # whose list contains a *longer* id (e.g. querying sg-12 would match
    # sg-123). Post-filter against the ';'-separated list for exact ids.
    candidates = conn.execute("""
        SELECT * FROM ec2_instances
        WHERE security_groups_id_list LIKE ?
    """, (f'%{group_id}%',)).fetchall()
    ec2_list = [
        dict(row) for row in candidates
        if group_id in (row['security_groups_id_list'] or '').split(';')
    ]
    conn.close()
    return jsonify({
        'security_group': sg_dict,
        'ec2_instances': ec2_list
    })
@app.route('/api/sg/<group_id>/rules')
def get_sg_rules(group_id):
    """Get all rules for a security group, keyed by direction."""
    conn = get_db()
    ingress_rows = conn.execute("""
        SELECT * FROM sg_rules
        WHERE group_id = ? AND direction = 'ingress'
        ORDER BY protocol, port_range, source
    """, (group_id,)).fetchall()
    egress_rows = conn.execute("""
        SELECT * FROM sg_rules
        WHERE group_id = ? AND direction = 'egress'
        ORDER BY protocol, port_range, source
    """, (group_id,)).fetchall()
    conn.close()
    response = {
        'ingress': [dict(row) for row in ingress_rows],
        'egress': [dict(row) for row in egress_rows]
    }
    return jsonify(response)
@app.route('/api/stats')
def get_stats():
    """Get database statistics for the dashboard header."""
    conn = get_db()
    sg_count = conn.execute("SELECT COUNT(*) as count FROM security_groups").fetchone()['count']
    ec2_count = conn.execute("SELECT COUNT(*) as count FROM ec2_instances").fetchone()['count']
    # Account names may appear in either table; UNION de-duplicates.
    account_rows = conn.execute("""
        SELECT DISTINCT account_name FROM security_groups
        UNION
        SELECT DISTINCT account_name FROM ec2_instances
        ORDER BY account_name
    """).fetchall()
    # Most recently refreshed accounts first.
    refresh_rows = conn.execute("""
        SELECT account_name, last_refresh
        FROM refresh_timestamps
        ORDER BY last_refresh DESC
    """).fetchall()
    conn.close()
    payload = {
        'security_groups': sg_count,
        'ec2_instances': ec2_count,
        'accounts': [row['account_name'] for row in account_rows],
        'refresh_timestamps': [
            {'account': row['account_name'], 'timestamp': row['last_refresh']}
            for row in refresh_rows
        ]
    }
    return jsonify(payload)
@app.route('/api/session-expiration')
def get_session_expiration():
    """Get session expiration info for the credential countdown widget.

    Reports the earliest expiry across all cached profile sessions.
    """
    global session_cache
    if not session_cache:
        return jsonify({'has_session': False})
    now = time.time()
    # Each session expires at auth timestamp + its requested duration.
    expirations = [
        entry['timestamp'] + entry.get('duration_seconds', 3600)
        for entry in session_cache.values()
    ]
    if not expirations:
        return jsonify({'has_session': False})
    earliest = min(expirations)
    return jsonify({
        'has_session': True,
        'seconds_remaining': int(earliest - now),
        'expiration_timestamp': earliest
    })
@app.route('/api/search-ip')
def search_ip():
    """Search for an IP address across all firewall rules and EC2 instances.

    Query params:
        ip: required substring matched against rule sources and instance IPs
        text: optional substring filter on names/ids
        port: optional substring filter on the rule port_range
        type: 'all' | 'ec2' | 'sg'

    Returns JSON {'results': {'sg_rules': [...], 'ec2_instances': [...]},
    'count': total}. Errors return the same shape with an 'error' key.
    """
    ip_query = request.args.get('ip', '').strip()
    text_filter = request.args.get('text', '').strip()
    port_filter = request.args.get('port', '').strip()
    resource_type = request.args.get('type', 'all').strip()  # all, ec2, sg
    if not ip_query:
        return jsonify({'error': 'IP address required', 'results': {'sg_rules': [], 'ec2_instances': []}, 'count': 0})
    conn = get_db()
    sg_results = []
    ec2_results = []
    try:
        # Search for IP in security group rules (if type is 'all' or 'sg').
        # The WHERE fragments below are fixed strings — user input only
        # reaches SQL through bound parameters, so the f-string is safe.
        if resource_type in ['all', 'sg']:
            where_clauses = ["r.source LIKE ?"]
            params = [f'%{ip_query}%']
            # Add text filter if provided
            if text_filter:
                where_clauses.append("(sg.tag_name LIKE ? OR sg.group_name LIKE ?)")
                params.extend([f'%{text_filter}%', f'%{text_filter}%'])
            # Add port filter if provided (searches in port_range field;
            # rules with port_range 'All' always match a port filter)
            if port_filter:
                where_clauses.append("(r.port_range LIKE ? OR r.port_range = 'All')")
                params.append(f'%{port_filter}%')
            where_sql = " AND ".join(where_clauses)
            rules = conn.execute(f"""
                SELECT
                    r.id,
                    r.group_id,
                    r.direction,
                    r.protocol,
                    r.port_range,
                    r.source_type,
                    r.source,
                    r.description,
                    sg.group_name,
                    sg.tag_name,
                    sg.account_name,
                    sg.account_id,
                    sg.tag_wave,
                    sg.tag_git_repo,
                    sg.tag_git_org,
                    sg.tag_git_file
                FROM sg_rules r
                JOIN security_groups sg ON r.group_id = sg.group_id
                WHERE {where_sql}
                ORDER BY sg.account_name, sg.group_name, r.direction, r.protocol
                LIMIT 1000
            """, params).fetchall()
            for row in rules:
                sg_results.append(dict(row))
        # Search for IP in EC2 instances (if type is 'all' or 'ec2')
        if resource_type in ['all', 'ec2']:
            where_clauses = ["private_ip_address LIKE ?"]
            params = [f'%{ip_query}%']
            # Add text filter if provided
            if text_filter:
                where_clauses.append("(tag_name LIKE ? OR instance_id LIKE ?)")
                params.extend([f'%{text_filter}%', f'%{text_filter}%'])
            where_sql = " AND ".join(where_clauses)
            instances = conn.execute(f"""
                SELECT
                    instance_id,
                    tag_name,
                    state,
                    private_ip_address,
                    account_name,
                    account_id,
                    security_groups_id_list,
                    security_groups_name_list,
                    tag_git_repo,
                    tag_git_org,
                    tag_git_file
                FROM ec2_instances
                WHERE {where_sql}
                ORDER BY account_name, tag_name
                LIMIT 500
            """, params).fetchall()
            for row in instances:
                ec2_results.append(dict(row))
    except Exception as e:
        conn.close()
        return jsonify({'error': f'Search error: {str(e)}', 'results': {'sg_rules': [], 'ec2_instances': []}, 'count': 0})
    conn.close()
    total_count = len(sg_results) + len(ec2_results)
    return jsonify({
        'results': {
            'sg_rules': sg_results,
            'ec2_instances': ec2_results
        },
        'count': total_count
    })
if __name__ == '__main__':
    # Get debug mode from environment variable (DEBUG=true/1/yes enables it)
    debug_mode = os.getenv('DEBUG', 'false').lower() in ('true', '1', 'yes')
    # Startup banner printed to the container/terminal log.
    print("\n" + "="*60)
    print("🔭 SGO: Security Groups (and Instances) Observatory")
    print("="*60)
    print(f"\n Database location: {DB_PATH}")
    print(" Database is persistent - data will be preserved between runs")
    print(" Access the application at: http://localhost:5000")
    print(f" Debug mode: {'enabled' if debug_mode else 'disabled'}")
    print("\n" + "="*60 + "\n")
    # NOTE(review): binds all interfaces — appears intended for containerized
    # deployment; do not expose directly on untrusted networks.
    app.run(host='0.0.0.0', port=5000, debug=debug_mode)