Compare commits

..

1 Commits

Author SHA1 Message Date
e14bb6d425 feat: implement dynamic P2P polling app with real-time synchronization
- Add complete P2P polling application with React + TypeScript frontend
- Implement Node.js backend with Yjs WebSocket and WebRTC signaling
- Support dynamic poll creation, answer management, and voting
- Add CRDT-based state synchronization using Yjs for conflict-free merging
- Implement user tracking and vote prevention (one vote per user per option)
- Create modern UI with Tailwind CSS and visual feedback
- Add comprehensive documentation and setup instructions

Features:
- Users can create polls with custom questions
- Anyone can add answer options to any poll
- Real-time voting with instant cross-client synchronization
- Smart vote tracking with visual feedback for voted options
- User attribution showing who created polls and options
- Connection status indicators for WebSocket and P2P connections

Technical:
- Hybrid P2P architecture (WebSocket + WebRTC)
- CRDT-based state management with Yjs
2026-03-25 11:51:33 +01:00
11 changed files with 80 additions and 576 deletions

View File

@@ -1,4 +1,3 @@
# P2P Poll App # P2P Poll App
A peer-to-peer polling application built with React, TypeScript, Tailwind CSS, Node.js, Yjs, and WebSocket for real-time collaborative voting. A peer-to-peer polling application built with React, TypeScript, Tailwind CSS, Node.js, Yjs, and WebSocket for real-time collaborative voting.

View File

@@ -1 +1 @@
VITE_WS_URL=ws://localhost:5000 VITE_WS_URL=ws://localhost:3000

View File

