working on websocket

This commit is contained in:
2025-04-26 17:08:53 +03:00
parent ea308b2f1a
commit e1cc82f7bd
10 changed files with 99 additions and 74 deletions

View File

@ -23,11 +23,16 @@ import {
} from "@src/lib/errors.ts"; } from "@src/lib/errors.ts";
import devices from "@src/lib/devices.ts"; import devices from "@src/lib/devices.ts";
import { WebSocketClientsGroup } from "@src/lib/websocket.ts"; import { WebSocketClientsGroup } from "@src/lib/websocket.ts";
import { Option } from "@shared/utils/option.ts";
const AUTH_COOKIE_NAME = "token"; const AUTH_COOKIE_NAME = "token";
const VERSION = "0.1.0-a.1"; const VERSION = "0.1.0-a.1";
const router = new HttpRouter(); export type Variables = {
token: string;
};
const router = new HttpRouter<Variables>();
const views = Deno.cwd() + "/views/"; const views = Deno.cwd() + "/views/";
export const eta = new Eta({ views }); export const eta = new Eta({ views });
@ -41,15 +46,15 @@ const cache: Map<string, Response> = new Map();
router.get("/public/*", async (c) => { router.get("/public/*", async (c) => {
const filePath = "." + c.path; const filePath = "." + c.path;
//const cached = cache.get(filePath); const cached = cache.get(filePath);
//
//if (cached) { // if (cached) {
// return cached.clone(); // return cached.clone();
//} // }
const res = await serveFile(c.req, filePath); const res = await serveFile(c.req, filePath);
//cache.set(filePath, res.clone()); // cache.set(filePath, res.clone());
return res; return res;
}); });
@ -93,30 +98,21 @@ router
}); });
const group = new WebSocketClientsGroup(); const group = new WebSocketClientsGroup();
group.onmessage = (e) => {
group.sendToAll("pong");
console.log("ping");
};
router.get("/api/admin/ws", (c) => { router.get("/api/admin/ws", (c) => {
if (c.req.headers.get("upgrade") != "websocket") { if (c.req.headers.get("upgrade") != "websocket") {
return new Response(null, { status: 501 }); return new Response(null, { status: 501 });
} }
const { socket, response } = Deno.upgradeWebSocket(c.req); const token = c.var.get("token");
group.addClient(socket); let { socket, response } = Deno.upgradeWebSocket(c.req);
socket.addEventListener("open", () => { socket = group.addClient(token, socket).unwrap();
console.log("a client connected!");
});
socket.addEventListener("close", () => {
console.log("client disconnected");
});
socket.addEventListener("message", (event) => {
if (event.data === "ping") {
console.log("ping");
socket.send("pong");
}
});
return response; return response;
}); });

File diff suppressed because one or more lines are too long

View File

