This commit is contained in:
2026-04-14 18:31:48 +01:00
parent 4275cbd795
commit f4d6a97abe
36 changed files with 8777 additions and 1 deletions

4
server/.env.example Normal file
View File

@@ -0,0 +1,4 @@
PORT=3000
YJS_WS_PORT=1234
NODE_ENV=development
CORS_ORIGIN=http://localhost:5173

5
server/.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
node_modules/
dist/
.env
*.log
.DS_Store

2112
server/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

36
server/package.json Normal file
View File

@@ -0,0 +1,36 @@
{
"name": "p2p-poll-server",
"version": "1.0.0",
"description": "Backend server for P2P polling app with Yjs and WebRTC signaling",
"main": "dist/index.js",
"scripts": {
"dev": "tsx watch src/index.ts",
"build": "tsc",
"start": "node dist/index.js"
},
"keywords": [
"yjs",
"websocket",
"webrtc",
"p2p"
],
"author": "",
"license": "MIT",
"dependencies": {
"cors": "^2.8.5",
"dotenv": "^16.3.1",
"express": "^4.18.2",
"ws": "^8.14.2",
"y-websocket": "^1.5.0",
"yjs": "^13.6.8",
"zod": "^4.3.6"
},
"devDependencies": {
"@types/cors": "^2.8.15",
"@types/express": "^4.17.20",
"@types/node": "^20.9.0",
"@types/ws": "^8.5.8",
"tsx": "^4.6.2",
"typescript": "^5.2.2"
}
}

59
server/src/index.ts Normal file
View File

@@ -0,0 +1,59 @@
import express from 'express';
import http from 'http';
import cors from 'cors';
import dotenv from 'dotenv';
import { createYjsServer } from './yjs-server';
import { createSignalingServer } from './signaling-server';
import { logger } from './utils/logger';
dotenv.config();
const app = express();
const PORT = process.env.PORT || 5000;
app.use(cors({
origin: process.env.CORS_ORIGIN || ['http://localhost:5173', 'http://localhost:5174', 'http://localhost:5175'],
credentials: true
}));
app.use(express.json());
app.get('/health', (req, res) => {
res.json({
status: 'ok',
timestamp: new Date().toISOString(),
services: {
yjs: 'running',
signaling: 'running'
}
});
});
app.get('/', (req, res) => {
res.json({
message: 'P2P Poll Server',
endpoints: {
health: '/health',
yjs: 'ws://localhost:' + PORT + '/yjs',
signaling: 'ws://localhost:' + PORT + '/signal'
}
});
});
const server = http.createServer(app);
createYjsServer(server, PORT as number);
createSignalingServer(server);
server.listen(PORT, () => {
logger.info(`Server running on port ${PORT}`);
logger.info(`Yjs WebSocket: ws://localhost:${PORT}/yjs`);
logger.info(`Signaling WebSocket: ws://localhost:${PORT}/signal`);
});
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server');
server.close(() => {
logger.info('HTTP server closed');
});
});

View File

