From 80967643667e09385f0e6987408592723e504461 Mon Sep 17 00:00:00 2001 From: soffee Date: Wed, 9 Apr 2025 19:41:39 +0300 Subject: [PATCH] use caching dialogs holder to reduce telegram api calls --- src/main.ts | 3 +- src/metrics.ts | 79 +++++++++++++++++++++++++++++++++++--------------- 2 files changed, 56 insertions(+), 26 deletions(-) diff --git a/src/main.ts b/src/main.ts index 0221ace..55f0853 100644 --- a/src/main.ts +++ b/src/main.ts @@ -32,8 +32,7 @@ const user = await tg.start({ console.log("Logged in as", user.username); -registry.registerMetric(metrics.newStaticPeerInfoGauge(tg)); -registry.registerMetric(metrics.newUnreadCountGauge(tg)); +metrics.collectDialogMetrics(tg, registry); metrics.collectNewMessageMetrics(dp, registry); if (config.keywords) { diff --git a/src/metrics.ts b/src/metrics.ts index 84e4e2e..96648a1 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -1,6 +1,10 @@ import type { Dispatcher } from "@mtcute/dispatcher"; -import type { TelegramClient } from "@mtcute/node"; +import type { Dialog, TelegramClient } from "@mtcute/node"; import type { Registry } from "prom-client"; + +import process from "node:process"; +import timers from "node:timers/promises"; + import { PropagationAction } from "@mtcute/dispatcher"; import { Counter, Gauge } from "prom-client"; @@ -78,19 +82,52 @@ function collectNewMessageMetrics(dp: Dispatcher, registry: Registry) { registry.registerMetric(messages); } -function newStaticPeerInfoGauge(tg: TelegramClient) { - const gauge = new Gauge({ +class DialogsHolder { + private lastUpdate = 0n; + private dialogs: Dialog[] = []; + private isUpdating = false; + private ttl: bigint; + + constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) { + this.ttl = BigInt(ttl) * 1000000n; + } + + public async get() { + if (this.isUpdating) { + for (let i = 0; i < this.timeout && this.isUpdating; i += this.pollInterval) { + await timers.setTimeout(this.pollInterval); + } + if (this.isUpdating) { + throw new Error("Timed out fetching dialogs"); + } + } + if (process.hrtime.bigint() - this.lastUpdate > this.ttl) { + this.isUpdating = true; + this.dialogs = []; + for await (const d of this.tg.iterDialogs()) { + if (!peersConfigBoolFilter(config, d.peer.id)) { + continue; + } + this.dialogs.push(d); + } + this.lastUpdate = process.hrtime.bigint(); + this.isUpdating = false; + } + return this.dialogs; + } +} + +function collectDialogMetrics(tg: TelegramClient, registry: Registry) { + const dialogs = new DialogsHolder(tg, 1000, 5000); + + const info = new Gauge({ name: "messenger_dialog_info", help: "Dialog information exposed as labels", labelNames: ["peerId", "peerType", "displayName"], collect: async () => { - gauge.reset(); - for await (const d of tg.iterDialogs()) { - if (!peersConfigBoolFilter(config, d.peer.id)) { - continue; - } - - gauge.set({ + info.reset(); + for (const d of await dialogs.get()) { + info.set({ peerId: d.peer.id, peerType: d.peer.type, displayName: d.peer.displayName, @@ -98,28 +135,23 @@ function newStaticPeerInfoGauge(tg: TelegramClient) { } }, }); - return gauge; -} -function newUnreadCountGauge(tg: TelegramClient) { - const gauge = new Gauge({ + const unread = new Gauge({ name: "messenger_dialog_unread_messages_count", help: "Number of unread messages in dialogs", labelNames: ["peerId"], collect: async () => { - gauge.reset(); - for await (const d of tg.iterDialogs()) { - if (!peersConfigBoolFilter(config, d.peer.id)) { - continue; - } - - gauge.set({ + unread.reset(); + for (const d of await dialogs.get()) { + unread.set({ peerId: d.peer.id, }, d.unreadCount); } }, }); - return gauge; + + registry.registerMetric(info); + registry.registerMetric(unread); } function newWordsCounter(dp: Dispatcher) { @@ -142,9 +174,8 @@ function newWordsCounter(dp: Dispatcher) { } export { + collectDialogMetrics, collectNewMessageMetrics, KeywordsCounter, - newStaticPeerInfoGauge, - newUnreadCountGauge, newWordsCounter, };