structurize project

This commit is contained in:
soffee 2025-04-09 19:50:53 +03:00
parent 8096764366
commit cbed5a6c3c
7 changed files with 223 additions and 187 deletions

View file

@ -1,5 +1,5 @@
import type { OptionDefinition } from "command-line-args"; import type { OptionDefinition } from "command-line-args";
import type { RawKeywordLike } from "./keywords.js"; import type { RawKeywordLike } from "./metrics/keywords.js";
import { readFile } from "node:fs/promises"; import { readFile } from "node:fs/promises";
import cmdline from "command-line-args"; import cmdline from "command-line-args";
import yaml from "js-yaml"; import yaml from "js-yaml";

View file

@ -5,8 +5,8 @@ import { collectDefaultMetrics, Registry } from "prom-client";
import { config, readKeywords } from "./config.js"; import { config, readKeywords } from "./config.js";
import * as env from "./env.js"; import * as env from "./env.js";
import { rawToPatterns } from "./keywords.js"; import { rawToPatterns } from "./metrics/keywords.js";
import * as metrics from "./metrics.js"; import * as metrics from "./metrics/metrics.js";
import MetricsServer from "./server.js"; import MetricsServer from "./server.js";
const registry = new Registry(); const registry = new Registry();

View file

@ -1,181 +0,0 @@
import type { Dispatcher } from "@mtcute/dispatcher";
import type { Dialog, TelegramClient } from "@mtcute/node";
import type { Registry } from "prom-client";
import process from "node:process";
import timers from "node:timers/promises";
import { PropagationAction } from "@mtcute/dispatcher";
import { Counter, Gauge } from "prom-client";
import { config } from "./config.js";
import { peersConfigBoolFilter, peersConfigFilter } from "./filters.js";
import { KeywordsCounter } from "./keywords.js";
function collectNewMessageMetrics(dp: Dispatcher, registry: Registry) {
const messages = new Counter({
name: "messenger_dialog_messages_count",
help: "Messages count since exporter startup",
labelNames: ["peerId"],
});
const media = new Counter({
name: "messenger_dialog_media_sent_count",
help: "Medias sent since exporter startup",
labelNames: ["peerId"],
});
const stickers = new Counter({
name: "messenger_dialog_stickers_sent_count",
help: "Stickers sent since exporter startup",
labelNames: ["peerId"],
});
const voice = new Counter({
name: "messenger_dialog_voice_messages_count",
help: "Voice messages sent since exporter startup",
labelNames: ["peerId"],
});
dp.onNewMessage(peersConfigFilter(config), (msg) => {
if (msg.media) {
let counter;
switch (msg.media.type) {
case "photo": case "audio": case "document": {
counter = media;
break;
}
case "sticker": {
counter = stickers;
break;
}
case "voice": {
counter = voice;
break;
}
case "video": {
if (msg.media.isRound) {
counter = voice;
} else {
counter = media;
}
break;
}
}
if (counter) {
counter.inc({
peerId: msg.chat.id,
});
}
}
messages.inc({
peerId: msg.chat.id,
});
return PropagationAction.Continue;
});
registry.registerMetric(media);
registry.registerMetric(stickers);
registry.registerMetric(voice);
registry.registerMetric(messages);
}
class DialogsHolder {
private lastUpdate = 0n;
private dialogs: Dialog[] = [];
private isUpdating = false;
private ttl: bigint;
constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) {
this.ttl = BigInt(ttl) * 1000000n;
}
public async get() {
if (this.isUpdating) {
for (let i = 0; i < this.timeout && this.isUpdating; i += this.pollInterval) {
await timers.setTimeout(this.pollInterval);
}
if (this.isUpdating) {
throw new Error("Timed out fetching dialogs");
}
}
if (process.hrtime.bigint() - this.lastUpdate > this.ttl) {
this.isUpdating = true;
this.dialogs = [];
for await (const d of this.tg.iterDialogs()) {
if (!peersConfigBoolFilter(config, d.peer.id)) {
continue;
}
this.dialogs.push(d);
}
this.lastUpdate = process.hrtime.bigint();
this.isUpdating = false;
}
return this.dialogs;
}
}
function collectDialogMetrics(tg: TelegramClient, registry: Registry) {
const dialogs = new DialogsHolder(tg, 1000, 5000);
const info = new Gauge({
name: "messenger_dialog_info",
help: "Dialog information exposed as labels",
labelNames: ["peerId", "peerType", "displayName"],
collect: async () => {
info.reset();
for (const d of await dialogs.get()) {
info.set({
peerId: d.peer.id,
peerType: d.peer.type,
displayName: d.peer.displayName,
}, 1);
}
},
});
const unread = new Gauge({
name: "messenger_dialog_unread_messages_count",
help: "Number of unread messages in dialogs",
labelNames: ["peerId"],
collect: async () => {
unread.reset();
for (const d of await dialogs.get()) {
unread.set({
peerId: d.peer.id,
}, d.unreadCount);
}
},
});
registry.registerMetric(info);
registry.registerMetric(unread);
}
function newWordsCounter(dp: Dispatcher) {
const counter = new Counter({
name: "messenger_dialog_words_count",
help: "Number of words in messages since exporter startup",
labelNames: ["peerId", "word"],
});
dp.onNewMessage(peersConfigFilter(config), async (msg) => {
const words = msg.text.toLowerCase().split(" ");
for (const w of words) {
counter.inc({
peerId: msg.chat.id,
word: w,
});
}
return PropagationAction.Continue;
});
return counter;
}
export {
collectDialogMetrics,
collectNewMessageMetrics,
KeywordsCounter,
newWordsCounter,
};