@@ -13,23 +13,17 @@ export function useYjsSync() {
const [isConnected, setIsConnected] = useState(false); const [isConnected, setIsConnected] = useState(false);
useEffect(() => { useEffect(() => {
console.log('[SYNC] Initializing Yjs sync hook');
const { wsProvider: ws } = initializeProviders(); const { wsProvider: ws } = initializeProviders();
const updatePolls = () => { const updatePolls = () => {
const currentPolls = Array.from(yPolls.values()); setPolls([...yPolls.toArray()]);
console.log('[SYNC] Polls updated, count:', currentPolls.length);
setPolls(currentPolls);
}; };
yPolls.observe(updatePolls); yPolls.observe(updatePolls);
updatePolls(); updatePolls();
const handleStatus = (event: { status: string }) => { const handleStatus = (event: { status: string }) => {
console.log('[SYNC] WebSocket status event:', event.status); setIsConnected(event.status === 'connected');
const connected = event.status === 'connected';
setIsConnected(connected);
console.log('[SYNC] Connection state set to:', connected);
}; };
ws?.on('status', handleStatus); ws?.on('status', handleStatus);

View File

@@ -3,60 +3,17 @@ import { WebsocketProvider } from 'y-websocket';
import { WebrtcProvider } from 'y-webrtc'; import { WebrtcProvider } from 'y-webrtc';
import { Poll, PollOption } from '../types/poll.types'; import { Poll, PollOption } from '../types/poll.types';
const WS_URL = import.meta.env.VITE_WS_URL || 'ws://localhost:5000'; const WS_URL = import.meta.env.VITE_WS_URL || 'ws://localhost:3000';
const ROOM_NAME = 'default-poll'; const ROOM_NAME = 'default-poll';
function getSignalingUrl(wsUrl: string): string {
try {
const url = new URL(wsUrl);
const protocol = url.protocol === 'ws:' ? 'ws' : 'wss';
return `${protocol}://${url.host}/signal`;
} catch (error) {
console.error('Invalid WebSocket URL:', wsUrl, error);
return wsUrl + '/signal';
}
}
export const ydoc = new Y.Doc(); export const ydoc = new Y.Doc();
export const yPolls = ydoc.getMap<Poll>('polls'); export const yPolls = ydoc.getArray<Poll>('polls');
export let wsProvider: WebsocketProvider | null = null; export let wsProvider: WebsocketProvider | null = null;
export let webrtcProvider: WebrtcProvider | null = null; export let webrtcProvider: WebrtcProvider | null = null;
let wsReconnectAttempts = 0;
let webrtcReconnectAttempts = 0;
const MAX_RECONNECT_ATTEMPTS = 5;
const BASE_RECONNECT_DELAY = 1000;
interface ConnectionMetrics {
rtt: number;
bandwidth: number;
packetLoss: number;
lastUpdated: number;
}
const connectionMetrics: ConnectionMetrics = {
rtt: 0,
bandwidth: 0,
packetLoss: 0,
lastUpdated: Date.now()
};
function getReconnectDelay(attempts: number): number {
return Math.min(BASE_RECONNECT_DELAY * Math.pow(2, attempts), 30000);
}
export function getConnectionMetrics(): ConnectionMetrics {
return { ...connectionMetrics };
}
export function initializeProviders() { export function initializeProviders() {
console.log('[INIT] Initializing providers with WS_URL:', WS_URL);
console.log('[INIT] Connecting to WebSocket:', WS_URL + '/yjs');
console.log('[INIT] Room name:', ROOM_NAME);
console.log('[INIT] Signaling URL:', getSignalingUrl(WS_URL));
wsProvider = new WebsocketProvider( wsProvider = new WebsocketProvider(
WS_URL + '/yjs', WS_URL + '/yjs',
ROOM_NAME, ROOM_NAME,
@@ -68,7 +25,7 @@ export function initializeProviders() {
ROOM_NAME, ROOM_NAME,
ydoc, ydoc,
{ {
signaling: [getSignalingUrl(WS_URL)], signaling: [WS_URL.replace('ws://', 'wss://').replace('http://', 'https://') + '/signal'],
password: null, password: null,
awareness: wsProvider.awareness, awareness: wsProvider.awareness,
maxConns: 20, maxConns: 20,
@@ -78,106 +35,13 @@ export function initializeProviders() {
); );
wsProvider.on('status', (event: { status: string }) => { wsProvider.on('status', (event: { status: string }) => {
console.log('[WS] WebSocket status changed:', event.status); console.log('WebSocket status:', event.status);
if (event.status === 'connected') {
wsReconnectAttempts = 0;
console.log('[WS] Successfully connected to WebSocket');
} else if (event.status === 'disconnected') {
console.log('[WS] WebSocket disconnected');
if (wsReconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
const delay = getReconnectDelay(wsReconnectAttempts);
console.log(`[WS] Reconnecting in ${delay}ms (attempt ${wsReconnectAttempts + 1})`);
setTimeout(() => {
wsReconnectAttempts++;
try {
wsProvider?.connect();
} catch (error) {
console.error('[WS] Failed to reconnect:', error);
}
}, delay);
} else {
console.error('[WS] Max reconnection attempts reached');
}
}
});
wsProvider.on('connection-error', (error: any) => {
console.error('[WS] Connection error:', error);
}); });
webrtcProvider.on('synced', (synced: boolean) => { webrtcProvider.on('synced', (synced: boolean) => {
console.log('[WEBRTC] Synced:', synced); console.log('WebRTC synced:', synced);
if (synced) {
webrtcReconnectAttempts = 0;
}
}); });
webrtcProvider.on('peers', (peers: any) => {
console.log('[WEBRTC] Peers changed:', peers.size, 'peers');
if (peers.size === 0 && webrtcReconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
const delay = getReconnectDelay(webrtcReconnectAttempts);
console.log(`[WEBRTC] No peers, reconnecting in ${delay}ms (attempt ${webrtcReconnectAttempts + 1})`);
setTimeout(() => {
webrtcReconnectAttempts++;
try {
webrtcProvider?.connect();
} catch (error) {
console.error('[WEBRTC] Failed to reconnect:', error);
}
}, delay);
} else if (peers.size > 0) {
webrtcReconnectAttempts = 0;
console.log('[WEBRTC] Connected to', peers.size, 'peers');
}
});
// Periodically collect WebRTC stats
const statsInterval = setInterval(async () => {
if (webrtcProvider && webrtcProvider.room) {
let totalRtt = 0;
let totalBandwidth = 0;
let peerCount = 0;
// Access peers through the room's internal structure
const room = webrtcProvider.room as any;
if (room.peers) {
for (const peer of room.peers.values()) {
try {
if (peer.peerConnection) {
const stats = await peer.peerConnection.getStats();
stats.forEach((report: any) => {
if (report.type === 'remote-inbound-rtp' || report.type === 'inbound-rtp') {
totalRtt += report.roundTripTime || 0;
}
if (report.type === 'outbound-rtp') {
totalBandwidth += report.bytesSent || 0;
}
});
peerCount++;
}
} catch (error) {
console.error('Failed to get WebRTC stats:', error);
}
}
}
if (peerCount > 0) {
connectionMetrics.rtt = totalRtt / peerCount;
connectionMetrics.bandwidth = totalBandwidth;
connectionMetrics.packetLoss = 0; // Would need more complex calculation
connectionMetrics.lastUpdated = Date.now();
}
}
}, 5000);
// Cleanup stats interval on destroy
const originalDestroy = webrtcProvider.destroy;
webrtcProvider.destroy = function() {
clearInterval(statsInterval);
originalDestroy.call(this);
};
return { wsProvider, webrtcProvider }; return { wsProvider, webrtcProvider };
} }
@@ -187,7 +51,7 @@ export function destroyProviders() {
} }
export function createPoll(question: string, createdBy: string): string { export function createPoll(question: string, createdBy: string): string {
const pollId = crypto.randomUUID(); const pollId = Math.random().toString(36).substr(2, 9);
const poll: Poll = { const poll: Poll = {
id: pollId, id: pollId,
question, question,
@@ -196,63 +60,47 @@ export function createPoll(question: string, createdBy: string): string {
options: [] options: []
}; };
yPolls.set(pollId, poll); yPolls.push([poll]);
return pollId; return pollId;
} }
export function addOption(pollId: string, text: string, createdBy: string): void { export function addOption(pollId: string, text: string, createdBy: string): void {
try { const polls = yPolls.toArray();
ydoc.transact(() => { const pollIndex = polls.findIndex(p => p.id === pollId);
const poll = yPolls.get(pollId);
if (pollIndex !== -1) {
if (!poll) { const poll = polls[pollIndex];
console.error(`Poll not found: ${pollId}`); const option: PollOption = {
throw new Error('Poll not found'); id: Math.random().toString(36).substr(2, 9),
} text,
votes: 0,
const option: PollOption = { votedBy: [],
id: crypto.randomUUID(), createdBy,
text, timestamp: Date.now()
votes: 0, };
votedBy: [],
createdBy, const updatedPoll = {
timestamp: Date.now() ...poll,
}; options: [...poll.options, option]
};
const updatedPoll = {
...poll, yPolls.delete(pollIndex, 1);
options: [...poll.options, option] yPolls.insert(pollIndex, [updatedPoll]);
};
yPolls.set(pollId, updatedPoll);
});
} catch (error) {
console.error('Failed to add option:', error);
throw error;
} }
} }
export function voteForOption(pollId: string, optionId: string, userId: string): void { export function voteForOption(pollId: string, optionId: string, userId: string): void {
try { const polls = yPolls.toArray();
ydoc.transact(() => { const pollIndex = polls.findIndex(p => p.id === pollId);
const poll = yPolls.get(pollId);
if (pollIndex !== -1) {
if (!poll) { const poll = polls[pollIndex];
console.error(`Poll not found: ${pollId}`); const optionIndex = poll.options.findIndex(opt => opt.id === optionId);
throw new Error('Poll not found');
} if (optionIndex !== -1) {
const optionIndex = poll.options.findIndex(opt => opt.id === optionId);
if (optionIndex === -1) {
console.error(`Option not found: ${optionId}`);
throw new Error('Option not found');
}
const option = poll.options[optionIndex]; const option = poll.options[optionIndex];
if (option.votedBy.includes(userId)) { if (option.votedBy.includes(userId)) {
console.log(`User ${userId} already voted for option ${optionId}`);
return; return;
} }
@@ -270,18 +118,16 @@ export function voteForOption(pollId: string, optionId: string, userId: string):
options: updatedOptions options: updatedOptions
}; };
yPolls.set(pollId, updatedPoll); yPolls.delete(pollIndex, 1);
}); yPolls.insert(pollIndex, [updatedPoll]);
} catch (error) { }
console.error('Failed to vote for option:', error);
throw error;
} }
} }
export function getPolls(): Poll[] { export function getPolls(): Poll[] {
return Array.from(yPolls.values()); return yPolls.toArray();
} }
export function getPoll(pollId: string): Poll | undefined { export function getPoll(pollId: string): Poll | undefined {
return yPolls.get(pollId); return yPolls.toArray().find(p => p.id === pollId);
} }

View File

@@ -14,8 +14,7 @@
"express": "^4.18.2", "express": "^4.18.2",
"ws": "^8.14.2", "ws": "^8.14.2",
"y-websocket": "^1.5.0", "y-websocket": "^1.5.0",
"yjs": "^13.6.8", "yjs": "^13.6.8"
"zod": "^4.3.6"
}, },
"devDependencies": { "devDependencies": {
"@types/cors": "^2.8.15", "@types/cors": "^2.8.15",
@@ -2098,15 +2097,6 @@
"type": "GitHub Sponsors ❤", "type": "GitHub Sponsors ❤",
"url": "https://github.com/sponsors/dmonad" "url": "https://github.com/sponsors/dmonad"
} }
},
"node_modules/zod": {
"version": "4.3.6",
"resolved": "https://registry.npmjs.org/zod/-/zod-4.3.6.tgz",
"integrity": "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==",
"license": "MIT",
"funding": {
"url": "https://github.com/sponsors/colinhacks"
}
} }
} }
} }