@@ -0,0 +1,305 @@
import { WebSocketServer, WebSocket } from 'ws';
import http from 'http';
import { SignalingMessage, SignalingMessageSchema } from './types/poll.types';
import { logger } from './utils/logger';
interface Client {
id: string;
ws: WebSocket;
roomId: string;
lastSeen: number;
messageCount: number;
lastMessageTime: number;
}
export function createSignalingServer(server: http.Server) {
const wss = new WebSocketServer({
noServer: true
});
// Handle upgrade requests for /signal path
server.on('upgrade', (request, socket, head) => {
const pathname = request.url || '';
if (pathname === '/signal' || pathname.startsWith('/signal?')) {
logger.info(`[SIGNALING] Upgrade request for path: ${pathname}`);
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
}
});
const clients = new Map<string, Client>();
const rooms = new Map<string, Set<string>>();
const roomPasswords = new Map<string, string>();
const HEARTBEAT_INTERVAL = 30000;
const CLIENT_TIMEOUT = 60000;
const RATE_LIMIT_WINDOW = 1000;
const RATE_LIMIT_MAX = 10;
const heartbeatInterval = setInterval(() => {
const now = Date.now();
clients.forEach((client, clientId) => {
if (now - client.lastSeen > CLIENT_TIMEOUT) {
logger.info(`Client ${clientId} timed out, removing...`);
handleClientLeave(clientId);
} else {
client.ws.send(JSON.stringify({ type: 'ping' }));
}
});
}, HEARTBEAT_INTERVAL);
wss.on('connection', (ws: WebSocket) => {
let clientId: string | null = null;
const tempClientId = `temp-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
logger.info(`[SIGNALING] New WebSocket connection (temp: ${tempClientId})`);
ws.on('message', (data: Buffer) => {
try {
const parsed = JSON.parse(data.toString());
const validationResult = SignalingMessageSchema.safeParse(parsed);
if (!validationResult.success) {
logger.error('Invalid signaling message:', validationResult.error);
ws.send(JSON.stringify({
type: 'error',
message: 'Invalid message format',
errors: validationResult.error.issues
}));
return;
}
const message: SignalingMessage = validationResult.data;
if (clientId) {
const client = clients.get(clientId);
if (client) {
const now = Date.now();
if (now - client.lastMessageTime < RATE_LIMIT_WINDOW) {
client.messageCount++;
if (client.messageCount > RATE_LIMIT_MAX) {
logger.warn(`Client ${clientId} exceeded rate limit`);
ws.send(JSON.stringify({
type: 'error',
message: 'Rate limit exceeded'
}));
return;
}
} else {
client.messageCount = 1;
client.lastMessageTime = now;
}
}
}
switch (message.type) {
case 'subscribe':
// y-webrtc subscribe message - client wants to join topics
if (message.topics) {
message.topics.forEach((topic: string) => {
if (!rooms.has(topic)) {
rooms.set(topic, new Set());
}
const tempId = `sub-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
clientId = tempId;
rooms.get(topic)!.add(tempId);
clients.set(tempId, {
id: tempId,
ws,
roomId: topic,
lastSeen: Date.now(),
messageCount: 0,
lastMessageTime: Date.now()
});
logger.info(`[SIGNALING] Client subscribed to topic: ${topic}`);
});
}
break;
case 'unsubscribe':
// y-webrtc unsubscribe message
if (message.topics && clientId) {
message.topics.forEach((topic: string) => {
const room = rooms.get(topic);
if (room && clientId) {
room.delete(clientId);
if (room.size === 0) {
rooms.delete(topic);
}
}
});
}
break;
case 'publish':
// y-webrtc publish message - broadcast to all subscribers of a topic
if (message.topic) {
const topic = message.topic;
const room = rooms.get(topic);
if (room) {
room.forEach((subscriberId) => {
const subscriber = clients.get(subscriberId);
if (subscriber && subscriber.ws !== ws && subscriber.ws.readyState === WebSocket.OPEN) {
subscriber.ws.send(JSON.stringify({
type: 'publish',
topic: topic,
data: message.data
}));
}
});
}
}
break;
case 'join':
clientId = message.from || `client-${Date.now()}`;
const roomId = message.roomId || 'default-room';
const roomPassword = roomPasswords.get(roomId);
if (roomPassword && message.password !== roomPassword) {
logger.warn(`Client ${clientId} failed password authentication for room ${roomId}`);
ws.send(JSON.stringify({
type: 'error',
message: 'Invalid room password'
}));
return;
}
clients.set(clientId, {
id: clientId,
ws,
roomId,
lastSeen: Date.now(),
messageCount: 0,
lastMessageTime: Date.now()
});
if (!rooms.has(roomId)) {
rooms.set(roomId, new Set());
}
rooms.get(roomId)!.add(clientId);
logger.info(`Client ${clientId} joined room ${roomId}`);
const roomClients = Array.from(rooms.get(roomId)!).filter(id => id !== clientId);
ws.send(JSON.stringify({
type: 'peers',
peers: roomClients
}));
roomClients.forEach(peerId => {
const peer = clients.get(peerId);
if (peer && peer.ws.readyState === WebSocket.OPEN) {
peer.ws.send(JSON.stringify({
type: 'peer-joined',
peerId: clientId
}));
}
});
break;
case 'offer':
case 'answer':
case 'ice-candidate':
if (message.to) {
const targetClient = clients.get(message.to);
if (targetClient && targetClient.ws.readyState === WebSocket.OPEN) {
targetClient.ws.send(JSON.stringify({
type: message.type,
from: message.from,
data: message.data
}));
}
}
break;
case 'leave':
if (message.from) {
handleClientLeave(message.from);
}
break;
case 'create-room':
if (message.roomId && message.password) {
roomPasswords.set(message.roomId, message.password);
logger.info(`Room ${message.roomId} created with password protection`);
ws.send(JSON.stringify({
type: 'room-created',
roomId: message.roomId
}));
}
break;
case 'ping':
if (clientId) {
const client = clients.get(clientId);
if (client) {
client.lastSeen = Date.now();
client.ws.send(JSON.stringify({ type: 'pong', from: 'server' }));
}
}
break;
case 'pong':
if (clientId) {
const client = clients.get(clientId);
if (client) {
client.lastSeen = Date.now();
}
}
break;
}
} catch (error) {
logger.error('Error processing signaling message:', error);
}
});
ws.on('close', () => {
if (clientId) {
handleClientLeave(clientId);
}
});
ws.on('error', (error) => {
logger.error('WebSocket error:', error);
});
});
function handleClientLeave(clientId: string) {
const client = clients.get(clientId);
if (client) {
const roomId = client.roomId;
const room = rooms.get(roomId);
if (room) {
room.delete(clientId);
room.forEach(peerId => {
const peer = clients.get(peerId);
if (peer && peer.ws.readyState === WebSocket.OPEN) {
peer.ws.send(JSON.stringify({
type: 'peer-left',
peerId: clientId
}));
}
});
if (room.size === 0) {
rooms.delete(roomId);
}
}
clients.delete(clientId);
logger.info(`Client ${clientId} left room ${roomId}`);
}
}
wss.on('close', () => {
clearInterval(heartbeatInterval);
logger.info('Signaling server closed');
});
logger.info('Signaling server running at path /signal');
return wss;
}