81
src/metrics/dialogs.ts Normal file
View file

@ -0,0 +1,81 @@
import type { Dialog, TelegramClient } from "@mtcute/node";
import type { Registry } from "prom-client";
import process from "node:process";
import timers from "node:timers/promises";
import { Gauge } from "prom-client";
import { config } from "../config.js";
import { peersConfigBoolFilter } from "../filters.js";
export function collectDialogMetrics(tg: TelegramClient, registry: Registry) {
const dialogs = new DialogsHolder(tg, 1000, 5000);
const info = new Gauge({
name: "messenger_dialog_info",
help: "Dialog information exposed as labels",
labelNames: ["peerId", "peerType", "displayName"],
collect: async () => {
info.reset();
for (const d of await dialogs.get()) {
info.set({
peerId: d.peer.id,
peerType: d.peer.type,
displayName: d.peer.displayName,
}, 1);
}
},
});
const unread = new Gauge({
name: "messenger_dialog_unread_messages_count",
help: "Number of unread messages in dialogs",
labelNames: ["peerId"],
collect: async () => {
unread.reset();
for (const d of await dialogs.get()) {
unread.set({
peerId: d.peer.id,
}, d.unreadCount);
}
},
});
registry.registerMetric(info);
registry.registerMetric(unread);
}
class DialogsHolder {
private lastUpdate = 0n;
private dialogs: Dialog[] = [];
private isUpdating = false;
private ttl: bigint;
constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) {
this.ttl = BigInt(ttl) * 1000000n;
}
public async get() {
if (this.isUpdating) {
for (let i = 0; i < this.timeout && this.isUpdating; i += this.pollInterval) {
await timers.setTimeout(this.pollInterval);
}
if (this.isUpdating) {
throw new Error("Timed out fetching dialogs");
}
}
if (process.hrtime.bigint() - this.lastUpdate > this.ttl) {
this.isUpdating = true;
this.dialogs = [];
for await (const d of this.tg.iterDialogs()) {
if (!peersConfigBoolFilter(config, d.peer.id)) {
continue;
}
this.dialogs.push(d);
}
this.lastUpdate = process.hrtime.bigint();
this.isUpdating = false;
}
return this.dialogs;
}
}

View file