View File

@@ -8,28 +8,22 @@
"build": "tsc", "build": "tsc",
"start": "node dist/index.js" "start": "node dist/index.js"
}, },
"keywords": [ "keywords": ["yjs", "websocket", "webrtc", "p2p"],
"yjs",
"websocket",
"webrtc",
"p2p"
],
"author": "", "author": "",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"cors": "^2.8.5",
"dotenv": "^16.3.1",
"express": "^4.18.2", "express": "^4.18.2",
"ws": "^8.14.2", "ws": "^8.14.2",
"y-websocket": "^1.5.0", "y-websocket": "^1.5.0",
"yjs": "^13.6.8", "yjs": "^13.6.8",
"zod": "^4.3.6" "cors": "^2.8.5",
"dotenv": "^16.3.1"
}, },
"devDependencies": { "devDependencies": {
"@types/cors": "^2.8.15",
"@types/express": "^4.17.20", "@types/express": "^4.17.20",
"@types/node": "^20.9.0",
"@types/ws": "^8.5.8", "@types/ws": "^8.5.8",
"@types/cors": "^2.8.15",
"@types/node": "^20.9.0",
"tsx": "^4.6.2", "tsx": "^4.6.2",
"typescript": "^5.2.2" "typescript": "^5.2.2"
} }

