# document_redaction / load_dynamo_logs.py
# Repository page header (author shown: seanpedrickcase; commit 94e514b; file size 1.69 kB).
# Commit message: "Updated logging format for timestamps to be compatible with AWS.
# Added load_dynamo_logs.py example file."
import csv
import os
from decimal import Decimal

import boto3
from boto3.dynamodb.conditions import Key

from tools.config import AWS_REGION, ACCESS_LOG_DYNAMODB_TABLE_NAME, FEEDBACK_LOG_DYNAMODB_TABLE_NAME, USAGE_LOG_DYNAMODB_TABLE_NAME, OUTPUT_FOLDER
# Replace with your actual table name and region
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME  # Choose as appropriate
REGION = AWS_REGION

# Destination of the exported CSV. os.path.join inserts a path separator only
# when OUTPUT_FOLDER does not already end with one, so the result is identical
# to plain concatenation for a folder such as 'output/' and still correct for
# a folder written without the trailing slash.
CSV_OUTPUT = os.path.join(OUTPUT_FOLDER, 'dynamodb_logs_export.csv')

# Create DynamoDB resource and a handle on the table to export
dynamodb = boto3.resource('dynamodb', region_name=REGION)
table = dynamodb.Table(TABLE_NAME)
# Helper function to convert Decimal to float or int
def convert_types(item):
    """Convert the top-level ``Decimal`` values of ``item`` to ``int`` or ``float``.

    The dictionary is modified in place and the same object is returned.
    Values that are not ``Decimal`` instances (including any nested
    containers) are left untouched.

    Args:
        item: Dictionary mapping attribute names to values.

    Returns:
        The same ``item`` dictionary, with each ``Decimal`` value replaced by
        an ``int`` when it has no fractional part and by a ``float`` otherwise.
    """
    for key, value in item.items():
        if not isinstance(value, Decimal):
            continue
        if not value.is_finite():
            # NaN / Infinity have no integer form; keep them as floats.
            item[key] = float(value)
            continue
        # Do not use ``value % 1`` here: the remainder is computed under the
        # active decimal context and raises InvalidOperation for integers
        # wider than its precision (28 digits by default). Comparing the
        # Decimal with its truncated int is exact for any magnitude.
        whole = int(value)
        item[key] = whole if whole == value else float(value)
    return item
# Paginated scan
def scan_table():
    """Return every item of the module-level ``table`` as a single list.

    Issues ``table.scan()`` and keeps requesting further pages, passing the
    previous response's ``LastEvaluatedKey`` as ``ExclusiveStartKey``, until a
    response no longer contains ``LastEvaluatedKey``.

    Returns:
        A list holding the ``Items`` of every page, in the order received.
    """
    collected = []
    page = table.scan()
    while True:
        collected += page['Items']
        if 'LastEvaluatedKey' not in page:
            return collected
        # More pages remain: resume the scan right after the last key seen.
        page = table.scan(ExclusiveStartKey=page['LastEvaluatedKey'])
# Export to CSV
def export_to_csv(items, output_path):
    """Write ``items`` to a UTF-8 CSV file with one column per attribute name.

    Prints "No items found." and writes nothing when ``items`` is empty.
    Each item is passed through ``convert_types`` before being written, so
    the dictionaries in ``items`` are modified in place.

    Args:
        items: List of dictionaries (one per row) keyed by attribute name.
        output_path: Path of the CSV file to create or overwrite.

    Returns:
        None.
    """
    if not items:
        print("No items found.")
        return

    # Build the header from the keys of *all* items, not just the first one:
    # items in the same table need not share the same attributes, and
    # csv.DictWriter raises ValueError for a row containing a key that is not
    # in ``fieldnames``. Attributes missing from a row are written as empty
    # cells (DictWriter's default ``restval``).
    fieldnames = sorted({key for item in items for key in item})

    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for item in items:
            writer.writerow(convert_types(item))

    print(f"Exported {len(items)} items to {output_path}")
# Script entry: read every item from the table, then write them to CSV_OUTPUT
items = scan_table()
export_to_csv(items=items, output_path=CSV_OUTPUT)