From cbed5a6c3c97d27a22852e6f9bc39cf1593a2b15 Mon Sep 17 00:00:00 2001 From: soffee Date: Wed, 9 Apr 2025 19:50:53 +0300 Subject: [PATCH] structurize project --- src/config.ts | 2 +- src/main.ts | 4 +- src/metrics.ts | 181 ---------------------------------- src/metrics/dialogs.ts | 81 +++++++++++++++ src/{ => metrics}/keywords.ts | 27 ++++- src/metrics/message.ts | 78 +++++++++++++++ src/metrics/metrics.ts | 37 +++++++ 7 files changed, 223 insertions(+), 187 deletions(-) delete mode 100644 src/metrics.ts create mode 100644 src/metrics/dialogs.ts rename src/{ => metrics}/keywords.ts (77%) create mode 100644 src/metrics/message.ts create mode 100644 src/metrics/metrics.ts diff --git a/src/config.ts b/src/config.ts index fa8363d..4e42c31 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,5 +1,5 @@ import type { OptionDefinition } from "command-line-args"; -import type { RawKeywordLike } from "./keywords.js"; +import type { RawKeywordLike } from "./metrics/keywords.js"; import { readFile } from "node:fs/promises"; import cmdline from "command-line-args"; import yaml from "js-yaml"; diff --git a/src/main.ts b/src/main.ts index 55f0853..f748535 100644 --- a/src/main.ts +++ b/src/main.ts @@ -5,8 +5,8 @@ import { collectDefaultMetrics, Registry } from "prom-client"; import { config, readKeywords } from "./config.js"; import * as env from "./env.js"; -import { rawToPatterns } from "./keywords.js"; -import * as metrics from "./metrics.js"; +import { rawToPatterns } from "./metrics/keywords.js"; +import * as metrics from "./metrics/metrics.js"; import MetricsServer from "./server.js"; const registry = new Registry(); diff --git a/src/metrics.ts b/src/metrics.ts deleted file mode 100644 index 96648a1..0000000 --- a/src/metrics.ts +++ /dev/null @@ -1,181 +0,0 @@ -import type { Dispatcher } from "@mtcute/dispatcher"; -import type { Dialog, TelegramClient } from "@mtcute/node"; -import type { Registry } from "prom-client"; - -import process from "node:process"; -import timers from "node:timers/promises"; - -import { PropagationAction } from "@mtcute/dispatcher"; -import { Counter, Gauge } from "prom-client"; - -import { config } from "./config.js"; -import { peersConfigBoolFilter, peersConfigFilter } from "./filters.js"; -import { KeywordsCounter } from "./keywords.js"; - -function collectNewMessageMetrics(dp: Dispatcher, registry: Registry) { - const messages = new Counter({ - name: "messenger_dialog_messages_count", - help: "Messages count since exporter startup", - labelNames: ["peerId"], - }); - - const media = new Counter({ - name: "messenger_dialog_media_sent_count", - help: "Medias sent since exporter startup", - labelNames: ["peerId"], - }); - - const stickers = new Counter({ - name: "messenger_dialog_stickers_sent_count", - help: "Stickers sent since exporter startup", - labelNames: ["peerId"], - }); - - const voice = new Counter({ - name: "messenger_dialog_voice_messages_count", - help: "Voice messages sent since exporter startup", - labelNames: ["peerId"], - }); - - dp.onNewMessage(peersConfigFilter(config), (msg) => { - if (msg.media) { - let counter; - switch (msg.media.type) { - case "photo": case "audio": case "document": { - counter = media; - break; - } - case "sticker": { - counter = stickers; - break; - } - case "voice": { - counter = voice; - break; - } - case "video": { - if (msg.media.isRound) { - counter = voice; - } else { - counter = media; - } - break; - } - } - if (counter) { - counter.inc({ - peerId: msg.chat.id, - }); - } - } - - messages.inc({ - peerId: msg.chat.id, - }); - - return PropagationAction.Continue; - }); - - registry.registerMetric(media); - registry.registerMetric(stickers); - registry.registerMetric(voice); - registry.registerMetric(messages); -} - -class DialogsHolder { - private lastUpdate = 0n; - private dialogs: Dialog[] = []; - private isUpdating = false; - private ttl: bigint; - - constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) { - this.ttl = BigInt(ttl) * 1000000n; - } - - public async get() { - if (this.isUpdating) { - for (let i = 0; i < this.timeout && this.isUpdating; i += this.pollInterval) { - await timers.setTimeout(this.pollInterval); - } - if (this.isUpdating) { - throw new Error("Timed out fetching dialogs"); - } - } - if (process.hrtime.bigint() - this.lastUpdate > this.ttl) { - this.isUpdating = true; - this.dialogs = []; - for await (const d of this.tg.iterDialogs()) { - if (!peersConfigBoolFilter(config, d.peer.id)) { - continue; - } - this.dialogs.push(d); - } - this.lastUpdate = process.hrtime.bigint(); - this.isUpdating = false; - } - return this.dialogs; - } -} - -function collectDialogMetrics(tg: TelegramClient, registry: Registry) { - const dialogs = new DialogsHolder(tg, 1000, 5000); - - const info = new Gauge({ - name: "messenger_dialog_info", - help: "Dialog information exposed as labels", - labelNames: ["peerId", "peerType", "displayName"], - collect: async () => { - info.reset(); - for (const d of await dialogs.get()) { - info.set({ - peerId: d.peer.id, - peerType: d.peer.type, - displayName: d.peer.displayName, - }, 1); - } - }, - }); - - const unread = new Gauge({ - name: "messenger_dialog_unread_messages_count", - help: "Number of unread messages in dialogs", - labelNames: ["peerId"], - collect: async () => { - unread.reset(); - for (const d of await dialogs.get()) { - unread.set({ - peerId: d.peer.id, - }, d.unreadCount); - } - }, - }); - - registry.registerMetric(info); - registry.registerMetric(unread); -} - -function newWordsCounter(dp: Dispatcher) { - const counter = new Counter({ - name: "messenger_dialog_words_count", - help: "Number of words in messages since exporter startup", - labelNames: ["peerId", "word"], - }); - dp.onNewMessage(peersConfigFilter(config), async (msg) => { - const words = msg.text.toLowerCase().split(" "); - for (const w of words) { - counter.inc({ - peerId: msg.chat.id, - word: w, - }); - } - return PropagationAction.Continue; - }); - return counter; -} - -export { - collectDialogMetrics, - collectNewMessageMetrics, - KeywordsCounter, - newWordsCounter, -}; diff --git a/src/metrics/dialogs.ts b/src/metrics/dialogs.ts new file mode 100644 index 0000000..9596079 --- /dev/null +++ b/src/metrics/dialogs.ts @@ -0,0 +1,81 @@ +import type { Dialog, TelegramClient } from "@mtcute/node"; + +import type { Registry } from "prom-client"; +import process from "node:process"; + +import timers from "node:timers/promises"; +import { Gauge } from "prom-client"; +import { config } from "../config.js"; +import { peersConfigBoolFilter } from "../filters.js"; + +export function collectDialogMetrics(tg: TelegramClient, registry: Registry) { + const dialogs = new DialogsHolder(tg, 1000, 5000); + + const info = new Gauge({ + name: "messenger_dialog_info", + help: "Dialog information exposed as labels", + labelNames: ["peerId", "peerType", "displayName"], + collect: async () => { + info.reset(); + for (const d of await dialogs.get()) { + info.set({ + peerId: d.peer.id, + peerType: d.peer.type, + displayName: d.peer.displayName, + }, 1); + } + }, + }); + + const unread = new Gauge({ + name: "messenger_dialog_unread_messages_count", + help: "Number of unread messages in dialogs", + labelNames: ["peerId"], + collect: async () => { + unread.reset(); + for (const d of await dialogs.get()) { + unread.set({ + peerId: d.peer.id, + }, d.unreadCount); + } + }, + }); + + registry.registerMetric(info); + registry.registerMetric(unread); +} + +class DialogsHolder { + private lastUpdate = 0n; + private dialogs: Dialog[] = []; + private isUpdating = false; + private ttl: bigint; + + constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) { + this.ttl = BigInt(ttl) * 1000000n; + } + + public async get() { + if (this.isUpdating) { + for (let i = 0; i < this.timeout && this.isUpdating; i += this.pollInterval) { + await timers.setTimeout(this.pollInterval); + } + if (this.isUpdating) { + throw new Error("Timed out fetching dialogs"); + } + } + if (process.hrtime.bigint() - this.lastUpdate > this.ttl) { + this.isUpdating = true; + this.dialogs = []; + for await (const d of this.tg.iterDialogs()) { + if (!peersConfigBoolFilter(config, d.peer.id)) { + continue; + } + this.dialogs.push(d); + } + this.lastUpdate = process.hrtime.bigint(); + this.isUpdating = false; + } + return this.dialogs; + } +} diff --git a/src/keywords.ts b/src/metrics/keywords.ts similarity index 77% rename from src/keywords.ts rename to src/metrics/keywords.ts index ff26ece..ea45354 100644 --- a/src/keywords.ts +++ b/src/metrics/keywords.ts @@ -1,9 +1,11 @@ import type { Dispatcher } from "@mtcute/dispatcher"; + import { PropagationAction } from "@mtcute/dispatcher"; import { Counter } from "prom-client"; -import { config } from "./config.js"; -import { peersConfigFilter } from "./filters.js"; -import { escapeRegex } from "./utils.js"; + +import { config } from "../config.js"; +import { peersConfigFilter } from "../filters.js"; +import { escapeRegex } from "../utils.js"; export interface RawKeywordPattern { name: string; @@ -86,3 +88,22 @@ export class KeywordsCounter extends Counter { this._keywords = keywords; } } + +export function newWordsCounter(dp: Dispatcher) { + const counter = new Counter({ + name: "messenger_dialog_words_count", + help: "Number of words in messages since exporter startup", + labelNames: ["peerId", "word"], + }); + dp.onNewMessage(peersConfigFilter(config), async (msg) => { + const words = msg.text.toLowerCase().split(" "); + for (const w of words) { + counter.inc({ + peerId: msg.chat.id, + word: w, + }); + } + return PropagationAction.Continue; + }); + return counter; +} diff --git a/src/metrics/message.ts b/src/metrics/message.ts new file mode 100644 index 0000000..c84ce37 --- /dev/null +++ b/src/metrics/message.ts @@ -0,0 +1,78 @@ +import type { Dispatcher } from "@mtcute/dispatcher"; +import type { Registry } from "prom-client"; + +import { PropagationAction } from "@mtcute/dispatcher"; +import { Counter } from "prom-client"; + +import { config } from "../config.js"; +import { peersConfigFilter } from "../filters.js"; + +export function collectNewMessageMetrics(dp: Dispatcher, registry: Registry) { + const messages = new Counter({ + name: "messenger_dialog_messages_count", + help: "Messages count since exporter startup", + labelNames: ["peerId"], + }); + + const media = new Counter({ + name: "messenger_dialog_media_sent_count", + help: "Medias sent since exporter startup", + labelNames: ["peerId"], + }); + + const stickers = new Counter({ + name: "messenger_dialog_stickers_sent_count", + help: "Stickers sent since exporter startup", + labelNames: ["peerId"], + }); + + const voice = new Counter({ + name: "messenger_dialog_voice_messages_count", + help: "Voice messages sent since exporter startup", + labelNames: ["peerId"], + }); + + dp.onNewMessage(peersConfigFilter(config), (msg) => { + if (msg.media) { + let counter; + switch (msg.media.type) { + case "photo": case "audio": case "document": { + counter = media; + break; + } + case "sticker": { + counter = stickers; + break; + } + case "voice": { + counter = voice; + break; + } + case "video": { + if (msg.media.isRound) { + counter = voice; + } else { + counter = media; + } + break; + } + } + if (counter) { + counter.inc({ + peerId: msg.chat.id, + }); + } + } + + messages.inc({ + peerId: msg.chat.id, + }); + + return PropagationAction.Continue; + }); + + registry.registerMetric(media); + registry.registerMetric(stickers); + registry.registerMetric(voice); + registry.registerMetric(messages); +} diff --git a/src/metrics/metrics.ts b/src/metrics/metrics.ts new file mode 100644 index 0000000..86db334 --- /dev/null +++ b/src/metrics/metrics.ts @@ -0,0 +1,37 @@ +import type { Dispatcher } from "@mtcute/dispatcher"; + +import { PropagationAction } from "@mtcute/dispatcher"; +import { Counter } from "prom-client"; + +import { config } from "../config.js"; +import { peersConfigFilter } from "../filters.js"; + +import { collectDialogMetrics } from "./dialogs.js"; +import { KeywordsCounter } from "./keywords.js"; +import { collectNewMessageMetrics } from "./message.js"; + +function newWordsCounter(dp: Dispatcher) { + const counter = new Counter({ + name: "messenger_dialog_words_count", + help: "Number of words in messages since exporter startup", + labelNames: ["peerId", "word"], + }); + dp.onNewMessage(peersConfigFilter(config), async (msg) => { + const words = msg.text.toLowerCase().split(" "); + for (const w of words) { + counter.inc({ + peerId: msg.chat.id, + word: w, + }); + } + return PropagationAction.Continue; + }); + return counter; +} + +export { + collectDialogMetrics, + collectNewMessageMetrics, + KeywordsCounter, + newWordsCounter, +};