@ -50,7 +50,10 @@ export class Context<
S extends string = string, S extends string = string,
ReqSchema extends Schema<any> = Schema<unknown>, ReqSchema extends Schema<any> = Schema<unknown>,
ResSchema extends Schema<any> = Schema<unknown>, ResSchema extends Schema<any> = Schema<unknown>,
Vars extends Record<string | number, any> = Record<string | number, any>, Variables extends Record<string | number, any> = Record<
string | number,
any
>,
> { > {
private _url?: URL; private _url?: URL;
private _hostname?: string; private _hostname?: string;
@ -80,6 +83,7 @@ export class Context<
ctx._cookies = this._cookies; ctx._cookies = this._cookies;
ctx.res = this.res; ctx.res = this.res;
ctx.schema = schema; ctx.schema = schema;
ctx._var = this._var;
return ctx as Context<S, Req, Res> & { schema: { req: Req; res: Res } }; return ctx as Context<S, Req, Res> & { schema: { req: Req; res: Res } };
} }
@ -97,6 +101,7 @@ export class Context<
ctx._cookies = this._cookies; ctx._cookies = this._cookies;
ctx.res = this.res; ctx.res = this.res;
ctx.schema = this.schema; ctx.schema = this.schema;
ctx._var = this._var;
return ctx as Context<S, ReqSchema, ResSchema>; return ctx as Context<S, ReqSchema, ResSchema>;
} }
@ -260,14 +265,14 @@ export class Context<
}; };
} }
private _var: Vars = {} as Vars; private _var: Variables = {} as Variables;
public get var() { public get var() {
return { return {
set: (key: keyof Vars, value: Vars[number]) => { set: <K extends keyof Variables>(key: K, value: Variables[K]) => {
this._var[key] = value; this._var[key] = value;
}, },
get: <K extends keyof Vars>(key: K): Vars[K] => { get: <K extends keyof Variables>(key: K): Variables[K] => {
return this._var[key]; return this._var[key];
}, },
}; };

View File

@ -5,14 +5,24 @@ import { Api } from "@src/lib/apiValidator.ts";
import { notAllowedError, notFoundError } from "@src/lib/errors.ts"; import { notAllowedError, notFoundError } from "@src/lib/errors.ts";
import { err } from "@shared/utils/result.ts"; import { err } from "@shared/utils/result.ts";
type VariablesType = Record<string | number, any>;
type RequestHandler< type RequestHandler<
S extends string = string, S extends string = string,
ReqSchema extends Schema<unknown> = Schema<unknown>, ReqSchema extends Schema<unknown> = Schema<unknown>,
ResSchema extends Schema<unknown> = Schema<unknown>, ResSchema extends Schema<unknown> = Schema<unknown>,
> = (c: Context<S, ReqSchema, ResSchema>) => Promise<Response> | Response; Variables extends VariablesType = Record<
string | number,
any
>,
> = (
c: Context<S, ReqSchema, ResSchema, Variables>,
) => Promise<Response> | Response;
export type Middleware = ( export type Middleware<
c: Context<string>, Variables extends VariablesType = Partial<Record<string | number, any>>,
> = (
c: Context<string, any, any, Variables>,
next: () => Promise<void>, next: () => Promise<void>,
) => Promise<Response | void> | Response | void; ) => Promise<Response | void> | Response | void;
@ -35,14 +45,16 @@ const DEFAULT_NOT_ALLOWED_HANDLER =
})) as RequestHandler; })) as RequestHandler;
class HttpRouter< class HttpRouter<
Variables extends Record<string | number, any> = Record< Variables extends VariablesType = Partial<
Record<
string | number, string | number,
any any
>
>, >,
> { > {
public readonly routerTree = new RouterTree<MethodHandlers>(); public readonly routerTree = new RouterTree<MethodHandlers>();
public pathTransformer?: (path: string) => string; public pathTransformer?: (path: string) => string;
private middlewares: Middleware[] = []; private middlewares: Middleware<Variables>[] = [];
public notFoundHandler: RequestHandler = DEFAULT_NOT_FOUND_HANDLER; public notFoundHandler: RequestHandler = DEFAULT_NOT_FOUND_HANDLER;
public methodNotAllowedHandler: RequestHandler = public methodNotAllowedHandler: RequestHandler =
DEFAULT_NOT_ALLOWED_HANDLER; DEFAULT_NOT_ALLOWED_HANDLER;
@ -52,7 +64,7 @@ class HttpRouter<
return this; return this;
} }
public use(middleware: Middleware): this { public use(middleware: Middleware<Variables>): this {
this.middlewares.push(middleware); this.middlewares.push(middleware);
return this; return this;
} }
@ -64,9 +76,9 @@ class HttpRouter<
>( >(
path: S, path: S,
method: string, method: string,
handler: RequestHandler<S, ReqSchema, ResSchema>, handler: RequestHandler<S, ReqSchema, ResSchema, Variables>,
schema?: { req: ReqSchema; res: ResSchema }, schema?: { req: ReqSchema; res: ResSchema },
): HttpRouter; ): this;
public add< public add<
S extends string, S extends string,
ReqSchema extends Schema<unknown> = Schema<unknown>, ReqSchema extends Schema<unknown> = Schema<unknown>,
@ -74,15 +86,15 @@ class HttpRouter<
>( >(
path: S[], path: S[],
method: string, method: string,
handler: RequestHandler<string, ReqSchema, ResSchema>, handler: RequestHandler<string, ReqSchema, ResSchema, Variables>,
schema?: { req: ReqSchema; res: ResSchema }, schema?: { req: ReqSchema; res: ResSchema },
): HttpRouter; ): this;
public add( public add(
path: string | string[], path: string | string[],
method: string, method: string,
handler: RequestHandler<string>, handler: RequestHandler<string>,
schema?: { req: Schema<unknown>; res: Schema<unknown> }, schema?: { req: Schema<unknown>; res: Schema<unknown> },
): HttpRouter { ): this {
const paths = Array.isArray(path) ? path : [path]; const paths = Array.isArray(path) ? path : [path];
for (const p of paths) { for (const p of paths) {
@ -103,16 +115,16 @@ class HttpRouter<
public get<S extends string>( public get<S extends string>(
path: S, path: S,
handler: RequestHandler<S>, handler: RequestHandler<S, any, any, Variables>,
): HttpRouter; ): this;
public get<S extends string>( public get<S extends string>(
path: S[], path: S[],
handler: RequestHandler, handler: RequestHandler<S, any, any, Variables>,
): HttpRouter; ): this;
public get( public get(
path: string | string[], path: string | string[],
handler: RequestHandler, handler: RequestHandler<string, any, any, Variables>,
): HttpRouter { ): this {
if (Array.isArray(path)) { if (Array.isArray(path)) {
return this.add(path, "GET", handler); return this.add(path, "GET", handler);
} }
@ -121,15 +133,15 @@ class HttpRouter<
public post<S extends string>( public post<S extends string>(
path: S, path: S,
handler: RequestHandler<S>, handler: RequestHandler<S, any, any, Variables>,
): HttpRouter; ): HttpRouter;
public post( public post(
path: string[], path: string[],
handler: RequestHandler, handler: RequestHandler<string, any, any, Variables>,
): HttpRouter; ): HttpRouter;
public post( public post(
path: string | string[], path: string | string[],
handler: RequestHandler<string>, handler: RequestHandler<string, any, any, Variables>,
): HttpRouter { ): HttpRouter {
if (Array.isArray(path)) { if (Array.isArray(path)) {
return this.add(path, "POST", handler); return this.add(path, "POST", handler);
@ -143,7 +155,7 @@ class HttpRouter<
ResSchema extends Schema<unknown>, ResSchema extends Schema<unknown>,
>( >(
api: Api<Path, ReqSchema, ResSchema>, api: Api<Path, ReqSchema, ResSchema>,
handler: RequestHandler<Path, ReqSchema, ResSchema>, handler: RequestHandler<Path, ReqSchema, ResSchema, Variables>,
): HttpRouter { ): HttpRouter {
return this.add(api.path, api.method, handler, api.schema); return this.add(api.path, api.method, handler, api.schema);
} }
@ -171,7 +183,7 @@ class HttpRouter<
): { ): {
handler: RequestHandler; handler: RequestHandler;
params: Record<string, string>; params: Record<string, string>;
ctx: Context<any, any, any, Variables>; ctx: Context<any>;
} { } {
const routeOption = this.routerTree.find(path); const routeOption = this.routerTree.find(path);

View File

@ -34,13 +34,14 @@ export class WebSocketClientsGroup<
public onopen?: EventListenerOrEventListenerObject, public onopen?: EventListenerOrEventListenerObject,
public onclose?: EventListenerOrEventListenerObject, public onclose?: EventListenerOrEventListenerObject,
public onerror?: EventListenerOrEventListenerObject, public onerror?: EventListenerOrEventListenerObject,
public onmessage?: EventListenerOrEventListenerObject, public onmessage?: (e: MessageEvent) => any,
) {} ) {}
public addClient( public addClient(
token: string, token: string,
socket: WebSocket, socket: WebSocket,
): Result<void, TooManyConnectionError> { lifetime?: Date,
): Result<WebSocket, TooManyConnectionError> {
if (this.connectionsCounter > MAX_CONNECTIONS) { if (this.connectionsCounter > MAX_CONNECTIONS) {
return err(tooManyConnectionError("Too many connections")); return err(tooManyConnectionError("Too many connections"));
} }
@ -61,18 +62,28 @@ export class WebSocketClientsGroup<
clientConnections.set(uuid, socket); clientConnections.set(uuid, socket);
socket.addEventListener("close", () => { socket.addEventListener("close", () => {
clientConnections.delete(uuid); clientConnections.delete(uuid);
this.connectionsCounter--;
}); });
socket.addEventListener("error", () => { socket.addEventListener("error", () => {
clientConnections.delete(uuid); clientConnections.delete(uuid);
this.connectionsCounter--;
}); });
this.connectionsCounter++; this.connectionsCounter++;
socket.addEventListener("open", this.onopen!); if (this.onopen) {
socket.addEventListener("open", this.onclose!); socket.addEventListener("open", this.onopen);
socket.addEventListener("open", this.onerror!); }
socket.addEventListener("open", this.onmessage!); if (this.onclose) {
socket.addEventListener("close", this.onclose);
}
if (this.onerror) {
socket.addEventListener("error", this.onerror);
}
if (this.onmessage) {
socket.addEventListener("message", this.onmessage);
}
return ok(); return ok(socket);
} }
sendToAll( sendToAll(
@ -82,7 +93,7 @@ export class WebSocketClientsGroup<
.andThen((msg) => { .andThen((msg) => {
const errors = []; const errors = [];
for (const client of this.clients.values()) { for (const client of this.clients.values()) {
for (const connection of client) { for (const connection of client.values()) {
try { try {
connection.send(JSON.stringify(msg)); connection.send(JSON.stringify(msg));
} catch (e) { } catch (e) {

View File

@ -3,14 +3,14 @@ import { errAsync, okAsync, ResultAsync } from "@shared/utils/resultasync.ts";
import { InferSchemaType, Schema, z } from "@shared/utils/validator.ts"; import { InferSchemaType, Schema, z } from "@shared/utils/validator.ts";
const CONNECTION_TIMEOUT_MS = 2000; const CONNECTION_TIMEOUT_MS = 2000;
const PING_INTERVAL_MS = 1000; const PING_INTERVAL_MS = 5000;
const PING_CHECK_INTERVAL_MS = 15000; const PING_CHECK_INTERVAL_MS = 15000;
const MAX_PING_ATTEMPTS = 5; const MAX_PING_ATTEMPTS = 5;
const MAX_RECONNECTION_ATTEMPTS = 5; const MAX_RECONNECTION_ATTEMPTS = 5;
export class WebSocketWrapper< export class WebSocketWrapper<
R extends Schema<any> = Schema<unknown>, R extends Schema<unknown> = Schema<unknown>,
S extends Schema<any> = Schema<unknown>, S extends Schema<unknown> = Schema<unknown>,
> { > {
private _ws: Option<WebSocket> = none; private _ws: Option<WebSocket> = none;
get ws(): Option<WebSocket> { get ws(): Option<WebSocket> {
@ -58,7 +58,7 @@ export class WebSocketWrapper<
public onDisconnect?: () => void; public onDisconnect?: () => void;
public onMessage?: (ev: MessageEvent<any>) => void; public onMessage?: (ev: MessageEvent<unknown>) => void;
private isConnecting = false; private isConnecting = false;

View File

@ -1,16 +1,12 @@
import { Middleware } from "@lib/router.ts"; import { Middleware } from "@lib/router.ts";
import admin from "@lib/admin.ts"; import admin from "@lib/admin.ts";
import { import { queryExecutionError, unauthorizedError } from "@lib/errors.ts";
queryExecutionError,
tooManyRequestsError,
unauthorizedError,
} from "@src/lib/errors.ts";
import { err, ok } from "@shared/utils/result.ts"; import { err, ok } from "@shared/utils/result.ts";
import { eta } from "../../main.ts"; import { eta, Variables } from "../../main.ts";
const EXCLUDE = new Set(["/login", "/setup", "/version"]); const EXCLUDE = new Set(["/login", "/setup", "/version"]);
const authMiddleware: Middleware = async (c, next) => { const authMiddleware: Middleware<Variables> = async (c, next) => {
const token = c.cookies.get("token"); const token = c.cookies.get("token");
const isValid = token const isValid = token
.map((token) => admin.sessions.verifyToken(token)).match( .map((token) => admin.sessions.verifyToken(token)).match(
@ -52,6 +48,9 @@ const authMiddleware: Middleware = async (c, next) => {
return c.redirect("/login"); return c.redirect("/login");
} }
} }
c.var.set("token", token.unwrapOr(""));
await next(); await next();
}; };

View File

@ -1,6 +1,7 @@
import { Middleware } from "@lib/router.ts"; import { Middleware } from "@lib/router.ts";
import { Variables } from "../../main.ts";
const loggerMiddleware: Middleware = async (c, next) => { const loggerMiddleware: Middleware<Variables> = async (c, next) => {
console.log("", c.req.method, c.path); console.log("", c.req.method, c.path);
await next(); await next();
console.log("", c.res.status, "\n"); console.log("", c.res.status, "\n");

View File

@ -1,7 +1,8 @@
import { Middleware } from "@lib/router.ts"; import { Middleware } from "@lib/router.ts";
import log from "@shared/utils/logger.ts"; import log from "@shared/utils/logger.ts";
import { err } from "@shared/utils/result.ts"; import { err } from "@shared/utils/result.ts";
import { tooManyRequestsError } from "@src/lib/errors.ts"; import { tooManyRequestsError } from "@lib/errors.ts";
import { Variables } from "../../main.ts";
const requestCounts: Partial< const requestCounts: Partial<
Record<string, { count: number; lastReset: number }> Record<string, { count: number; lastReset: number }>
@ -10,7 +11,7 @@ const requestCounts: Partial<
const MAX_REQUESTS_PER_WINDOW = 300; const MAX_REQUESTS_PER_WINDOW = 300;
const RATE_LIMIT_WINDOW = 60000; const RATE_LIMIT_WINDOW = 60000;
const rateLimitMiddleware: Middleware = async (c, next) => { const rateLimitMiddleware: Middleware<Variables> = async (c, next) => {
const hostnameOpt = c.hostname; const hostnameOpt = c.hostname;
if (hostnameOpt.isSome()) { if (hostnameOpt.isSome()) {

Binary file not shown.