View File

@@ -0,0 +1,40 @@
import { z } from 'zod';
export interface PollOption {
id: string;
text: string;
votes: number;
votedBy: string[];
createdBy: string;
timestamp: number;
}
export interface Poll {
id: string;
question: string;
createdBy: string;
timestamp: number;
options: PollOption[];
}
export const SignalingMessageSchema = z.object({
type: z.enum(['offer', 'answer', 'ice-candidate', 'join', 'leave', 'ping', 'pong', 'create-room', 'subscribe', 'unsubscribe', 'publish', 'signal']),
from: z.string().optional(),
to: z.string().optional(),
data: z.any().optional(),
roomId: z.string().optional(),
password: z.string().optional(),
topics: z.array(z.string()).optional(),
topic: z.string().optional()
});
export interface SignalingMessage {
type: 'offer' | 'answer' | 'ice-candidate' | 'join' | 'leave' | 'ping' | 'pong' | 'create-room' | 'subscribe' | 'unsubscribe' | 'publish' | 'signal';
from?: string;
to?: string;
data?: any;
roomId?: string;
password?: string;
topics?: string[];
topic?: string;
}

115
server/src/utils/logger.ts Normal file
View File

@@ -0,0 +1,115 @@
type LogLevel = 'info' | 'error' | 'warn' | 'debug';
type LogContext = Record<string, any>;
interface LogEntry {
timestamp: string;
level: LogLevel;
message: string;
context?: LogContext;
}
class Logger {
private context: LogContext = {};
private timers: Map<string, number> = new Map();
setContext(ctx: LogContext): void {
this.context = { ...this.context, ...ctx };
}
clearContext(): void {
this.context = {};
}
private formatLog(level: LogLevel, message: string, args: any[]): LogEntry {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level,
message,
context: Object.keys(this.context).length > 0 ? { ...this.context } : undefined
};
if (args.length > 0) {
if (entry.context) {
entry.context.args = args;
} else {
entry.context = { args };
}
}
return entry;
}
private log(level: LogLevel, message: string, ...args: any[]): void {
const entry = this.formatLog(level, message, args);
const logString = JSON.stringify(entry);
switch (level) {
case 'info':
console.log(logString);
break;
case 'error':
console.error(logString);
break;
case 'warn':
console.warn(logString);
break;
case 'debug':
if (process.env.NODE_ENV === 'development') {
console.debug(logString);
}
break;
}
}
info(message: string, ...args: any[]): void {
this.log('info', message, ...args);
}
error(message: string, ...args: any[]): void {
this.log('error', message, ...args);
}
warn(message: string, ...args: any[]): void {
this.log('warn', message, ...args);
}
debug(message: string, ...args: any[]): void {
this.log('debug', message, ...args);
}
startTimer(label: string): void {
this.timers.set(label, Date.now());
}
endTimer(label: string): number {
const startTime = this.timers.get(label);
if (!startTime) {
this.warn(`Timer '${label}' not found`);
return 0;
}
const duration = Date.now() - startTime;
this.timers.delete(label);
this.debug(`Timer '${label}': ${duration}ms`);
return duration;
}
time<T>(label: string, fn: () => T): T {
this.startTimer(label);
try {
return fn();
} finally {
this.endTimer(label);
}
}
async timeAsync<T>(label: string, fn: () => Promise<T>): Promise<T> {
this.startTimer(label);
try {
return await fn();
} finally {
this.endTimer(label);
}
}
}
export const logger = new Logger();