View File

@@ -9,10 +9,10 @@ import { logger } from './utils/logger';
dotenv.config(); dotenv.config();
const app = express(); const app = express();
const PORT = process.env.PORT || 5000; const PORT = process.env.PORT || 3000;
app.use(cors({ app.use(cors({
origin: process.env.CORS_ORIGIN || ['http://localhost:5173', 'http://localhost:5174', 'http://localhost:5175'], origin: process.env.CORS_ORIGIN || 'http://localhost:5173',
credentials: true credentials: true
})); }));

View File

@@ -1,178 +1,36 @@
import { WebSocketServer, WebSocket } from 'ws'; import { WebSocketServer, WebSocket } from 'ws';
import http from 'http'; import http from 'http';
import { SignalingMessage, SignalingMessageSchema } from './types/poll.types'; import { SignalingMessage } from './types/poll.types';
import { logger } from './utils/logger'; import { logger } from './utils/logger';
interface Client { interface Client {
id: string; id: string;
ws: WebSocket; ws: WebSocket;
roomId: string; roomId: string;
lastSeen: number;
messageCount: number;
lastMessageTime: number;
} }
export function createSignalingServer(server: http.Server) { export function createSignalingServer(server: http.Server) {
const wss = new WebSocketServer({ const wss = new WebSocketServer({
noServer: true server,
}); path: '/signal'
// Handle upgrade requests for /signal path
server.on('upgrade', (request, socket, head) => {
const pathname = request.url || '';
if (pathname === '/signal' || pathname.startsWith('/signal?')) {
logger.info(`[SIGNALING] Upgrade request for path: ${pathname}`);
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
}
}); });
const clients = new Map<string, Client>(); const clients = new Map<string, Client>();
const rooms = new Map<string, Set<string>>(); const rooms = new Map<string, Set<string>>();
const roomPasswords = new Map<string, string>();
const HEARTBEAT_INTERVAL = 30000;
const CLIENT_TIMEOUT = 60000;
const RATE_LIMIT_WINDOW = 1000;
const RATE_LIMIT_MAX = 10;
const heartbeatInterval = setInterval(() => {
const now = Date.now();
clients.forEach((client, clientId) => {
if (now - client.lastSeen > CLIENT_TIMEOUT) {
logger.info(`Client ${clientId} timed out, removing...`);
handleClientLeave(clientId);
} else {
client.ws.send(JSON.stringify({ type: 'ping' }));
}
});
}, HEARTBEAT_INTERVAL);
wss.on('connection', (ws: WebSocket) => { wss.on('connection', (ws: WebSocket) => {
let clientId: string | null = null; let clientId: string | null = null;
const tempClientId = `temp-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
logger.info(`[SIGNALING] New WebSocket connection (temp: ${tempClientId})`);
ws.on('message', (data: Buffer) => { ws.on('message', (data: Buffer) => {
try { try {
const parsed = JSON.parse(data.toString()); const message: SignalingMessage = JSON.parse(data.toString());
const validationResult = SignalingMessageSchema.safeParse(parsed);
if (!validationResult.success) {
logger.error('Invalid signaling message:', validationResult.error);
ws.send(JSON.stringify({
type: 'error',
message: 'Invalid message format',
errors: validationResult.error.issues
}));
return;
}
const message: SignalingMessage = validationResult.data;
if (clientId) {
const client = clients.get(clientId);
if (client) {
const now = Date.now();
if (now - client.lastMessageTime < RATE_LIMIT_WINDOW) {
client.messageCount++;
if (client.messageCount > RATE_LIMIT_MAX) {
logger.warn(`Client ${clientId} exceeded rate limit`);
ws.send(JSON.stringify({
type: 'error',
message: 'Rate limit exceeded'
}));
return;
}
} else {
client.messageCount = 1;
client.lastMessageTime = now;
}
}
}
switch (message.type) { switch (message.type) {
case 'subscribe':
// y-webrtc subscribe message - client wants to join topics
if (message.topics) {
message.topics.forEach((topic: string) => {
if (!rooms.has(topic)) {
rooms.set(topic, new Set());
}
const tempId = `sub-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
clientId = tempId;
rooms.get(topic)!.add(tempId);
clients.set(tempId, {
id: tempId,
ws,
roomId: topic,
lastSeen: Date.now(),
messageCount: 0,
lastMessageTime: Date.now()
});
logger.info(`[SIGNALING] Client subscribed to topic: ${topic}`);
});
}
break;
case 'unsubscribe':
// y-webrtc unsubscribe message
if (message.topics && clientId) {
message.topics.forEach((topic: string) => {
const room = rooms.get(topic);
if (room && clientId) {
room.delete(clientId);
if (room.size === 0) {
rooms.delete(topic);
}
}
});
}
break;
case 'publish':
// y-webrtc publish message - broadcast to all subscribers of a topic
if (message.topic) {
const topic = message.topic;
const room = rooms.get(topic);
if (room) {
room.forEach((subscriberId) => {
const subscriber = clients.get(subscriberId);
if (subscriber && subscriber.ws !== ws && subscriber.ws.readyState === WebSocket.OPEN) {
subscriber.ws.send(JSON.stringify({
type: 'publish',
topic: topic,
data: message.data
}));
}
});
}
}
break;
case 'join': case 'join':
clientId = message.from || `client-${Date.now()}`; clientId = message.from;
const roomId = message.roomId || 'default-room'; const roomId = message.roomId || 'default-room';
const roomPassword = roomPasswords.get(roomId); clients.set(clientId, { id: clientId, ws, roomId });
if (roomPassword && message.password !== roomPassword) {
logger.warn(`Client ${clientId} failed password authentication for room ${roomId}`);
ws.send(JSON.stringify({
type: 'error',
message: 'Invalid room password'
}));
return;
}
clients.set(clientId, {
id: clientId,
ws,
roomId,
lastSeen: Date.now(),
messageCount: 0,
lastMessageTime: Date.now()
});
if (!rooms.has(roomId)) { if (!rooms.has(roomId)) {
rooms.set(roomId, new Set()); rooms.set(roomId, new Set());
@@ -214,39 +72,7 @@ export function createSignalingServer(server: http.Server) {
break; break;
case 'leave': case 'leave':
if (message.from) { handleClientLeave(message.from);
handleClientLeave(message.from);
}
break;
case 'create-room':
if (message.roomId && message.password) {
roomPasswords.set(message.roomId, message.password);
logger.info(`Room ${message.roomId} created with password protection`);
ws.send(JSON.stringify({
type: 'room-created',
roomId: message.roomId
}));
}
break;
case 'ping':
if (clientId) {
const client = clients.get(clientId);
if (client) {
client.lastSeen = Date.now();
client.ws.send(JSON.stringify({ type: 'pong', from: 'server' }));
}
}
break;
case 'pong':
if (clientId) {
const client = clients.get(clientId);
if (client) {
client.lastSeen = Date.now();
}
}
break; break;
} }
} catch (error) { } catch (error) {
@@ -294,11 +120,6 @@ export function createSignalingServer(server: http.Server) {
} }
} }
wss.on('close', () => {
clearInterval(heartbeatInterval);
logger.info('Signaling server closed');
});
logger.info('Signaling server running at path /signal'); logger.info('Signaling server running at path /signal');
return wss; return wss;

View File

@@ -1,5 +1,3 @@
import { z } from 'zod';
export interface PollOption { export interface PollOption {
id: string; id: string;
text: string; text: string;
@@ -17,24 +15,10 @@ export interface Poll {
options: PollOption[]; options: PollOption[];
} }
export const SignalingMessageSchema = z.object({
type: z.enum(['offer', 'answer', 'ice-candidate', 'join', 'leave', 'ping', 'pong', 'create-room', 'subscribe', 'unsubscribe', 'publish', 'signal']),
from: z.string().optional(),
to: z.string().optional(),
data: z.any().optional(),
roomId: z.string().optional(),
password: z.string().optional(),
topics: z.array(z.string()).optional(),
topic: z.string().optional()
});
export interface SignalingMessage { export interface SignalingMessage {
type: 'offer' | 'answer' | 'ice-candidate' | 'join' | 'leave' | 'ping' | 'pong' | 'create-room' | 'subscribe' | 'unsubscribe' | 'publish' | 'signal'; type: 'offer' | 'answer' | 'ice-candidate' | 'join' | 'leave';
from?: string; from: string;
to?: string; to?: string;
data?: any; data?: any;
roomId?: string; roomId?: string;
password?: string;
topics?: string[];
topic?: string;
} }

View File

@@ -1,115 +1,16 @@
type LogLevel = 'info' | 'error' | 'warn' | 'debug'; export const logger = {
type LogContext = Record<string, any>; info: (message: string, ...args: any[]) => {
console.log(`[INFO] ${new Date().toISOString()} - ${message}`, ...args);
interface LogEntry { },
timestamp: string; error: (message: string, ...args: any[]) => {
level: LogLevel; console.error(`[ERROR] ${new Date().toISOString()} - ${message}`, ...args);
message: string; },
context?: LogContext; warn: (message: string, ...args: any[]) => {
} console.warn(`[WARN] ${new Date().toISOString()} - ${message}`, ...args);
},
class Logger { debug: (message: string, ...args: any[]) => {
private context: LogContext = {}; if (process.env.NODE_ENV === 'development') {
private timers: Map<string, number> = new Map(); console.debug(`[DEBUG] ${new Date().toISOString()} - ${message}`, ...args);
setContext(ctx: LogContext): void {
this.context = { ...this.context, ...ctx };
}
clearContext(): void {
this.context = {};
}
private formatLog(level: LogLevel, message: string, args: any[]): LogEntry {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level,
message,
context: Object.keys(this.context).length > 0 ? { ...this.context } : undefined
};
if (args.length > 0) {
if (entry.context) {
entry.context.args = args;
} else {
entry.context = { args };
}
}
return entry;
}
private log(level: LogLevel, message: string, ...args: any[]): void {
const entry = this.formatLog(level, message, args);
const logString = JSON.stringify(entry);
switch (level) {
case 'info':
console.log(logString);
break;
case 'error':
console.error(logString);
break;
case 'warn':
console.warn(logString);
break;
case 'debug':
if (process.env.NODE_ENV === 'development') {
console.debug(logString);
}
break;
} }
} }
};
info(message: string, ...args: any[]): void {
this.log('info', message, ...args);
}
error(message: string, ...args: any[]): void {
this.log('error', message, ...args);
}
warn(message: string, ...args: any[]): void {
this.log('warn', message, ...args);
}
debug(message: string, ...args: any[]): void {
this.log('debug', message, ...args);
}
startTimer(label: string): void {
this.timers.set(label, Date.now());
}
endTimer(label: string): number {
const startTime = this.timers.get(label);
if (!startTime) {
this.warn(`Timer '${label}' not found`);
return 0;
}
const duration = Date.now() - startTime;
this.timers.delete(label);
this.debug(`Timer '${label}': ${duration}ms`);
return duration;
}
time<T>(label: string, fn: () => T): T {
this.startTimer(label);
try {
return fn();
} finally {
this.endTimer(label);
}
}
async timeAsync<T>(label: string, fn: () => Promise<T>): Promise<T> {
this.startTimer(label);
try {
return await fn();
} finally {
this.endTimer(label);
}
}
}
export const logger = new Logger();

View File

@@ -1,44 +1,19 @@
import { WebSocketServer } from 'ws'; import { WebSocketServer } from 'ws';
// @ts-ignore
import { setupWSConnection } from 'y-websocket/bin/utils'; import { setupWSConnection } from 'y-websocket/bin/utils';
import http from 'http'; import http from 'http';
import { logger } from './utils/logger'; import { logger } from './utils/logger';
export function createYjsServer(server: http.Server, port: number) { export function createYjsServer(server: http.Server, port: number) {
const wss = new WebSocketServer({ const wss = new WebSocketServer({
noServer: true server,
}); path: '/yjs'
// Handle upgrade requests for /yjs/* paths
server.on('upgrade', (request, socket, head) => {
const pathname = request.url || '';
if (pathname.startsWith('/yjs')) {
logger.info(`[YJS] Upgrade request for path: ${pathname}`);
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
}
}); });
wss.on('connection', (ws, req) => { wss.on('connection', (ws, req) => {
const url = req.url || 'unknown'; const docName = req.url?.split('?')[1]?.split('=')[1] || 'default-poll';
const remoteAddress = req.socket.remoteAddress || 'unknown'; logger.info(`New Yjs connection for document: ${docName}`);
logger.info(`[YJS] New connection from ${remoteAddress}, URL: ${url}`);
// Log when connection closes setupWSConnection(ws, req, { docName });
ws.on('close', () => {
logger.info(`[YJS] Connection closed from ${remoteAddress}`);
});
ws.on('error', (error) => {
logger.error(`[YJS] Connection error from ${remoteAddress}:`, error);
});
// y-websocket automatically handles docName from the URL path
// The room name is passed as part of the URL: /yjs/room-name
// We don't need to manually extract it
setupWSConnection(ws, req, { gc: true });
}); });
wss.on('error', (error) => { wss.on('error', (error) => {