Add worker, queue, redis, and call python script to import files

This commit is contained in:
Paul Bienkowski 2021-04-19 21:38:51 +02:00
parent 764a711a9e
commit bca3582e30
22 changed files with 12391 additions and 455 deletions

View file

@ -10,7 +10,7 @@ WORKDIR /opt/obs/api
ADD package.json package-lock.json /opt/obs/api/ ADD package.json package-lock.json /opt/obs/api/
RUN npm ci RUN npm ci
ADD scripts/setup.py scripts/requirements.txt /opt/obs/api/scripts/ ADD scripts /opt/obs/api/scripts/
RUN cd scripts && pip install -e . RUN cd scripts && pip install -e .
ADD views /opt/obs/api/views/ ADD views /opt/obs/api/views/

View file

@ -8,6 +8,7 @@
"url": "mongodb://mongo/obsTest", "url": "mongodb://mongo/obsTest",
"debug": true "debug": true
}, },
"redisUrl": "redis://redis",
"oAuth2Clients": [ "oAuth2Clients": [
{ {
"clientId": "b730f8d2-d93c-4c68-9ff0-dfac8da76ee2", "clientId": "b730f8d2-d93c-4c68-9ff0-dfac8da76ee2",

View file

@ -16,6 +16,7 @@
"url": "mongodb://user:pass@host/obs", "url": "mongodb://user:pass@host/obs",
"debug": false "debug": false
}, },
"redisUrl": "redis://localhost",
"oAuth2Clients": [ "oAuth2Clients": [
{ {
"clientId": "CHANGEME", "clientId": "CHANGEME",

12156
api/package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -6,7 +6,9 @@
"scripts": { "scripts": {
"mongo:start": "docker run --name realworld-mongo -p 27017:27017 mongo & sleep 5", "mongo:start": "docker run --name realworld-mongo -p 27017:27017 mongo & sleep 5",
"start": "node src/", "start": "node src/",
"start:worker": "node src/worker.js",
"dev": "nodemon src/", "dev": "nodemon src/",
"dev:worker": "nodemon -w src/ src/worker.js",
"mongo:stop": "docker stop realworld-mongo && docker rm realworld-mongo", "mongo:stop": "docker stop realworld-mongo && docker rm realworld-mongo",
"autoformat": "eslint --fix .", "autoformat": "eslint --fix .",
"lint": "eslint .", "lint": "eslint .",
@ -22,6 +24,7 @@
"license": "LGPLv3", "license": "LGPLv3",
"dependencies": { "dependencies": {
"body-parser": "1.19.0", "body-parser": "1.19.0",
"bull": "^3.22.0",
"connect-busboy": "0.0.2", "connect-busboy": "0.0.2",
"cors": "2.8.5", "cors": "2.8.5",
"csv-parse": "^4.15.1", "csv-parse": "^4.15.1",

@ -1 +1 @@
Subproject commit 1f340a835c327f6410b1954a9824a9d4a5b7d9b0 Subproject commit ed2ddf8104fe3bc625a76842d77c0233bda1ad84

View file

@ -26,6 +26,8 @@ const configSchema = Joi.object({
debug: Joi.boolean().default(process.env.NODE_ENV !== 'production'), debug: Joi.boolean().default(process.env.NODE_ENV !== 'production'),
}).required(), }).required(),
redisUrl: Joi.string().required(),
oAuth2Clients: Joi.array() oAuth2Clients: Joi.array()
.default([]) .default([])
.items( .items(

View file

@ -5,7 +5,6 @@ const config = require('./config')
mongoose.connect(config.mongodb.url); mongoose.connect(config.mongodb.url);
mongoose.set('debug', config.mongodb.debug); mongoose.set('debug', config.mongodb.debug);
require('./models/TrackData');
require('./models/User'); require('./models/User');
require('./models/Track'); require('./models/Track');
require('./models/Comment'); require('./models/Comment');

View file

@ -6,25 +6,66 @@ const slug = require('slug');
const path = require('path'); const path = require('path');
const sanitize = require('sanitize-filename'); const sanitize = require('sanitize-filename');
const fs = require('fs'); const fs = require('fs');
const uuid = require('uuid/v4');
const { parseTrackPoints } = require('../logic/tracks'); const { TRACKS_DIR } = require('../paths');
const queue = require('../queue');
const TrackData = require('./TrackData'); const statisticsSchema = new mongoose.Schema(
{
const DATA_DIR = process.env.DATA_DIR || path.resolve(__dirname, '../../data/'); recordedAt: Date,
recordedUntil: Date,
duration: Number,
length: Number,
segments: Number,
numEvents: Number,
numMeasurements: Number,
numValid: Number,
},
{ timestamps: false },
);
const schema = new mongoose.Schema( const schema = new mongoose.Schema(
{ {
// A (partially or entirely random generated) string that can be used as a
// public identifier
slug: { type: String, lowercase: true, unique: true }, slug: { type: String, lowercase: true, unique: true },
// The title for this track.
title: String, title: String,
// The status of this track, whether it is to be processed, is currently
// being processed, or has completed or errored.
processingStatus: {
type: String,
enum: ['pending', 'processing', 'complete', 'error'],
default: 'pending',
},
processingJobId: String,
// Output from the proccessing routines regarding this track. Might be
// displayed to the owner or administrators to help in debugging. Should be
// set to `null` if no processing has not been finished.
processingLog: String,
// Set to true if the user customized the title. Disables auto-generating
// an updated title when the track is (re-)processed.
customizedTitle: { type: Boolean, default: false },
// A user-provided description of the track. May contain markdown.
description: String, description: String,
// Whether this track is visible in the public track list or not.
visible: Boolean, visible: Boolean,
// The user agent string, or a part thereof, that was used to upload this
// track. Usually contains only the OBS version, other user agents are
// discarded due to being irrelevant.
uploadedByUserAgent: String, uploadedByUserAgent: String,
body: String, // deprecated, remove after migration has read it
comments: [{ type: mongoose.Schema.Types.ObjectId, ref: 'Comment' }], // The name of the original file, as provided during upload. Used for
author: { type: mongoose.Schema.Types.ObjectId, ref: 'User' }, // providing a download with the same name, and for display in the
trackData: { type: mongoose.Schema.Types.ObjectId, ref: 'TrackData' }, // frontend.
publicTrackData: { type: mongoose.Schema.Types.ObjectId, ref: 'TrackData' },
originalFileName: { originalFileName: {
type: String, type: String,
required: true, required: true,
@ -36,7 +77,15 @@ const schema = new mongoose.Schema(
message: (props) => `${props.value} is not a valid filename`, message: (props) => `${props.value} is not a valid filename`,
}, },
}, },
originalFilePath: String,
// Where the files are stored, relative to a group directory like
// TRACKS_DIR or PROCESSING_DIR.
filePath: String,
comments: [{ type: mongoose.Schema.Types.ObjectId, ref: 'Comment' }],
author: { type: mongoose.Schema.Types.ObjectId, ref: 'User' },
statistics: statisticsSchema,
}, },
{ timestamps: true }, { timestamps: true },
); );
@ -49,8 +98,8 @@ schema.pre('validate', async function (next) {
this.slugify(); this.slugify();
} }
if (!this.originalFilePath) { if (!this.filePath) {
await this.generateOriginalFilePath(); await this.generateFilePath();
} }
next(); next();
@ -87,9 +136,9 @@ class Track extends mongoose.Model {
this.slug = slug(this.title || 'track') + '-' + ((Math.random() * Math.pow(36, 6)) | 0).toString(36); this.slug = slug(this.title || 'track') + '-' + ((Math.random() * Math.pow(36, 6)) | 0).toString(36);
} }
async generateOriginalFilePath() { async generateFilePath() {
await this.populate('author').execPopulate(); await this.populate('author').execPopulate();
this.originalFilePath = path.join('uploads', 'originals', this.author.username, this.slug, 'original.csv'); this.filePath = path.join(this.author.username, this.slug);
} }
isVisibleTo(user) { isVisibleTo(user) {
@ -113,81 +162,81 @@ class Track extends mongoose.Model {
} }
async _ensureDirectoryExists() { async _ensureDirectoryExists() {
if (!this.originalFilePath) { if (!this.filePath) {
await this.generateOriginalFilePath(); await this.generateFilePath();
} }
const dir = path.join(DATA_DIR, path.dirname(this.originalFilePath)); const dir = path.dirname(this.getOriginalFilePath());
await fs.promises.mkdir(dir, { recursive: true }); await fs.promises.mkdir(dir, { recursive: true });
} }
get fullOriginalFilePath() { getOriginalFilePath() {
return path.join(DATA_DIR, this.originalFilePath); if (!this.filePath) {
throw new Error('Cannot get original file path, `filePath` is not yet set. Call `generateFilePath()` first.');
}
return path.join(TRACKS_DIR, this.filePath, 'original.csv');
} }
async writeToOriginalFile(fileBody) { async writeToOriginalFile(fileBody) {
await this._ensureDirectoryExists(); await this._ensureDirectoryExists();
await fs.promises.writeFile(this.fullOriginalFilePath, fileBody); await fs.promises.writeFile(this.getOriginalFilePath(), fileBody);
} }
/** /**
* Fills the trackData and publicTrackData with references to correct * Marks this track as needing processing.
* TrackData objects. For now, this is either the same, or publicTrackData
* is set to null, depending on the visibility of the track. At some point,
* this will include the anonymisation step, and produce a distinct TrackData
* object for the publicTrackData reference.
* *
* Existing TrackData objects will be deleted by this function. * Also deletes all stored information that is derived during processing from
* the database, such that it may be filled again with correct information
* during the processing operation.
*
* Saves the track as well, so it is up to date when the worker receives it.
*/ */
async rebuildTrackDataAndSave() { async queueProcessing() {
// clean up existing track data, we want to actually fully delete it this.processingStatus = 'pending';
if (this.trackData) { this.processingLog = null;
await TrackData.findByIdAndDelete(this.trackData); this.processingJobId = uuid();
}
if (this.publicTrackData && this.publicTrackData.equals(this.trackData)) {
await TrackData.findByIdAndDelete(this.publicTrackData);
}
// Parse the points from the body.
// TODO: Stream file contents, if possible
const body = await fs.promises.readFile(this.fullOriginalFilePath);
const points = Array.from(parseTrackPoints(body));
const trackData = TrackData.createFromPoints(points);
await trackData.save();
this.trackData = trackData._id;
if (this.visible) {
// TODO: create a distinct object with filtered data
this.publicTrackData = trackData._id;
}
await this.save(); await this.save();
return await queue.add(
'processTrack',
{
trackId: this._id.toString(),
},
{
jobId: this.processingJobId,
},
);
}
async readProcessingResults(success = true) {
// Copies some information into this object from the outputs of the
// processing step. This allows general statistics to be formed, and other
// information to be displayed, without having to read individual files
// from disk. Each field set here should be unsed in `queueProcessing`.
// This routine also moves the `processingStatus` along.
} }
async autoGenerateTitle() { async autoGenerateTitle() {
if (this.title) { if (this.customizedTitle) {
return; return;
} }
// for (const property of ['publicTrackData', 'trackData']) {
// if (this[property]) {
// await this.populate(property).execPopulate();
// if (this[property].recordedAt) {
// const dateTime = DateTime.fromJSDate(this[property].recordedAt);
// const daytime = getDaytime(dateTime);
// this.title = `${daytime} ride on ${dateTime.toLocaleString(DateTime.DATE_MED)}`;
// await this.save();
// return
// }
// }
// }
if (this.originalFileName) { if (this.originalFileName) {
this.title = _.upperFirst(_.words(this.originalFileName.replace(/(\.obsdata)?\.csv$/, '')).join(' ')); this.title = _.upperFirst(_.words(this.originalFileName.replace(/(\.obsdata)?\.csv$/, '')).join(' '));
}
for (const property of ['publicTrackData', 'trackData']) {
if (!this.title && this[property]) {
await this.populate(property).execPopulate();
if (this[property].recordedAt) {
const dateTime = DateTime.fromJSDate(this[property].recordedAt);
const daytime = getDaytime(dateTime);
this.title = `${daytime} ride on ${dateTime.toLocaleString(DateTime.DATE_MED)}`;
}
}
}
if (this.title) {
await this.save(); await this.save();
} }
} }
@ -203,6 +252,8 @@ class Track extends mongoose.Model {
updatedAt: this.updatedAt, updatedAt: this.updatedAt,
visible: this.visible, visible: this.visible,
author: this.author.toProfileJSONFor(user), author: this.author.toProfileJSONFor(user),
statistics: this.statistics,
processingStatus: this.processingStatus,
...(includePrivateFields ...(includePrivateFields
? { ? {
uploadedByUserAgent: this.uploadedByUserAgent, uploadedByUserAgent: this.uploadedByUserAgent,

View file

@ -1,105 +0,0 @@
const mongoose = require('mongoose');
const uniqueValidator = require('mongoose-unique-validator');
const turf = require('turf');
const { flow, filter, map, pairwise, reduce } = require('../_helpers/generators');
const schema = new mongoose.Schema(
{
slug: { type: String, lowercase: true, unique: true },
numEvents: { type: Number, default: 0 },
recordedAt: { type: Date },
recordedUntil: { type: Date },
trackLength: { type: Number },
points: [
{
date: String,
time: String,
latitude: Number,
longitude: Number,
course: Number,
speed: Number,
d1: Number,
d2: Number,
flag: Number,
private: Number,
},
],
},
{ timestamps: true },
);
schema.plugin(uniqueValidator, { message: 'is already taken' });
schema.pre('validate', function (next) {
if (!this.slug) {
this.slugify();
}
next();
});
schema.set('toJSON', { virtuals: true });
class TrackData extends mongoose.Model {
slugify() {
this.slug = 'td-' + String((Math.random() * Math.pow(36, 6)) | 0).toString(36);
}
countEvents() {
return this.points.filter((p) => p.flag).length;
}
getRecoredAt(findEnd = false) {
const pointsWithDate = this.points.filter((p) => p.date && p.time);
if (!pointsWithDate.length) {
return null;
}
const point = pointsWithDate[findEnd ? pointsWithDate.length - 1 : 0];
const [day, month, year] = point.date.split('.');
const combinedString = `${year}-${month}-${day} ${point.time}.000+2000`;
const parsedDate = new Date(combinedString);
if (isNaN(parsedDate.getDate())) {
return null;
}
return parsedDate;
}
static createFromPoints(points) {
const trackData = new TrackData();
trackData.points = points;
trackData.numEvents = trackData.countEvents();
trackData.recordedAt = trackData.getRecoredAt();
trackData.recordedUntil = trackData.getRecoredAt(true);
trackData.trackLength = trackData.measureTrackLength();
return trackData;
}
measureTrackLength() {
return flow(
filter((p) => p.latitude != null && p.longitude != null),
map((p) => turf.point([p.longitude, p.latitude])),
pairwise,
map(([a, b]) => turf.distance(a, b) * 1000),
// Ignore distances between two points that are bigger than 100m, this
// must be a gap in the data or a mistake.
filter((d) => d <= 100),
reduce((c, d) => c + d, 0),
)(this.points);
}
get duration() {
if (this.recordedAt == null || this.recordedUntil == null) {
return null;
}
return (this.recordedUntil.getTime() - this.recordedAt.getTime()) / 1000;
}
}
mongoose.model(TrackData, schema);
module.exports = TrackData;

View file

@ -3,5 +3,4 @@ module.exports.AuthorizationCode = require('./AuthorizationCode')
module.exports.Comment = require('./Comment') module.exports.Comment = require('./Comment')
module.exports.RefreshToken = require('./RefreshToken') module.exports.RefreshToken = require('./RefreshToken')
module.exports.Track = require('./Track') module.exports.Track = require('./Track')
module.exports.TrackData = require('./TrackData')
module.exports.User = require('./User') module.exports.User = require('./User')

View file

@ -63,7 +63,7 @@ passport.use(
), ),
); );
function getRequestToken(req) { function getRequestToken(req, tokenTypes = ['Token', 'Bearer']) {
const authorization = req.headers.authorization; const authorization = req.headers.authorization;
if (typeof authorization !== 'string') { if (typeof authorization !== 'string') {
return null; return null;
@ -71,7 +71,7 @@ function getRequestToken(req) {
const [tokenType, token] = authorization.split(' '); const [tokenType, token] = authorization.split(' ');
if (tokenType === 'Token' || tokenType === 'Bearer') { if (tokenTypes.includes(tokenType)) {
return token; return token;
} }
@ -139,7 +139,7 @@ passport.use(
try { try {
let userId; let userId;
const headerToken = getRequestToken(req); const headerToken = getRequestToken(req, ['OBSUserId']);
if (headerToken && headerToken.length === 24) { if (headerToken && headerToken.length === 24) {
userId = headerToken; userId = headerToken;
} }

26
api/src/paths.js Normal file
View file

@ -0,0 +1,26 @@
const path = require('path');
const DATA_DIR = process.env.DATA_DIR || path.resolve(__dirname, '../../data/');
// Contains the subtree for processing files
const PROCESSING_DIR = path.join(DATA_DIR, 'processing');
const PROCESSING_OUTPUT_DIR = path.join(DATA_DIR, 'processing-output');
// Contains the subtree for processing files, without privatization techniques,
// used only for display of tracks to authors
const PROCESSING_DIR_PRIVATE = path.join(DATA_DIR, 'private');
// Contains original track files
const TRACKS_DIR = path.join(DATA_DIR, 'tracks');
// Cache directory for all obs-face calls
const OBS_FACE_CACHE_DIR = path.join(DATA_DIR, 'obs-face-cache');
module.exports = {
DATA_DIR,
PROCESSING_DIR,
PROCESSING_OUTPUT_DIR,
PROCESSING_DIR_PRIVATE,
TRACKS_DIR,
OBS_FACE_CACHE_DIR,
}

5
api/src/queue.js Normal file
View file

@ -0,0 +1,5 @@
const Bull = require('bull')
const config = require('./config')
module.exports = new Bull('processQueue', config.redisUrl)

View file

@ -14,35 +14,25 @@ router.get(
const publicTrackCount = await Track.find({ visible: true }).count(); const publicTrackCount = await Track.find({ visible: true }).count();
const userCount = await User.find().count(); const userCount = await User.find().count();
const [{ trackLength, publicTrackLength, numEvents, trackDuration }] = await Track.aggregate([ const [{ trackLength, numEvents, trackDuration }] = await Track.aggregate([
{ $lookup: { from: 'trackdatas', localField: 'publicTrackData', foreignField: '_id', as: 'publicTrackDatas' } },
{ $lookup: { from: 'trackdatas', localField: 'trackData', foreignField: '_id', as: 'trackDatas' } },
{ {
$addFields: { $addFields: {
publicTrackData: { $arrayElemAt: ['$publicTrackDatas', 0] }, trackLength: '$statistics.length',
trackData: { $arrayElemAt: ['$trackDatas', 0] }, numEvents: '$statistics.numEvents',
},
},
{
$addFields: {
publicTrackLength: '$publicTrackData.trackLength',
trackLength: '$trackData.trackLength',
numEvents: '$publicTrackData.numEvents',
trackDuration: { trackDuration: {
$cond: [ $cond: [
{ $and: ['$publicTrackData.recordedUntil', '$publicTrackData.recordedAt'] }, { $and: ['$statistics.recordedUntil', '$statistics.recordedAt'] },
{ $subtract: ['$publicTrackData.recordedUntil', '$publicTrackData.recordedAt'] }, { $subtract: ['$statistics.recordedUntil', '$statistics.recordedAt'] },
0, 0,
], ],
}, },
}, },
}, },
{ $project: { publicTrackLength: true, trackLength: true, numEvents: true, trackDuration: true } }, { $project: {trackLength: true, numEvents: true, trackDuration: true } },
{ {
$group: { $group: {
_id: 'sum', _id: 'sum',
trackLength: { $sum: '$trackLength' }, trackLength: { $sum: '$trackLength' },
publicTrackLength: { $sum: '$publicTrackLength' },
numEvents: { $sum: '$numEvents' }, numEvents: { $sum: '$numEvents' },
trackDuration: { $sum: '$trackDuration' }, trackDuration: { $sum: '$trackDuration' },
}, },
@ -53,7 +43,7 @@ router.get(
return res.json({ return res.json({
publicTrackCount, publicTrackCount,
publicTrackLength, publicTrackLength: trackLengthPrivatized,
trackLength: trackLengthPrivatized, trackLength: trackLengthPrivatized,
numEvents, numEvents,
trackCount, trackCount,

View file

@ -1,6 +1,7 @@
const fs = require('fs');
const path = require('path');
const router = require('express').Router(); const router = require('express').Router();
const mongoose = require('mongoose'); const mongoose = require('mongoose');
const TrackData = mongoose.model('TrackData');
const Track = mongoose.model('Track'); const Track = mongoose.model('Track');
const Comment = mongoose.model('Comment'); const Comment = mongoose.model('Comment');
const User = mongoose.model('User'); const User = mongoose.model('User');
@ -8,6 +9,7 @@ const busboy = require('connect-busboy');
const auth = require('../../passport'); const auth = require('../../passport');
const { normalizeUserAgent, buildObsver1 } = require('../../logic/tracks'); const { normalizeUserAgent, buildObsver1 } = require('../../logic/tracks');
const wrapRoute = require('../../_helpers/wrapRoute'); const wrapRoute = require('../../_helpers/wrapRoute');
const {PROCESSING_OUTPUT_DIR} = require('../../paths')
function preloadByParam(target, getValueFromParam) { function preloadByParam(target, getValueFromParam) {
return async (req, res, next, paramValue) => { return async (req, res, next, paramValue) => {
@ -181,26 +183,29 @@ router.post(
// TODO: Stream into temporary file, then move it later. // TODO: Stream into temporary file, then move it later.
const { body, fileInfo } = await getMultipartOrJsonBody(req, (body) => body.track); const { body, fileInfo } = await getMultipartOrJsonBody(req, (body) => body.track);
const {body: fileBody, visible, ...trackBody} = body const { body: fileBody, visible, ...trackBody } = body
const track = new Track({ const track = new Track({
...trackBody, ...trackBody,
author: req.user, author: req.user,
visible: visible == null ? req.user.areTracksVisibleForAll : Boolean(trackBody.visible) visible: visible == null ? req.user.areTracksVisibleForAll : Boolean(trackBody.visible)
}) })
track.customizedTitle = track.title != null
track.slugify(); track.slugify();
if (fileBody) { if (fileBody) {
track.uploadedByUserAgent = normalizeUserAgent(req.headers['user-agent']); track.uploadedByUserAgent = normalizeUserAgent(req.headers['user-agent']);
track.originalFileName = fileInfo.body ? fileInfo.body.filename : track.slug + '.csv'; track.originalFileName = fileInfo.body ? fileInfo.body.filename : track.slug + '.csv';
await track.writeToOriginalFile(fileBody) await track.writeToOriginalFile(fileBody)
await track.rebuildTrackDataAndSave();
} else {
await track.save()
} }
await track.save()
await track.autoGenerateTitle() await track.autoGenerateTitle()
if (fileBody) {
await track.queueProcessing();
}
// console.log(track.author); // console.log(track.author);
return res.json({ track: track.toJSONFor(req.user) }); return res.json({ track: track.toJSONFor(req.user) });
}), }),
@ -235,30 +240,35 @@ router.put(
if (typeof trackBody.title !== 'undefined') { if (typeof trackBody.title !== 'undefined') {
track.title = (trackBody.title || '').trim() || null; track.title = (trackBody.title || '').trim() || null;
track.customizedTitle = track.title != null
} }
if (typeof trackBody.description !== 'undefined') { if (typeof trackBody.description !== 'undefined') {
track.description = (trackBody.description || '').trim() || null; track.description = (trackBody.description || '').trim() || null;
} }
let process = false
if (trackBody.visible != null) { if (trackBody.visible != null) {
track.visible = Boolean(trackBody.visible); const visible = Boolean(trackBody.visible);
process |= visible !== track.visible
track.visible = visible
} }
if (fileBody) { if (fileBody) {
track.originalFileName = fileInfo.body ? fileInfo.body.filename : track.slug + '.csv'; track.originalFileName = fileInfo.body ? fileInfo.body.filename : track.slug + '.csv';
track.uploadedByUserAgent = normalizeUserAgent(req.headers['user-agent']); track.uploadedByUserAgent = normalizeUserAgent(req.headers['user-agent']);
await track.writeToOriginalFile(fileBody) await track.writeToOriginalFile(fileBody)
process = true
await track.rebuildTrackDataAndSave();
} else if (track.visible && !track.publicTrackData) {
await track.rebuildTrackDataAndSave();
} else {
await track.save();
} }
await track.save();
await track.autoGenerateTitle() await track.autoGenerateTitle()
if (process) {
await track.queueProcessing()
}
return res.json({ track: track.toJSONFor(req.user) }); return res.json({ track: track.toJSONFor(req.user) });
}), }),
); );
@ -269,7 +279,6 @@ router.delete(
auth.required, auth.required,
wrapRoute(async (req, res) => { wrapRoute(async (req, res) => {
if (req.track.author._id.equals(req.user.id)) { if (req.track.author._id.equals(req.user.id)) {
await TrackData.findByIdAndDelete(req.track.trackData);
await req.track.remove(); await req.track.remove();
return res.sendStatus(204); return res.sendStatus(204);
} else { } else {
@ -342,26 +351,39 @@ router.delete(
}), }),
); );
// return an track's trackData // return an track's generated data
router.get( router.get(
['/:track/data', '/:track/TrackData'], '/:track/data/:filename',
auth.optional, auth.optional,
wrapRoute(async (req, res) => { wrapRoute(async (req, res) => {
const {filename} = req.params
if (!['statistics', 'all_measurements'].includes(filename)) {
return res.sendStatus(404);
}
console.log(req.track.author, req.user)
if (!req.track.isVisibleTo(req.user)) { if (!req.track.isVisibleTo(req.user)) {
return res.sendStatus(403); return res.sendStatus(403);
} }
let trackData; const filePath = path.join(PROCESSING_OUTPUT_DIR, req.track.filePath, filename + '.json')
if (req.track.isVisibleToPrivate(req.user)) { let stats
trackData = await TrackData.findById(req.track.trackData);
} else if (!req.track.publicTrackData) { try {
return res.sendStatus(403); stats = await fs.promises.stat(filePath)
} else { } catch(err) {
trackData = await TrackData.findById(req.track.publicTrackData); return res.sendStatus(404);
} }
return res.json({ trackData }); if (!stats.isFile()) {
// file does not exist (yet)
return res.sendStatus(404);
}
const content = await fs.promises.readFile(filePath)
return res.json(JSON.parse(content));
}), }),
); );

112
api/src/worker.js Normal file
View file

@ -0,0 +1,112 @@
const fs = require('fs');
const path = require('path');
const { spawn } = require('child_process');
const queue = require('./queue');
require('./db');
const { Track } = require('./models');
const { PROCESSING_DIR, OBS_FACE_CACHE_DIR, PROCESSING_OUTPUT_DIR } = require('./paths');
queue.process('processTrack', async (job) => {
const track = await Track.findById(job.data.trackId);
if (!track) {
throw new Error('Cannot find track to process');
}
if (track.processingJobId !== job.id) {
throw new Error('Track is processed by another job');
}
if (track.processingJobId !== job.id) {
throw new Error('Track is processed by another job');
}
if (track.processingStatus !== 'pending') {
throw new Error('Track is not pending processing');
}
try {
const { filePath } = track;
console.log('Will process track', filePath);
track.processingStatus = 'processing';
track.processingLog = '';
await track.save();
// Create input directory
const inputDirectory = path.join(PROCESSING_DIR, filePath);
await fs.promises.mkdir(inputDirectory, { recursive: true });
// copy original file to processing dir
const inputFilePath = path.join(inputDirectory, 'track.csv');
const originalFilePath = track.getOriginalFilePath()
console.log(`[${track.slug}] Copy ${originalFilePath} to ${inputFilePath}`);
await fs.promises.copyFile(originalFilePath, inputFilePath);
// Create output directory
const outputDirectory = path.join(PROCESSING_OUTPUT_DIR, filePath);
await fs.promises.mkdir(outputDirectory, { recursive: true });
const stdoutFile = path.join(outputDirectory, 'stdout.log');
const stderrFile = path.join(outputDirectory, 'stderr.log');
const stdout = fs.createWriteStream(stdoutFile);
const stderr = fs.createWriteStream(stderrFile);
// TODO: Generate track transformation settings (privacy zones etc)
// const settingsFilePath = path.join(inputDirectory, 'track-settings.json');
const child = spawn(
'obs-process-track',
[
'--input',
inputFilePath,
'--output',
outputDirectory,
'--path-cache',
OBS_FACE_CACHE_DIR,
'--district',
'Freiburg im Breisgau',
// '--anonymize-user-id', 'remove',
// '--anonymize-measurement-id', 'remove',
],
{
cwd: PROCESSING_DIR,
},
);
child.stdout.pipe(process.stdout);
child.stdout.pipe(stdout);
child.stderr.pipe(process.stderr);
child.stderr.pipe(stderr);
const code = await new Promise((resolve) => child.on('close', resolve));
track.processingLog += (
await Promise.all([
fs.promises.readFile(stdoutFile),
fs.promises.readFile(stderrFile),
// split lines
])
)
.join('\n')
.trim();
if (code !== 0) {
throw new Error(`Track processing failed with status ${code}`);
}
// Read some results back into the database for quick access and
// accumulation
const statisticsContent = await fs.promises.readFile(path.join(outputDirectory, 'statistics.json'));
track.statistics = JSON.parse(statisticsContent);
track.processingStatus = 'complete';
await track.save();
} catch (err) {
console.error('Processing failed:', err);
track.processingLog += String(err) + '\n' + err.stack + '\n';
track.processingStatus = 'error';
await track.save();
}
});
console.log('Worker started.');

View file

@ -10,13 +10,20 @@ services:
- '27017:27017' - '27017:27017'
restart: on-failure restart: on-failure
redis:
image: redis
volumes:
- ./local/redis:/data
command: redis-server --appendonly yes
restart: on-failure
api: api:
image: obs-api image: obs-api
build: build:
context: ./api context: ./api
volumes: volumes:
- ./api/src:/opt/obs/api/src - ./api/src:/opt/obs/api/src
- ./api/scripts:/opt/obs/api/scripts - ./api/scripts/obs:/opt/obs/api/scripts/obs
- ./api/views:/opt/obs/api/views - ./api/views:/opt/obs/api/views
- ./local/api-data:/data - ./local/api-data:/data
- ./api/.migrations.js:/opt/obs/api/.migrations.js - ./api/.migrations.js:/opt/obs/api/.migrations.js
@ -28,6 +35,7 @@ services:
- DATA_DIR=/data - DATA_DIR=/data
links: links:
- mongo - mongo
- redis
ports: ports:
- '3000:3000' - '3000:3000'
restart: on-failure restart: on-failure
@ -36,6 +44,28 @@ services:
- run - run
- dev - dev
worker:
image: obs-api
build:
context: ./api
volumes:
- ./api/src:/opt/obs/api/src
- ./api/scripts/obs:/opt/obs/api/scripts/obs
- ./api/views:/opt/obs/api/views
- ./local/api-data:/data
- ./api/config.dev.json:/opt/obs/api/config.json
environment:
- DATA_DIR=/data
links:
- mongo
- redis
restart: on-failure
command:
- npm
- run
- dev:worker
frontend: frontend:
image: obs-frontend image: obs-frontend
build: build:

View file

@ -1,5 +1,5 @@
import React from 'react' import React from 'react'
import {List, Loader} from 'semantic-ui-react' import {List} from 'semantic-ui-react'
import {Duration} from 'luxon' import {Duration} from 'luxon'
import {FormattedDate} from 'components' import {FormattedDate} from 'components'
@ -8,7 +8,7 @@ function formatDuration(seconds) {
return Duration.fromMillis((seconds ?? 0) * 1000).toFormat("h'h' mm'm'") return Duration.fromMillis((seconds ?? 0) * 1000).toFormat("h'h' mm'm'")
} }
export default function TrackDetails({track, isAuthor, trackData}) { export default function TrackDetails({track, isAuthor}) {
return ( return (
<List> <List>
{track.visible != null && isAuthor && ( {track.visible != null && isAuthor && (
@ -46,26 +46,24 @@ export default function TrackDetails({track, isAuthor, trackData}) {
</List.Item> </List.Item>
)} )}
<Loader active={track != null && trackData == null} inline="centered" style={{marginTop: 16, marginBottom: 16}} /> {track?.statistics?.recordedAt != null && (
{trackData?.recordedAt != null && (
<List.Item> <List.Item>
<List.Header>Recorded on</List.Header> <List.Header>Recorded on</List.Header>
<FormattedDate date={trackData.recordedAt} /> <FormattedDate date={track?.statistics.recordedAt} />
</List.Item> </List.Item>
)} )}
{trackData?.numEvents != null && ( {track?.statistics?.numEvents != null && (
<List.Item> <List.Item>
<List.Header>Confirmed events</List.Header> <List.Header>Confirmed events</List.Header>
{trackData.numEvents} {track?.statistics.numEvents}
</List.Item> </List.Item>
)} )}
{trackData?.trackLength != null && ( {track?.statistics?.trackLength != null && (
<List.Item> <List.Item>
<List.Header>Length</List.Header> <List.Header>Length</List.Header>
{(trackData.trackLength / 1000).toFixed(2)} km {(track?.statistics.trackLength / 1000).toFixed(2)} km
</List.Item> </List.Item>
)} )}
</List> </List>

View file

@ -8,11 +8,15 @@ import {Fill, Stroke, Style, Text, Circle} from 'ol/style'
import {Map} from 'components' import {Map} from 'components'
import type {TrackData, TrackPoint} from 'types' import type {TrackData, TrackPoint} from 'types'
const isValidTrackPoint = (point: TrackPoint): boolean => const isValidTrackPoint = (point: TrackPoint): boolean => {
point.latitude != null && point.longitude != null && (point.latitude !== 0 || point.longitude !== 0) const longitude = point.geometry?.coordinates?.[0]
const latitude = point.geometry?.coordinates?.[1]
const WARN_DISTANCE = 200 return latitude != null && longitude != null && (latitude !== 0 || longitude !== 0)
const MIN_DISTANCE = 150 }
const WARN_DISTANCE = 2
const MIN_DISTANCE = 1.5
const evaluateDistanceColor = function (distance) { const evaluateDistanceColor = function (distance) {
if (distance < MIN_DISTANCE) { if (distance < MIN_DISTANCE) {
@ -59,7 +63,7 @@ const createTextStyle = function (distance, resolution) {
textAlign: 'center', textAlign: 'center',
textBaseline: 'middle', textBaseline: 'middle',
font: 'normal 18px/1 Arial', font: 'normal 18px/1 Arial',
text: resolution < 6 ? '' + distance : '', text: resolution < 6 ? '' + Number(distance).toFixed(2) : '',
fill: new Fill({color: evaluateDistanceColor(distance)}), fill: new Fill({color: evaluateDistanceColor(distance)}),
stroke: new Stroke({color: 'white', width: 2}), stroke: new Stroke({color: 'white', width: 2}),
offsetX: 0, offsetX: 0,
@ -94,15 +98,20 @@ export default function TrackMap({trackData, show, ...props}: {trackData: TrackD
trackPointsUntaggedD2, trackPointsUntaggedD2,
viewExtent, viewExtent,
} = React.useMemo(() => { } = React.useMemo(() => {
const trackPointsD1: Feature<Geometry>[] = [] const trackPointsD1: Feature<Point>[] = []
const trackPointsD2: Feature<Geometry>[] = [] const trackPointsD2: Feature<Point>[] = []
const trackPointsUntaggedD1: Feature<Geometry>[] = [] const trackPointsUntaggedD1: Feature<Point>[] = []
const trackPointsUntaggedD2: Feature<Geometry>[] = [] const trackPointsUntaggedD2: Feature<Point>[] = []
const points: Coordinate[] = [] const points: Coordinate[] = []
const filteredPoints: TrackPoint[] = trackData?.points.filter(isValidTrackPoint) ?? [] const filteredPoints: TrackPoint[] = trackData?.features.filter(isValidTrackPoint) ?? []
for (const dataPoint of filteredPoints) { for (const feature of filteredPoints) {
const {longitude, latitude, flag, d1, d2} = dataPoint const {
geometry: {
coordinates: [latitude, longitude],
},
properties: {confirmed: flag, distanceOvertaker: d1, distanceStationary: d2},
} = feature
const p = fromLonLat([longitude, latitude]) const p = fromLonLat([longitude, latitude])
points.push(p) points.push(p)
@ -133,7 +142,7 @@ export default function TrackMap({trackData, show, ...props}: {trackData: TrackD
const viewExtent = points.length ? trackVectorSource.getExtent() : null const viewExtent = points.length ? trackVectorSource.getExtent() : null
return {trackVectorSource, trackPointsD1, trackPointsD2, trackPointsUntaggedD1, trackPointsUntaggedD2, viewExtent} return {trackVectorSource, trackPointsD1, trackPointsD2, trackPointsUntaggedD1, trackPointsUntaggedD2, viewExtent}
}, [trackData?.points]) }, [trackData?.features])
const trackLayerStyle = React.useMemo( const trackLayerStyle = React.useMemo(
() => () =>

View file

@ -1,6 +1,6 @@
import React from 'react' import React from 'react'
import {connect} from 'react-redux' import {connect} from 'react-redux'
import {Table, Checkbox, Segment, Dimmer, Grid, Loader, Header} from 'semantic-ui-react' import {Button, Table, Checkbox, Segment, Dimmer, Grid, Loader, Header, Message} from 'semantic-ui-react'
import {useParams, useHistory} from 'react-router-dom' import {useParams, useHistory} from 'react-router-dom'
import {concat, combineLatest, of, from, Subject} from 'rxjs' import {concat, combineLatest, of, from, Subject} from 'rxjs'
import {pluck, distinctUntilChanged, map, switchMap, startWith, catchError} from 'rxjs/operators' import {pluck, distinctUntilChanged, map, switchMap, startWith, catchError} from 'rxjs/operators'
@ -51,19 +51,19 @@ const TrackPage = connect((state) => ({login: state.login}))(function TrackPage(
) )
const trackData$ = slug$.pipe( const trackData$ = slug$.pipe(
map((slug) => `/tracks/${slug}/data`), map((slug) => `/tracks/${slug}/data/all_measurements`),
switchMap((url) => switchMap((url) =>
concat( concat(
of(null), of(undefined),
from(api.get(url)).pipe( from(api.get(url)).pipe(
catchError(() => { catchError(() => {
history.replace('/tracks') // history.replace('/tracks')
return of(null)
}) })
) )
) )
), ),
pluck('trackData'), startWith(undefined) // show track infos before track data is loaded
startWith(null) // show track infos before track data is loaded
) )
const comments$ = concat(of(null), reloadComments$).pipe( const comments$ = concat(of(null), reloadComments$).pipe(
@ -110,7 +110,8 @@ const TrackPage = connect((state) => ({login: state.login}))(function TrackPage(
const {track, trackData, comments} = data || {} const {track, trackData, comments} = data || {}
const loading = track == null || trackData == null const loading = track == null || trackData === undefined
const processing = ['processing', 'pending'].includes(track?.processingStatus)
const [left, setLeft] = React.useState(true) const [left, setLeft] = React.useState(true)
const [right, setRight] = React.useState(false) const [right, setRight] = React.useState(false)
@ -119,6 +120,14 @@ const TrackPage = connect((state) => ({login: state.login}))(function TrackPage(
return ( return (
<Page> <Page>
{processing && (
<Message warning>
<Message.Content>
Track data is still being processed, please reload page in a while.
</Message.Content>
</Message>
)}
<Grid stackable> <Grid stackable>
<Grid.Row> <Grid.Row>
<Grid.Column width={12}> <Grid.Column width={12}>
@ -137,7 +146,7 @@ const TrackPage = connect((state) => ({login: state.login}))(function TrackPage(
{track && ( {track && (
<> <>
<Header as="h1">{track.title || 'Unnamed track'}</Header> <Header as="h1">{track.title || 'Unnamed track'}</Header>
<TrackDetails {...{track, trackData, isAuthor}} /> <TrackDetails {...{track, isAuthor}} />
{isAuthor && <TrackActions {...{slug}} />} {isAuthor && <TrackActions {...{slug}} />}
</> </>
)} )}

View file

@ -1,9 +1,23 @@
import type {FeatureCollection, Point} from 'geojson'
export type UserProfile = { export type UserProfile = {
username: string username: string
image: string image: string
bio?: string | null bio?: string | null
} }
export type TrackData = FeatureCollection
export type TrackStatistics = {
recordedAt?: Date
recordedUntil?: Date
duration?: number
length?: number
segments?: number
numEvents?: number
numMeasurements?: number
numValid?: number
}
export type Track = { export type Track = {
slug: string slug: string
author: UserProfile author: UserProfile
@ -11,28 +25,16 @@ export type Track = {
description?: string description?: string
createdAt: string createdAt: string
visible?: boolean visible?: boolean
} statistics?: TrackStatistics
export type TrackData = {
slug: string
numEvents?: number | null
recordedAt?: String | null
recordedUntil?: String | null
trackLength?: number | null
points: TrackPoint[]
} }
export type TrackPoint = { export type TrackPoint = {
date: string | null type: 'Feature',
time: string | null geometry: Point,
latitude: number | null properties: {
longitude: number | null distanceOvertaker: null | number,
course: number | null distanceStationary: null | number,
speed: number | null },
d1: number | null
d2: number | null
flag: number | null
private: number | null
} }
export type TrackComment = { export type TrackComment = {