@ -1,9 +1,11 @@
import type { Dispatcher } from "@mtcute/dispatcher"; import type { Dispatcher } from "@mtcute/dispatcher";
import { PropagationAction } from "@mtcute/dispatcher"; import { PropagationAction } from "@mtcute/dispatcher";
import { Counter } from "prom-client"; import { Counter } from "prom-client";
import { config } from "./config.js";
import { peersConfigFilter } from "./filters.js"; import { config } from "../config.js";
import { escapeRegex } from "./utils.js"; import { peersConfigFilter } from "../filters.js";
import { escapeRegex } from "../utils.js";
export interface RawKeywordPattern { export interface RawKeywordPattern {
name: string; name: string;
@ -86,3 +88,22 @@ export class KeywordsCounter extends Counter {
this._keywords = keywords; this._keywords = keywords;
} }
} }
export function newWordsCounter(dp: Dispatcher) {
const counter = new Counter({
name: "messenger_dialog_words_count",
help: "Number of words in messages since exporter startup",
labelNames: ["peerId", "word"],
});
dp.onNewMessage(peersConfigFilter(config), async (msg) => {
const words = msg.text.toLowerCase().split(" ");
for (const w of words) {
counter.inc({
peerId: msg.chat.id,
word: w,
});
}
return PropagationAction.Continue;
});
return counter;
}

78
src/metrics/message.ts Normal file
View file

@ -0,0 +1,78 @@
import type { Dispatcher } from "@mtcute/dispatcher";
import type { Registry } from "prom-client";
import { PropagationAction } from "@mtcute/dispatcher";
import { Counter } from "prom-client";
import { config } from "../config.js";
import { peersConfigFilter } from "../filters.js";
export function collectNewMessageMetrics(dp: Dispatcher, registry: Registry) {
const messages = new Counter({
name: "messenger_dialog_messages_count",
help: "Messages count since exporter startup",
labelNames: ["peerId"],
});
const media = new Counter({
name: "messenger_dialog_media_sent_count",
help: "Medias sent since exporter startup",
labelNames: ["peerId"],
});
const stickers = new Counter({
name: "messenger_dialog_stickers_sent_count",
help: "Stickers sent since exporter startup",
labelNames: ["peerId"],
});
const voice = new Counter({
name: "messenger_dialog_voice_messages_count",
help: "Voice messages sent since exporter startup",
labelNames: ["peerId"],
});
dp.onNewMessage(peersConfigFilter(config), (msg) => {
if (msg.media) {
let counter;
switch (msg.media.type) {
case "photo": case "audio": case "document": {
counter = media;
break;
}
case "sticker": {
counter = stickers;
break;
}
case "voice": {
counter = voice;
break;
}
case "video": {
if (msg.media.isRound) {
counter = voice;
} else {
counter = media;
}
break;
}
}
if (counter) {
counter.inc({
peerId: msg.chat.id,
});
}
}
messages.inc({
peerId: msg.chat.id,
});
return PropagationAction.Continue;
});
registry.registerMetric(media);
registry.registerMetric(stickers);
registry.registerMetric(voice);
registry.registerMetric(messages);
}

37
src/metrics/metrics.ts Normal file
View file

@ -0,0 +1,37 @@
import type { Dispatcher } from "@mtcute/dispatcher";
import { PropagationAction } from "@mtcute/dispatcher";
import { Counter } from "prom-client";
import { config } from "../config.js";
import { peersConfigFilter } from "../filters.js";
import { collectDialogMetrics } from "./dialogs.js";
import { KeywordsCounter } from "./keywords.js";
import { collectNewMessageMetrics } from "./message.js";
function newWordsCounter(dp: Dispatcher) {
const counter = new Counter({
name: "messenger_dialog_words_count",
help: "Number of words in messages since exporter startup",
labelNames: ["peerId", "word"],
});
dp.onNewMessage(peersConfigFilter(config), async (msg) => {
const words = msg.text.toLowerCase().split(" ");
for (const w of words) {
counter.inc({
peerId: msg.chat.id,
word: w,
});
}
return PropagationAction.Continue;
});
return counter;
}
export {
collectDialogMetrics,
collectNewMessageMetrics,
KeywordsCounter,
newWordsCounter,
};