import fs from 'fs'; import express from 'express'; import expressws from 'express-ws'; import WebSocket from 'ws'; import msgpack from 'msgpack-lite'; import { createCanvas, Image } from 'canvas'; import { PixBlOpcode, PixBlEncoding } from './enums'; interface PixBlClient { id: number, ws: WebSocket, encoding: PixBlEncoding, ping: any, subscribed: boolean } console.debug('pixelblaster'); var stats = { sent: 0, broadcast: 0, recv: 0, vrecv: 0, reqs: 0, pixels: 0, commits: 0, lastCommit: new Date(0) }; const script = fs.readFileSync('./frontend/dist/main.js').toString(); const app = expressws(express()).app; const canvas = createCanvas(800, 600); const ctx = canvas.getContext('2d'); if (fs.existsSync('stats.json')) { console.debug('loading stats'); stats = JSON.parse(fs.readFileSync('stats.json').toString()); } if (fs.existsSync('buffer.png')) { console.debug('loading previous framebuffer'); const img = new Image; img.src = fs.readFileSync('buffer.png'); ctx.drawImage(img, 0, 0); } else { console.debug('no framebuffer found'); ctx.fillStyle = `#FFFFFF`; ctx.fillRect(0, 0, 800, 600); } var bufferChangedSinceSave = false; setInterval(() => { stats.lastCommit = new Date(); console.debug('dumping stats to disk'); fs.writeFile('stats.json', JSON.stringify(stats), () => { }); if (!bufferChangedSinceSave) { console.debug('framebuffer has not been altered'); return; } stats.commits++; console.debug('dumping framebuffer to disk'); bufferChangedSinceSave = false; fs.writeFile('buffer.png', canvas.toBuffer(), () => { }); }, 60 * 1000); var accu = 0; const clients: PixBlClient[] = []; const encoders = [ function (data) { return JSON.stringify(data); }, function (data) { return msgpack.encode(data); } ]; function broadcast(data) { stats.broadcast++; clients.forEach(x => { try { if ((x.ws.readyState === WebSocket.OPEN) && x.subscribed) { stats.sent++; x.ws.send(encoders[x.encoding](data)); } } catch (e) { } }); } var queue: number[][] = []; setInterval(() => { if (queue.length > 0) { broadcast([PixBlOpcode.pixel, queue.splice(0, 4096)]); } }, 65); app.use((req, res, next) => { stats.reqs++; console.log(`${req.method} ${req.path}`); next(); }); app.ws("/socket", (ws, req) => { const state: PixBlClient = { id: accu++, ws, encoding: PixBlEncoding.msgpack, ping: setInterval(() => send([PixBlOpcode.ping]), 5000), subscribed: false }; function send(data) { try { if (state.ws.readyState === WebSocket.OPEN) { stats.sent++; state.ws.send(encoders[state.encoding](data)); } } catch (e) { } } clients.push(state); console.log(`${state.id}: open from ${req.ip}`); send([PixBlOpcode.hello, state.id]); ws.on('close', () => { clearInterval(state.ping); clients.splice(clients.indexOf(state), 1); console.log(`${state.id}: closed`); }); ws.on('message', msg => { stats.recv++; try { const data = msg instanceof Buffer ? msgpack.decode(msg) : ((typeof msg === 'string') ? JSON.parse(msg) : null); if (data == null || !Array.isArray(data) || typeof data[0] !== 'number') { console.warn(`received invalid data from ${state.id}`); return; } switch (data[0]) { case PixBlOpcode.ping: stats.vrecv++; break; case PixBlOpcode.pixel: bufferChangedSinceSave = true; const args = { x: data[1], y: data[2], r: data[3], g: data[4], b: data[5] }; for (var k in args) if (typeof args[k] !== 'number') return; if (args.x > 800 || args.x < 0 || args.y > 600 || args.y < 0) return; ctx.fillStyle = `rgb(${args.r},${args.g},${args.b})`; ctx.fillRect(args.x, args.y, 1, 1); //broadcast([PixBlOpcode.pixel, args.x, args.y, args.r, args.g, args.b]); queue.push([args.x, args.y, args.r, args.g, args.b]); stats.pixels++; stats.vrecv++; break; case PixBlOpcode.sync: send([PixBlOpcode.sync, canvas.toBuffer()]); stats.vrecv++; break; case PixBlOpcode.select: if (typeof data[1] !== 'number' || data[1] < PixBlEncoding.json || data[1] > PixBlEncoding.msgpack) { console.warn(`${state.id} attempted to select invalid encoding`); return; } state.encoding = data[1]; send([PixBlOpcode.select, state.encoding]); stats.vrecv++; break; case PixBlOpcode.subscribe: state.subscribed = true; send([PixBlOpcode.subscribe]); console.debug(`${state.id} has subscribed to receive image updates`); stats.vrecv++; break; default: console.warn(`invalid opcode from ${state.id}`); } } catch (e) { console.error(`${state.id}: exception while processing message`); console.error(e); } }); }); app.get('/', (req, res) => { res.send(`\npixelblaster`) res.end(); }); app.get('/xyzzy', (req, res) => { res.type('text'); res.send('nothing happened'); res.end(); }) app.get('/stats', (req, res) => { res.type('json'); res.send(JSON.stringify({ clients: { online: clients.length, total: accu }, requests: stats.reqs, messages: { sent: stats.sent, broadcasts: stats.broadcast, receivedTotal: stats.recv, receivedValid: stats.vrecv }, storage: { commits: stats.commits, lastCommit: stats.lastCommit }, pixels: stats.pixels }, null, 2)); res.end(); }); app.get('/frame', (req, res) => { res.type('png'); res.send(canvas.toBuffer()); res.end(); }); app.listen('8239');