AWS S3: Upload/Download

boto3 examples for uploading to and downloading from S3; includes a .bat launcher

What this is

Simple S3 tools: one script downloads a single file, the other uploads a set of files and writes an s3event.json manifest alongside them. A .bat launcher runs the uploader. Credentials and settings come from a .env file.

Install

pip install boto3 python-dotenv

.env variables (typical)

AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
AWS_REGION=us-east-1
AWS_BUCKET_NAME=your-bucket
# For downloader:
USER_UPLOAD=path/in/bucket/filename.ext
DIR_DOWNLOAD=C:\Downloads\S3
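
The downloader keeps only the filename from USER_UPLOAD and saves it under DIR_DOWNLOAD. A minimal sketch of that path logic (same idea as the script below; the values are just the placeholders above):

from pathlib import Path

source_key = "path/in/bucket/filename.ext"           # USER_UPLOAD (S3 object key)
target = Path(r"C:\Downloads\S3") / Path(source_key).name
print(target)                                         # C:\Downloads\S3\filename.ext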

Download script (download_from_aws_s3_bucket.py)

import boto3
import os
import sys
import logging
from pathlib import Path
from dotenv import load_dotenv

def load_env_fallback(base_path=None, max_depth=3):
    """
    Load environment variables from .env file, starting from script folder and going up.
    Looks in current directory, then parent, then grandparent.
    """
    if base_path is None:
        base_path = Path(__file__).resolve().parent  # Folder where this script lives
    
    current = base_path
    for _ in range(max_depth):
        env_file = current / ".env"
        if env_file.exists():
            print(f"🔧 Loading .env from: {env_file}")
            load_dotenv(dotenv_path=env_file, override=True)
            break
        current = current.parent


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('s3_download.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def load_environment_variables():
    """Load and validate required environment variables"""
    load_dotenv()
    
    required_vars = [
        'AWS_ACCESS_KEY_ID',
        'AWS_SECRET_ACCESS_KEY',
        'AWS_BUCKET_NAME',
        'AWS_REGION',
        'USER_UPLOAD',
        'DIR_DOWNLOAD'
    ]
    
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
    
    return {var: os.getenv(var) for var in required_vars}

def download_from_s3(env_vars):
    """Download file from S3 bucket using provided credentials"""
    try:
        # Initialize S3 client
        s3_client = boto3.client(
            's3',
            aws_access_key_id=env_vars['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=env_vars['AWS_SECRET_ACCESS_KEY'],
            region_name=env_vars['AWS_REGION']
        )
        
        # Prepare file paths using DIR_DOWNLOAD
        source_path = env_vars['USER_UPLOAD']
        download_dir = Path(env_vars['DIR_DOWNLOAD'])
        target_path = download_dir / Path(source_path).name
        
        # Create download directory if it doesn't exist
        download_dir.mkdir(parents=True, exist_ok=True)
        
        logger.info(f"Downloading {source_path} from bucket {env_vars['AWS_BUCKET_NAME']}")
        logger.info(f"Download directory: {download_dir}")
        
        # Download the file
        s3_client.download_file(
            env_vars['AWS_BUCKET_NAME'],
            source_path,
            str(target_path)
        )
        
        logger.info(f"Successfully downloaded file to: {target_path.absolute()}")
        return target_path
        
    except Exception as e:
        logger.error(f"Error downloading file: {e}")
        raise

def main():
    load_env_fallback()
    try:
        logger.info("Starting S3 download process")
        
        # Load environment variables
        env_vars = load_environment_variables()
        logger.info(f"Attempting to download: {env_vars['USER_UPLOAD']}")
        
        # Download file
        downloaded_file = download_from_s3(env_vars)
        
        logger.info("=" * 50)
        logger.info("Download completed successfully")
        logger.info(f"File location: {downloaded_file}")
        logger.info("=" * 50)
        
    except Exception as e:
        logger.error(f"Download failed: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
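
Run the downloader with no arguments; bucket, key, credentials, and target folder all come from .env. Progress is logged to the console and to s3_download.log in the current working directory:

python download_from_aws_s3_bucket.py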

Upload script (upload_to_aws_s3_bucket.py)

import os
import re
import json
import argparse
import uuid
from pathlib import Path
from datetime import datetime
import boto3
from dotenv import load_dotenv

def load_env_near_script():
    script_path = Path(__file__).resolve()
    candidates = [
        script_path.parent / ".env",
        script_path.parent.parent / ".env",
        script_path.parent.parent.parent / ".env"
    ]
    
    for env_path in candidates:
        if env_path.exists():
            load_dotenv(dotenv_path=env_path)
            print(f"Loaded environment variables from: {env_path}")
            return
    print("No .env file found in script dir, parent, or grandparent.")

def upload_files_to_s3(aws_access_key_id, aws_secret_access_key, region, bucket, input_files, output_prefix=None, job_id=None):
    s3_client = boto3.client(
        "s3",
        region_name=region,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )

    uploaded_files = []

    # Extract the job ID from first parameter filename if not provided
    if not job_id and input_files:
        first_file = Path(input_files[0]).name
        # Try to find a UUID pattern in the filename
        uuid_pattern = re.compile(r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})', re.IGNORECASE)
        match = uuid_pattern.search(first_file)
        if match:
            job_id = match.group(1)
            print(f"Extracted job ID from filename: {job_id}")
        else:
            # Generate a new ID if none found
            job_id = str(uuid.uuid4())
            print(f"Generated new job ID: {job_id}")

    # Upload all media files first WITHOUT triggering webhooks
    for file_path in input_files:
        file_path = Path(file_path)
        if not file_path.is_file():
            print(f"Skipping {file_path}: not a file")
            continue

        key = f"{output_prefix}/{file_path.name}" if output_prefix else file_path.name
        print(f"Uploading {file_path} to s3://{bucket}/{key}")
        s3_client.upload_file(str(file_path), bucket, key)

        uploaded_files.append({
            "key": key,
            "original_name": file_path.name,
            "size": file_path.stat().st_size,
            "uploaded_at": datetime.utcnow().isoformat() + "Z"
        })

    # Create and upload the s3event.json file
    if uploaded_files:
        s3event_data = {
            "jobId": job_id,
            "bucket_name": bucket,
            "upload_time": datetime.utcnow().isoformat() + "Z",
            "files": uploaded_files
        }
        
        # Create temporary file for s3event.json
        temp_dir = Path("./temp")
        temp_dir.mkdir(exist_ok=True)
        
        s3event_path = temp_dir / f"{job_id}_s3event.json"
        with open(s3event_path, "w") as f:
            json.dump(s3event_data, f, indent=2)
            
        # Upload s3event.json to the same bucket/prefix as the other files
        s3event_key = f"{output_prefix}/{job_id}_s3event.json" if output_prefix else f"{job_id}_s3event.json"
        print(f"Uploading s3event.json to s3://{bucket}/{s3event_key}")
        s3_client.upload_file(str(s3event_path), bucket, s3event_key)
        
        # Record the s3event entry while the temp file still exists (its size is needed)
        uploaded_files.append({
            "key": s3event_key,
            "original_name": f"{job_id}_s3event.json",
            "size": s3event_path.stat().st_size,
            "uploaded_at": datetime.utcnow().isoformat() + "Z",
            "is_s3event": True
        })

        # Clean up temp file
        s3event_path.unlink()

    return uploaded_files

def save_manifest(manifest_path, uploaded_files):
    with open(manifest_path, "w") as f:
        json.dump(uploaded_files, f, indent=4)
    print(f"Manifest saved to {manifest_path}")

def main():
    load_env_near_script()

    parser = argparse.ArgumentParser(description="Upload files to AWS S3 with automatic s3event generation.")
    
    parser.add_argument("--aws_access_key_id", default=os.getenv("AWS_ACCESS_KEY_ID"), help="AWS Access Key ID")
    parser.add_argument("--aws_secret_access_key", default=os.getenv("AWS_SECRET_ACCESS_KEY"), help="AWS Secret Access Key")
    parser.add_argument("--region", default=os.getenv("AWS_REGION", "us-east-1"), help="AWS Region")
    parser.add_argument("--bucket", default=os.getenv("AWS_BUCKET_NAME"), required=True, help="S3 Bucket Name")
    parser.add_argument("--job_id", type=str, default=None, help="Job ID for s3event.json (automatically generated if not provided)")
    parser.add_argument("--input", nargs="+", required=True, help="Files to upload")
    parser.add_argument("--output", default="", help="Target folder in bucket (optional)")
    parser.add_argument("--filelist", help="Path to output manifest file (e.g., JOB_ID_s3event.json)")

    args = parser.parse_args()

    uploaded_files = upload_files_to_s3(
        aws_access_key_id=args.aws_access_key_id,
        aws_secret_access_key=args.aws_secret_access_key,
        region=args.region,
        bucket=args.bucket,
        input_files=args.input,
        output_prefix=args.output.strip("/"),
        job_id=args.job_id
    )

    if args.filelist:
        save_manifest(args.filelist, uploaded_files)

if __name__ == "__main__":
    main()
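
Example invocation (filenames, prefix, and bucket are placeholders; credentials and region come from .env, and --bucket can be dropped when AWS_BUCKET_NAME is set):

python upload_to_aws_s3_bucket.py --bucket your-bucket --input video.mp4 notes.txt --output incoming/job-001 --filelist manifest.json

Besides the files themselves, the script uploads a <job_id>_s3event.json to the same bucket/prefix. Its shape, with made-up values:

{
  "jobId": "3f2b7c1e-9a4d-4c2b-8e5f-1a2b3c4d5e6f",
  "bucket_name": "your-bucket",
  "upload_time": "2024-01-01T12:00:00.000000Z",
  "files": [
    {
      "key": "incoming/job-001/video.mp4",
      "original_name": "video.mp4",
      "size": 1048576,
      "uploaded_at": "2024-01-01T12:00:00.000000Z"
    }
  ]
}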

Launcher (.bat) for upload (upload_to_aws_s3_bucket.bat)

@echo off
setlocal enabledelayedexpansion

REM Check for a /NOPAUSE switch anywhere in the arguments (suppresses the pause on error)
REM All other arguments are forwarded to the Python script.
set NOPAUSE=0
set "SCRIPT_ARGS="
for %%A in (%*) do (
    if /I "%%~A"=="/NOPAUSE" (
        set NOPAUSE=1
    ) else (
        set SCRIPT_ARGS=!SCRIPT_ARGS! %%A
    )
)

REM Get current directory and base filename without extension
set "CURRENT_DIR=%~dp0"
set "BASE_NAME=%~n0"

echo Running %BASE_NAME%.py...

REM Run the Python script with unbuffered output, forwarding the remaining arguments
python -u "%CURRENT_DIR%%BASE_NAME%.py" !SCRIPT_ARGS!
if errorlevel 1 goto error

:success
exit /b 0

:error
echo Error: Failed to run %BASE_NAME%.py
if %NOPAUSE%==0 pause
exit /b 1
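
The launcher passes everything except /NOPAUSE straight through to the Python script, so an unattended run could look like this (paths and bucket are placeholders):

upload_to_aws_s3_bucket.bat --input C:\exports\video.mp4 --bucket your-bucket /NOPAUSE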