Compare commits

...

2 commits

Author SHA1 Message Date
c48222d4e5 add reactions collector 2025-04-11 13:39:41 +03:00
92be3ebb5e add summary too, just in case idk 2025-04-09 22:14:54 +03:00
7 changed files with 234 additions and 6 deletions

View file

@ -1,4 +1,3 @@
version: "3.9"
services: services:
bot: bot:
build: build:
@ -13,5 +12,6 @@ services:
- "--keywords-file" - "--keywords-file"
- "/app/keywords.yml" - "/app/keywords.yml"
- "--watch-file" - "--watch-file"
- "--reactions-collector-load-history"
ports: ports:
- 127.0.0.1:9669:9669 - 0.0.0.0:9669:9669

View file

@ -1,7 +1,7 @@
{ {
"name": "mtproto_exporter", "name": "mtproto_exporter",
"type": "module", "type": "module",
"version": "1.3.0", "version": "1.4.0",
"packageManager": "pnpm@10.6.5", "packageManager": "pnpm@10.6.5",
"license": "MIT", "license": "MIT",
"scripts": { "scripts": {

View file

@ -13,6 +13,8 @@ export interface Configuration {
includePeers?: number[]; includePeers?: number[];
excludePeers?: number[]; excludePeers?: number[];
keywords?: RawKeywordLike[]; keywords?: RawKeywordLike[];
reactionsCollectorLoadHistory: boolean;
reactionsCollectorLoadHistorySize: number;
} }
const optionDefinitions: OptionDefinition[] = [ const optionDefinitions: OptionDefinition[] = [
@ -23,6 +25,8 @@ const optionDefinitions: OptionDefinition[] = [
{ name: "watch-file", alias: "w", type: Boolean, defaultValue: false }, { name: "watch-file", alias: "w", type: Boolean, defaultValue: false },
{ name: "include-peers", alias: "i", type: String, multiple: true }, { name: "include-peers", alias: "i", type: String, multiple: true },
{ name: "exclude-peers", alias: "x", type: String, multiple: true }, { name: "exclude-peers", alias: "x", type: String, multiple: true },
{ name: "reactions-collector-load-history", type: Boolean, defaultValue: false },
{ name: "reactions-collector-load-history-size", type: Number, defaultValue: 1000 },
]; ];
const cli = cmdline(optionDefinitions); const cli = cmdline(optionDefinitions);
@ -34,6 +38,8 @@ const config: Configuration = {
keywordsFile: cli["keywords-file"], keywordsFile: cli["keywords-file"],
watchFile: cli["watch-file"], watchFile: cli["watch-file"],
keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined, keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined,
reactionsCollectorLoadHistory: cli["reactions-collector-load-history"],
reactionsCollectorLoadHistorySize: cli["reactions-collector-load-history-size"],
}; };
if (cli["include-peers"] && cli["exclude-peers"]) { if (cli["include-peers"] && cli["exclude-peers"]) {

View file

@ -1,8 +1,8 @@
import fs from "node:fs"; import fs from "node:fs";
import { Dispatcher } from "@mtcute/dispatcher"; import { Dispatcher } from "@mtcute/dispatcher";
import { TelegramClient } from "@mtcute/node"; import { TelegramClient } from "@mtcute/node";
import { collectDefaultMetrics, Registry } from "prom-client";
import { collectDefaultMetrics, Registry } from "prom-client";
import { config, readKeywords } from "./config.js"; import { config, readKeywords } from "./config.js";
import * as env from "./env.js"; import * as env from "./env.js";
import { rawToPatterns } from "./metrics/keywords.js"; import { rawToPatterns } from "./metrics/keywords.js";
@ -34,6 +34,7 @@ console.log("Logged in as", user.username);
metrics.collectDialogMetrics(tg, registry); metrics.collectDialogMetrics(tg, registry);
metrics.collectNewMessageMetrics(dp, registry); metrics.collectNewMessageMetrics(dp, registry);
metrics.collectReactionsMetrics(tg, dp, registry);
if (config.keywords) { if (config.keywords) {
const counter = new metrics.KeywordsCounter(dp, rawToPatterns(config.keywords)); const counter = new metrics.KeywordsCounter(dp, rawToPatterns(config.keywords));

View file

@ -3,8 +3,8 @@ import type { Dialog, TelegramClient } from "@mtcute/node";
import type { Registry } from "prom-client"; import type { Registry } from "prom-client";
import process from "node:process"; import process from "node:process";
import timers from "node:timers/promises"; import timers from "node:timers/promises";
import { Gauge, Histogram, Summary } from "prom-client";
import { Gauge, Histogram } from "prom-client";
import { config } from "../config.js"; import { config } from "../config.js";
import { peersConfigBoolFilter } from "../filters.js"; import { peersConfigBoolFilter } from "../filters.js";
@ -53,6 +53,7 @@ class DialogsHolder {
private ttl: bigint; private ttl: bigint;
private dialogsIterDurationHistogram: Histogram; private dialogsIterDurationHistogram: Histogram;
private dialogsIterDurationSummary: Summary;
constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) { constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) {
this.ttl = BigInt(ttl) * 1000000n; this.ttl = BigInt(ttl) * 1000000n;
@ -60,10 +61,15 @@ class DialogsHolder {
name: "telegram_api_dialogs_iter_duration", name: "telegram_api_dialogs_iter_duration",
help: "Duration of iteration over telegram dialogs", help: "Duration of iteration over telegram dialogs",
}); });
this.dialogsIterDurationSummary = new Summary({
name: "telegram_api_dialogs_iter_duration_summary",
help: "Duration of iteration over telegram dialogs",
});
} }
public registerMetrics(registry: Registry) { public registerMetrics(registry: Registry) {
registry.registerMetric(this.dialogsIterDurationHistogram); registry.registerMetric(this.dialogsIterDurationHistogram);
registry.registerMetric(this.dialogsIterDurationSummary);
} }
public async get() { public async get() {
@ -85,7 +91,7 @@ class DialogsHolder {
} }
this.dialogs.push(d); this.dialogs.push(d);
} }
end(); this.dialogsIterDurationSummary.observe(end());
this.lastUpdate = process.hrtime.bigint(); this.lastUpdate = process.hrtime.bigint();
this.isUpdating = false; this.isUpdating = false;
} }

