obs-portal/api/tools/import_from_mongodb.py

163 lines
5.4 KiB
Python
Raw Normal View History

2021-11-04 17:13:24 +00:00
#!/usr/bin/env python3
import argparse
import asyncio
import logging
import json
2021-11-04 17:13:24 +00:00
from datetime import datetime
from uuid import uuid4
from sqlalchemy import select
from motor.motor_asyncio import AsyncIOMotorClient
2021-11-14 22:42:02 +00:00
from obs.api.db import make_session, connect_db, User, Track, Comment
2021-11-04 17:13:24 +00:00
from obs.api.app import app
log = logging.getLogger(__name__)
async def main():
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s: %(message)s")
parser = argparse.ArgumentParser(
description="processes a single track for use in the portal, "
"using the obs.face algorithms"
)
parser.add_argument(
"mongodb_url",
metavar="MONGODB_URL",
help="url to the mongodb, in format mongodb://user:pass@host/dbname",
)
parser.add_argument(
"--keycloak-users-file",
metavar="FILE",
type=argparse.FileType("wt", encoding="utf-8"),
help="a file to write a JSON of all old users to, for importing to keycloak",
default=None,
)
2021-11-04 17:13:24 +00:00
args = parser.parse_args()
async with connect_db(app.config.POSTGRES_URL):
async with make_session() as session:
mongo = AsyncIOMotorClient(args.mongodb_url).get_default_database()
log.debug("Connected to mongodb and postgres.")
user_id_map = await import_users(mongo, session, args.keycloak_users_file)
2021-11-04 17:13:24 +00:00
await import_tracks(mongo, session, user_id_map)
await session.commit()
async def import_users(mongo, session, keycloak_users_file):
keycloak_users = []
2021-11-04 17:13:24 +00:00
old_id_by_email = {}
async for user in mongo.users.find({}):
old_id_by_email[user["email"]] = user["_id"]
new_user = User(
sub=str(uuid4()),
match_by_username_email=True,
email=user["email"],
username=user["username"],
bio=user.get("bio"),
image=user.get("image"),
are_tracks_visible_for_all=user.get("areTracksVisibleForAll") or False,
api_key=str(user["_id"]),
created_at=user.get("createdAt") or datetime.utcnow(),
updated_at=user.get("updatedAt") or datetime.utcnow(),
)
needs_email_verification = user.get("needsEmailValidation", True)
required_actions = ["UPDATE_PASSWORD"]
if needs_email_verification:
required_actions.append("VERIFY_EMAIL")
keycloak_users.append(
{
"username": user["username"],
"email": user["email"],
"enabled": True,
"requiredActions": required_actions,
"emailVerified": not needs_email_verification,
}
)
2021-11-04 17:13:24 +00:00
session.add(new_user)
log.info("Creating user %s", new_user.username)
await session.commit()
id_map = {}
result = await session.scalars(select(User))
for user in result:
id_map[old_id_by_email[user.email]] = user.id
if keycloak_users_file:
json.dump(keycloak_users, keycloak_users_file, indent=4)
log.info("Wrote keycloak users file to %s.", keycloak_users_file.name)
2021-11-04 17:13:24 +00:00
return id_map
def parse_datetime(s):
if isinstance(s, str):
return datetime.fromisoformat(s)
return s
async def import_tracks(mongo, session, user_id_map):
async for track in mongo.tracks.find({}):
stats = track.get("statistics") or {}
new_track = Track(
created_at=parse_datetime(track.get("createdAt")) or datetime.utcnow(),
updated_at=parse_datetime(track.get("updatedAt")) or datetime.utcnow(),
slug=track["slug"],
title=track.get("title"),
processing_status=track.get("processingStatus") or "pending",
processing_log=track.get("processingLog"),
customized_title=bool(track.get("customizedTitle")),
description=track.get("description"),
public=track.get("public"),
uploaded_by_user_agent=track.get("uploadedByUserAgent"),
original_file_name=track.get("originalFileName"),
original_file_hash=track.get("originalFileHash"),
# statistics
recorded_at=parse_datetime(stats.get("recordedAt")),
recorded_until=parse_datetime(stats.get("recordedUntil")),
duration=stats.get("duration"),
length=stats.get("length"),
segments=stats.get("segments"),
num_events=stats.get("num_events"),
num_measurements=stats.get("num_measurements"),
num_valid=stats.get("numValid"),
author_id=user_id_map[track["author"]],
)
session.add(new_track)
log.info("Creating track %s", new_track.slug)
comment_ids = track.get("comments") or []
if comment_ids:
async for comment in mongo.comments.find({"_id": {"$in": comment_ids}}):
new_comment = Comment(
created_at=parse_datetime(comment.get("createdAt"))
or datetime.utcnow(),
updated_at=parse_datetime(comment.get("updatedAt"))
or datetime.utcnow(),
body=comment.get("body"),
author_id=user_id_map[comment["author"]],
)
new_track.comments.append(new_comment)
session.add(new_comment)
await session.commit()
if __name__ == "__main__":
asyncio.run(main())