AWS S3: Upload/Download
boto3 examples for uploading and downloading files, plus a .bat launcher
What this is
Simple S3 tools: one script downloads a single file, another uploads files and writes a JSON manifest (s3event.json). A .bat file launches the uploader. Both scripts read credentials and settings from a .env file; example invocations follow the .env list below.
Install
pip install boto3 python-dotenv
.env variables (typical)
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
AWS_REGION=us-east-1
AWS_BUCKET_NAME=your-bucket
# For downloader:
USER_UPLOAD=path/in/bucket/filename.ext
DIR_DOWNLOAD=C:\Downloads\S3
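Quick usage
With the .env above in place, the downloader needs no arguments; the uploader reads credentials and the bucket from .env but takes its file list as flags. The file names here are placeholders:
python download_from_aws_s3_bucket.py
python upload_to_aws_s3_bucket.py --input video.mp4 notes.txt --output uploads --filelist manifest.json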
Download script (download_from_aws_s3_bucket.py)
import os
import sys
import logging
from pathlib import Path

import boto3
from dotenv import load_dotenv


def load_env_fallback(base_path=None, max_depth=3):
    """
    Load environment variables from a .env file, starting from the script
    folder and walking up: current directory, then parent, then grandparent.
    """
    if base_path is None:
        base_path = Path(__file__).resolve().parent  # Folder where this script lives
    current = base_path
    for _ in range(max_depth):
        env_file = current / ".env"
        if env_file.exists():
            print(f"🔧 Loading .env from: {env_file}")
            load_dotenv(dotenv_path=env_file, override=True)
            break
        current = current.parent


# Configure logging to both a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('s3_download.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def load_environment_variables():
    """Load and validate the required environment variables."""
    load_dotenv()
    required_vars = [
        'AWS_ACCESS_KEY_ID',
        'AWS_SECRET_ACCESS_KEY',
        'AWS_BUCKET_NAME',
        'AWS_REGION',
        'USER_UPLOAD',
        'DIR_DOWNLOAD'
    ]
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
    return {var: os.getenv(var) for var in required_vars}


def download_from_s3(env_vars):
    """Download a file from the S3 bucket using the provided credentials."""
    try:
        # Initialize the S3 client
        s3_client = boto3.client(
            's3',
            aws_access_key_id=env_vars['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=env_vars['AWS_SECRET_ACCESS_KEY'],
            region_name=env_vars['AWS_REGION']
        )

        # Prepare file paths using DIR_DOWNLOAD
        source_path = env_vars['USER_UPLOAD']
        download_dir = Path(env_vars['DIR_DOWNLOAD'])
        target_path = download_dir / Path(source_path).name

        # Create the download directory if it doesn't exist
        download_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Downloading {source_path} from bucket {env_vars['AWS_BUCKET_NAME']}")
        logger.info(f"Download directory: {download_dir}")

        # Download the file
        s3_client.download_file(
            env_vars['AWS_BUCKET_NAME'],
            source_path,
            str(target_path)
        )

        logger.info(f"Successfully downloaded file to: {target_path.absolute()}")
        return target_path

    except Exception as e:
        logger.error(f"Error downloading file: {e}")
        raise


def main():
    load_env_fallback()
    try:
        logger.info("Starting S3 download process")

        # Load environment variables
        env_vars = load_environment_variables()
        logger.info(f"Attempting to download: {env_vars['USER_UPLOAD']}")

        # Download the file
        downloaded_file = download_from_s3(env_vars)

        logger.info("=" * 50)
        logger.info("Download completed successfully")
        logger.info(f"File location: {downloaded_file}")
        logger.info("=" * 50)
    except Exception as e:
        logger.error(f"Download failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
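The downloader logs and re-raises every exception uniformly. To distinguish a missing object from credential or network problems, a minimal sketch (reusing env_vars and logger from the script above) can inspect botocore's ClientError; download_file typically surfaces a 404 error code when the key does not exist:

from botocore.exceptions import ClientError

try:
    downloaded_file = download_from_s3(env_vars)
except ClientError as e:
    # A 404 error code usually means the key is missing from the bucket
    if e.response.get("Error", {}).get("Code") == "404":
        logger.error(f"Object not found: {env_vars['USER_UPLOAD']}")
    else:
        logger.error(f"AWS error: {e}")
    sys.exit(1)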
Upload script (upload_to_aws_s3_bucket.py)
import os
import re
import json
import uuid
import argparse
from pathlib import Path
from datetime import datetime

import boto3
from dotenv import load_dotenv


def load_env_near_script():
    """Look for a .env file in the script dir, its parent, or its grandparent."""
    script_path = Path(__file__).resolve()
    candidates = [
        script_path.parent / ".env",
        script_path.parent.parent / ".env",
        script_path.parent.parent.parent / ".env"
    ]
    for env_path in candidates:
        if env_path.exists():
            load_dotenv(dotenv_path=env_path)
            print(f"Loaded environment variables from: {env_path}")
            return
    print("No .env file found in script dir, parent, or grandparent.")


def upload_files_to_s3(aws_access_key_id, aws_secret_access_key, region, bucket,
                       input_files, output_prefix=None, job_id=None):
    s3_client = boto3.client(
        "s3",
        region_name=region,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )
    uploaded_files = []

    # Derive the job ID from the first input filename if not provided
    if not job_id and input_files:
        first_file = Path(input_files[0]).name
        # Try to find a UUID pattern in the filename
        uuid_pattern = re.compile(
            r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})',
            re.IGNORECASE
        )
        match = uuid_pattern.search(first_file)
        if match:
            job_id = match.group(1)
            print(f"Extracted job ID from filename: {job_id}")
        else:
            # Generate a new ID if none was found
            job_id = str(uuid.uuid4())
            print(f"Generated new job ID: {job_id}")

    # Upload all media files first WITHOUT triggering webhooks
    for file_path in input_files:
        file_path = Path(file_path)
        if not file_path.is_file():
            continue
        key = f"{output_prefix}/{file_path.name}" if output_prefix else file_path.name
        print(f"Uploading {file_path} to s3://{bucket}/{key}")
        s3_client.upload_file(str(file_path), bucket, key)
        uploaded_files.append({
            "key": key,
            "original_name": file_path.name,
            "size": file_path.stat().st_size,
            "uploaded_at": datetime.utcnow().isoformat() + "Z"
        })

    # Create and upload the s3event.json file
    if uploaded_files:
        s3event_data = {
            "jobId": job_id,
            "bucket_name": bucket,
            "upload_time": datetime.utcnow().isoformat() + "Z",
            "files": uploaded_files
        }

        # Write s3event.json to a temporary file
        temp_dir = Path("./temp")
        temp_dir.mkdir(exist_ok=True)
        s3event_path = temp_dir / f"{job_id}_s3event.json"
        with open(s3event_path, "w") as f:
            json.dump(s3event_data, f, indent=2)

        # Upload s3event.json to the same bucket/prefix as the other files
        s3event_key = f"{output_prefix}/{job_id}_s3event.json" if output_prefix else f"{job_id}_s3event.json"
        print(f"Uploading s3event.json to s3://{bucket}/{s3event_key}")
        s3_client.upload_file(str(s3event_path), bucket, s3event_key)

        # Record the size BEFORE deleting the temp file
        s3event_size = s3event_path.stat().st_size

        # Clean up the temp file
        s3event_path.unlink()

        # Add the s3event entry to the uploaded_files list
        uploaded_files.append({
            "key": s3event_key,
            "original_name": f"{job_id}_s3event.json",
            "size": s3event_size,
            "uploaded_at": datetime.utcnow().isoformat() + "Z",
            "is_s3event": True
        })

    return uploaded_files


def save_manifest(manifest_path, uploaded_files):
    with open(manifest_path, "w") as f:
        json.dump(uploaded_files, f, indent=4)
    print(f"Manifest saved to {manifest_path}")


def main():
    load_env_near_script()
    parser = argparse.ArgumentParser(description="Upload files to AWS S3 with automatic s3event generation.")
    parser.add_argument("--aws_access_key_id", default=os.getenv("AWS_ACCESS_KEY_ID"), help="AWS Access Key ID")
    parser.add_argument("--aws_secret_access_key", default=os.getenv("AWS_SECRET_ACCESS_KEY"), help="AWS Secret Access Key")
    parser.add_argument("--region", default=os.getenv("AWS_REGION", "us-east-1"), help="AWS Region")
    # Only required on the command line when AWS_BUCKET_NAME is unset:
    # argparse ignores `default` when required=True, which would break the .env fallback.
    parser.add_argument("--bucket", default=os.getenv("AWS_BUCKET_NAME"),
                        required=os.getenv("AWS_BUCKET_NAME") is None, help="S3 Bucket Name")
    parser.add_argument("--job_id", type=str, default=None, help="Job ID for s3event.json (automatically generated if not provided)")
    parser.add_argument("--input", nargs="+", required=True, help="Files to upload")
    parser.add_argument("--output", default="", help="Target folder in bucket (optional)")
    parser.add_argument("--filelist", help="Path to output manifest file (e.g., JOB_ID_s3event.json)")
    args = parser.parse_args()

    uploaded_files = upload_files_to_s3(
        aws_access_key_id=args.aws_access_key_id,
        aws_secret_access_key=args.aws_secret_access_key,
        region=args.region,
        bucket=args.bucket,
        input_files=args.input,
        output_prefix=args.output.strip("/"),
        job_id=args.job_id
    )

    if args.filelist:
        save_manifest(args.filelist, uploaded_files)


if __name__ == "__main__":
    main()
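When --filelist is given, the saved manifest is simply the uploaded_files list serialized as JSON. A sketch of its shape (keys match the code above; names, sizes, and timestamps are illustrative, not real output):

[
    {
        "key": "uploads/video.mp4",
        "original_name": "video.mp4",
        "size": 1048576,
        "uploaded_at": "2024-01-01T00:00:00Z"
    },
    {
        "key": "uploads/9b1deb4d-3b7d-4bad-9bdd-2b0d7b3dcb6d_s3event.json",
        "original_name": "9b1deb4d-3b7d-4bad-9bdd-2b0d7b3dcb6d_s3event.json",
        "size": 256,
        "uploaded_at": "2024-01-01T00:00:00Z",
        "is_s3event": true
    }
]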
Launcher (.bat) for upload (upload_to_aws_s3_bucket.bat)
@echo off
setlocal enabledelayedexpansion

REM Check for a leading /NOPAUSE parameter; all remaining arguments are
REM forwarded to the Python script (the uploader requires at least --input).
set NOPAUSE=0
set "ARGS=%*"
if /I "%~1"=="/NOPAUSE" (
    set NOPAUSE=1
    REM Strip /NOPAUSE from the forwarded arguments
    for /f "tokens=1,* delims= " %%a in ("%*") do set "ARGS=%%b"
)

REM Get current directory and base filename without extension
set "CURRENT_DIR=%~dp0"
set "BASE_NAME=%~n0"

echo Running %BASE_NAME%.py...

REM Run the Python script with unbuffered output, passing arguments through
python -u "%CURRENT_DIR%%BASE_NAME%.py" %ARGS%
if errorlevel 1 goto error

:success
exit /b 0

:error
echo Error: Failed to run %BASE_NAME%.py
if %NOPAUSE%==0 pause
exit /b 1
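Example invocation (the file names are placeholders). /NOPAUSE, if used, must come first; everything after it is forwarded to the Python script:

upload_to_aws_s3_bucket.bat /NOPAUSE --input video.mp4 notes.txt --output uploads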