View file

@ -9,6 +9,7 @@ import { peersConfigFilter } from "../filters.js";
import { collectDialogMetrics } from "./dialogs.js"; import { collectDialogMetrics } from "./dialogs.js";
import { KeywordsCounter } from "./keywords.js"; import { KeywordsCounter } from "./keywords.js";
import { collectNewMessageMetrics } from "./message.js"; import { collectNewMessageMetrics } from "./message.js";
import { collectReactionsMetrics } from "./reactions.js";
function newWordsCounter(dp: Dispatcher) { function newWordsCounter(dp: Dispatcher) {
const counter = new Counter({ const counter = new Counter({
@ -32,6 +33,7 @@ function newWordsCounter(dp: Dispatcher) {
export { export {
collectDialogMetrics, collectDialogMetrics,
collectNewMessageMetrics, collectNewMessageMetrics,
collectReactionsMetrics,
KeywordsCounter, KeywordsCounter,
newWordsCounter, newWordsCounter,
}; };

213
src/metrics/reactions.ts Normal file
View file

@ -0,0 +1,213 @@
import type { Dispatcher } from "@mtcute/dispatcher";
import type { TelegramClient, tl } from "@mtcute/node";
import type { Registry } from "prom-client";
import { setTimeout } from "node:timers/promises";
import { PropagationAction } from "@mtcute/dispatcher";
import { Counter, Gauge } from "prom-client";
import { config } from "../config.js";
import { peersConfigBoolFilter, peersConfigFilter } from "../filters.js";
type ReactionsMap = Map<string, number>;
type MessageReactionsMap = Map<number, ReactionsMap>;
type PeerMessagesMap = Map<number, MessageReactionsMap>;
function getRawPeerId(peer: tl.TypePeer) {
switch (peer._) {
case "peerUser": {
return peer.userId;
}
case "peerChat": {
return peer.chatId;
}
case "peerChannel": {
return peer.channelId;
}
}
}
function getRawReactionEmoji(reaction: tl.TypeReaction) {
let emojiId: string;
let emojiName: string;
switch (reaction._) {
case "reactionEmoji": {
emojiId = reaction.emoticon;
emojiName = reaction.emoticon;
break;
}
case "reactionCustomEmoji": {
emojiId = `<custom:${reaction.documentId.toString()}>`;
emojiName = "<custom>";
break;
}
case "reactionPaid": {
emojiId = "<star_paid>";
emojiName = "⭐ (Paid)";
break;
}
case "reactionEmpty": {
emojiId = "<empty>";
emojiName = "<empty>";
break;
}
}
return { id: emojiId, name: emojiName };
}
function getEmojiNameFromId(id: string) {
if (id === "<star_paid>") {
return "⭐ (Paid)";
}
if (id.startsWith("<custom:")) {
return "<custom>";
}
return id;
}
export async function collectReactionsMetrics(tg: TelegramClient, dp: Dispatcher, registry: Registry) {
const peers: PeerMessagesMap = new Map();
const set = new Counter({
name: "messenger_dialog_reactions_set_count",
help: "Reactions set count since exporter startup",
labelNames: ["peerId", "emoji"],
});
const removed = new Counter({
name: "messenger_dialog_reactions_removed_count",
help: "Reactions removed count since exporter startup",
labelNames: ["peerId", "emoji"],
});
const peersSize = new Gauge({
name: "mtproto_exporter_reactions_collector_peers_cache_size",
help: "Size of peers cache map size in reactions collector",
collect: () => {
peersSize.set(peers.size);
},
});
const messagesSize = new Gauge({
name: "mtproto_exporter_reactions_collector_messages_cache_size",
help: "Size of messages cache map size in reactions collector",
collect: () => {
messagesSize.reset();
for (const m of peers.values()) {
messagesSize.inc(m.size);
}
},
});
const reactionsSize = new Gauge({
name: "mtproto_exporter_reactions_collector_reactions_cache_size",
help: "Size of reactions cache map size in reactions collector",
collect: () => {
reactionsSize.reset();
for (const m of peers.values()) {
for (const r of m.values()) {
reactionsSize.inc(r.size);
}
}
},
});
registry.registerMetric(set);
registry.registerMetric(removed);
registry.registerMetric(peersSize);
registry.registerMetric(messagesSize);
registry.registerMetric(reactionsSize);
if (config.reactionsCollectorLoadHistory) {
console.log("fetching dialogs history into reactions collector cache....");
const historyIterOptions = {
limit: config.reactionsCollectorLoadHistorySize,
};
for await (const dialog of tg.iterDialogs()) {
console.log("fetching dialog with peer id", dialog.peer.id);
if (!peersConfigBoolFilter(config, dialog.peer.id)) {
continue;
}
for await (const message of tg.iterHistory(dialog.peer.id, historyIterOptions)) {
await handleReactionsUpdate(message.id, dialog.peer.id, message.reactions?.raw.results ?? []);
}
await setTimeout(5000);
}
}
// we need to count only new messages
// because we don't know true number of reactions before updates
dp.onNewMessage((message) => {
const messages: MessageReactionsMap = peers.get(message.chat.id) ?? new Map();
const reactions: ReactionsMap = messages.get(message.id) ?? new Map();
reactions.clear();
messages.set(message.id, reactions);
peers.set(message.chat.id, messages);
return PropagationAction.Continue;
});
tg.onRawUpdate.add(async (info) => {
if ("updates" in info) {
const updates = info.updates as tl.TypeUpdate[];
const reactionsUpdates = updates.filter(u => u._ === "updateMessageReactions");
for (const update of reactionsUpdates) {
await handleReactionsUpdate(update.msgId, getRawPeerId(update.peer), update.reactions.results);
}
} else if (info.update && info.update._ === "updateMessageReactions") {
await handleReactionsUpdate(info.update.msgId, getRawPeerId(info.update.peer), info.update.reactions.results);
}
});
dp.onEditMessage(peersConfigFilter(config), async (message) => {
if (!message.reactions || !message.reactions.reactions) {
return;
}
await handleReactionsUpdate(message.id, message.chat.id, message.reactions.raw.results);
return PropagationAction.Continue;
});
async function handleReactionsUpdate(messageId: number, peerId: number, reactions: tl.RawReactionCount[]) {
const peer: MessageReactionsMap = peers.get(peerId) ?? new Map();
const oldReactions = peer.get(messageId);
const newReactions = new Map<string, number>();
for (const r of reactions) {
const emoji = getRawReactionEmoji(r.reaction);
newReactions.set(emoji.id, r.count);
}
if (!oldReactions) {
peer.set(messageId, newReactions);
peers.set(peerId, peer);
return;
}
const allReactions = new Set<string>([
...newReactions.keys(),
...oldReactions.keys(),
]);
for (const r of allReactions) {
const countBefore = oldReactions.get(r) ?? 0;
const countAfter = newReactions.get(r) ?? 0;
const diff = countAfter - countBefore;
if (diff > 0) {
set.inc({
peerId,
emoji: getEmojiNameFromId(r),
});
} else if (diff < 0) {
removed.inc({
peerId,
emoji: getEmojiNameFromId(r),
});
}
oldReactions.set(r, countAfter);
}
peer.set(messageId, oldReactions);
peers.set(peerId, peer);
}
}