use caching dialogs holder to reduce telegram api calls

This commit is contained in:
soffee 2025-04-09 19:41:39 +03:00
parent ba6b39bd80
commit 8096764366
2 changed files with 56 additions and 26 deletions

View file

@ -32,8 +32,7 @@ const user = await tg.start({
console.log("Logged in as", user.username); console.log("Logged in as", user.username);
registry.registerMetric(metrics.newStaticPeerInfoGauge(tg)); metrics.collectDialogMetrics(tg, registry);
registry.registerMetric(metrics.newUnreadCountGauge(tg));
metrics.collectNewMessageMetrics(dp, registry); metrics.collectNewMessageMetrics(dp, registry);
if (config.keywords) { if (config.keywords) {

View file

@ -1,6 +1,10 @@
import type { Dispatcher } from "@mtcute/dispatcher"; import type { Dispatcher } from "@mtcute/dispatcher";
import type { TelegramClient } from "@mtcute/node"; import type { Dialog, TelegramClient } from "@mtcute/node";
import type { Registry } from "prom-client"; import type { Registry } from "prom-client";
import process from "node:process";
import timers from "node:timers/promises";
import { PropagationAction } from "@mtcute/dispatcher"; import { PropagationAction } from "@mtcute/dispatcher";
import { Counter, Gauge } from "prom-client"; import { Counter, Gauge } from "prom-client";
@ -78,19 +82,52 @@ function collectNewMessageMetrics(dp: Dispatcher, registry: Registry) {
registry.registerMetric(messages); registry.registerMetric(messages);
} }
function newStaticPeerInfoGauge(tg: TelegramClient) { class DialogsHolder {
const gauge = new Gauge({ private lastUpdate = 0n;
private dialogs: Dialog[] = [];
private isUpdating = false;
private ttl: bigint;
constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) {
this.ttl = BigInt(ttl) * 1000000n;
}
public async get() {
if (this.isUpdating) {
for (let i = 0; i < this.timeout && this.isUpdating; i += this.pollInterval) {
await timers.setTimeout(this.pollInterval);
}
if (this.isUpdating) {
throw new Error("Timed out fetching dialogs");
}
}
if (process.hrtime.bigint() - this.lastUpdate > this.ttl) {
this.isUpdating = true;
this.dialogs = [];
for await (const d of this.tg.iterDialogs()) {
if (!peersConfigBoolFilter(config, d.peer.id)) {
continue;
}
this.dialogs.push(d);
}
this.lastUpdate = process.hrtime.bigint();
this.isUpdating = false;
}
return this.dialogs;
}
}
function collectDialogMetrics(tg: TelegramClient, registry: Registry) {
const dialogs = new DialogsHolder(tg, 1000, 5000);
const info = new Gauge({
name: "messenger_dialog_info", name: "messenger_dialog_info",
help: "Dialog information exposed as labels", help: "Dialog information exposed as labels",
labelNames: ["peerId", "peerType", "displayName"], labelNames: ["peerId", "peerType", "displayName"],
collect: async () => { collect: async () => {
gauge.reset(); info.reset();
for await (const d of tg.iterDialogs()) { for (const d of await dialogs.get()) {
if (!peersConfigBoolFilter(config, d.peer.id)) { info.set({
continue;
}
gauge.set({
peerId: d.peer.id, peerId: d.peer.id,
peerType: d.peer.type, peerType: d.peer.type,
displayName: d.peer.displayName, displayName: d.peer.displayName,
@ -98,28 +135,23 @@ function newStaticPeerInfoGauge(tg: TelegramClient) {
} }
}, },
}); });
return gauge;
}
function newUnreadCountGauge(tg: TelegramClient) { const unread = new Gauge({
const gauge = new Gauge({
name: "messenger_dialog_unread_messages_count", name: "messenger_dialog_unread_messages_count",
help: "Number of unread messages in dialogs", help: "Number of unread messages in dialogs",
labelNames: ["peerId"], labelNames: ["peerId"],
collect: async () => { collect: async () => {
gauge.reset(); unread.reset();
for await (const d of tg.iterDialogs()) { for (const d of await dialogs.get()) {
if (!peersConfigBoolFilter(config, d.peer.id)) { unread.set({
continue;
}
gauge.set({
peerId: d.peer.id, peerId: d.peer.id,
}, d.unreadCount); }, d.unreadCount);
} }
}, },
}); });
return gauge;
registry.registerMetric(info);
registry.registerMetric(unread);
} }
function newWordsCounter(dp: Dispatcher) { function newWordsCounter(dp: Dispatcher) {
@ -142,9 +174,8 @@ function newWordsCounter(dp: Dispatcher) {
} }
export { export {
collectDialogMetrics,
collectNewMessageMetrics, collectNewMessageMetrics,
KeywordsCounter, KeywordsCounter,
newStaticPeerInfoGauge,
newUnreadCountGauge,
newWordsCounter, newWordsCounter,
}; };