51
server/src/yjs-server.ts Normal file
View File

@@ -0,0 +1,51 @@
import { WebSocketServer } from 'ws';
// @ts-ignore
import { setupWSConnection } from 'y-websocket/bin/utils';
import http from 'http';
import { logger } from './utils/logger';
export function createYjsServer(server: http.Server, port: number) {
const wss = new WebSocketServer({
noServer: true
});
// Handle upgrade requests for /yjs/* paths
server.on('upgrade', (request, socket, head) => {
const pathname = request.url || '';
if (pathname.startsWith('/yjs')) {
logger.info(`[YJS] Upgrade request for path: ${pathname}`);
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
}
});
wss.on('connection', (ws, req) => {
const url = req.url || 'unknown';
const remoteAddress = req.socket.remoteAddress || 'unknown';
logger.info(`[YJS] New connection from ${remoteAddress}, URL: ${url}`);
// Log when connection closes
ws.on('close', () => {
logger.info(`[YJS] Connection closed from ${remoteAddress}`);
});
ws.on('error', (error) => {
logger.error(`[YJS] Connection error from ${remoteAddress}:`, error);
});
// y-websocket automatically handles docName from the URL path
// The room name is passed as part of the URL: /yjs/room-name
// We don't need to manually extract it
setupWSConnection(ws, req, { gc: true });
});
wss.on('error', (error) => {
logger.error('Yjs WebSocket server error:', error);
});
logger.info(`Yjs WebSocket server running on port ${port} at path /yjs`);
return wss;
}

20
server/tsconfig.json Normal file
View File

@@ -0,0 +1,20 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"lib": ["ES2020"],
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"moduleResolution": "node",
"declaration": true,
"declarationMap": true,
"sourceMap": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}