Updated logging format for timestamps to be compatible with AWS. Added load_dynamo_logs.py example file.
94e514b
import boto3 | |
import csv | |
from decimal import Decimal | |
from boto3.dynamodb.conditions import Key | |
from tools.config import AWS_REGION, ACCESS_LOG_DYNAMODB_TABLE_NAME, FEEDBACK_LOG_DYNAMODB_TABLE_NAME, USAGE_LOG_DYNAMODB_TABLE_NAME, OUTPUT_FOLDER | |
# Replace with your actual table name and region | |
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME # Choose as appropriate | |
REGION = AWS_REGION | |
CSV_OUTPUT = OUTPUT_FOLDER + 'dynamodb_logs_export.csv' | |
# Create DynamoDB resource | |
dynamodb = boto3.resource('dynamodb', region_name=REGION) | |
table = dynamodb.Table(TABLE_NAME) | |
# Helper function to convert Decimal to float or int | |
def convert_types(item): | |
for key, value in item.items(): | |
if isinstance(value, Decimal): | |
# Convert to int if no decimal places, else float | |
item[key] = int(value) if value % 1 == 0 else float(value) | |
return item | |
# Paginated scan | |
def scan_table(): | |
items = [] | |
response = table.scan() | |
items.extend(response['Items']) | |
while 'LastEvaluatedKey' in response: | |
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey']) | |
items.extend(response['Items']) | |
return items | |
# Export to CSV | |
def export_to_csv(items, output_path): | |
if not items: | |
print("No items found.") | |
return | |
fieldnames = sorted(items[0].keys()) | |
with open(output_path, 'w', newline='', encoding='utf-8') as csvfile: | |
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) | |
writer.writeheader() | |
for item in items: | |
writer.writerow(convert_types(item)) | |
print(f"Exported {len(items)} items to {output_path}") | |
# Run export | |
items = scan_table() | |
export_to_csv(items, CSV_OUTPUT) |