diff --git a/docker-compose.yaml b/docker-compose.yaml index 7e5edc8..26f90e7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,4 +1,3 @@ -version: "3.9" services: bot: build: @@ -13,5 +12,6 @@ services: - "--keywords-file" - "/app/keywords.yml" - "--watch-file" + - "--reactions-collector-load-history" ports: - - 127.0.0.1:9669:9669 + - 0.0.0.0:9669:9669 diff --git a/package.json b/package.json index 216a222..c457786 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "mtproto_exporter", "type": "module", - "version": "1.3.1", + "version": "1.4.0", "packageManager": "pnpm@10.6.5", "license": "MIT", "scripts": { diff --git a/src/config.ts b/src/config.ts index 4e42c31..05502e2 100644 --- a/src/config.ts +++ b/src/config.ts @@ -13,6 +13,8 @@ export interface Configuration { includePeers?: number[]; excludePeers?: number[]; keywords?: RawKeywordLike[]; + reactionsCollectorLoadHistory: boolean; + reactionsCollectorLoadHistorySize: number; } const optionDefinitions: OptionDefinition[] = [ @@ -23,6 +25,8 @@ const optionDefinitions: OptionDefinition[] = [ { name: "watch-file", alias: "w", type: Boolean, defaultValue: false }, { name: "include-peers", alias: "i", type: String, multiple: true }, { name: "exclude-peers", alias: "x", type: String, multiple: true }, + { name: "reactions-collector-load-history", type: Boolean, defaultValue: false }, + { name: "reactions-collector-load-history-size", type: Number, defaultValue: 1000 }, ]; const cli = cmdline(optionDefinitions); @@ -34,6 +38,8 @@ const config: Configuration = { keywordsFile: cli["keywords-file"], watchFile: cli["watch-file"], keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined, + reactionsCollectorLoadHistory: cli["reactions-collector-load-history"], + reactionsCollectorLoadHistorySize: cli["reactions-collector-load-history-size"], }; if (cli["include-peers"] && cli["exclude-peers"]) { diff --git a/src/main.ts b/src/main.ts index f748535..1d487e2 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,8 +1,8 @@ import fs from "node:fs"; import { Dispatcher } from "@mtcute/dispatcher"; import { TelegramClient } from "@mtcute/node"; -import { collectDefaultMetrics, Registry } from "prom-client"; +import { collectDefaultMetrics, Registry } from "prom-client"; import { config, readKeywords } from "./config.js"; import * as env from "./env.js"; import { rawToPatterns } from "./metrics/keywords.js"; @@ -34,6 +34,7 @@ console.log("Logged in as", user.username); metrics.collectDialogMetrics(tg, registry); metrics.collectNewMessageMetrics(dp, registry); +metrics.collectReactionsMetrics(tg, dp, registry); if (config.keywords) { const counter = new metrics.KeywordsCounter(dp, rawToPatterns(config.keywords)); diff --git a/src/metrics/dialogs.ts b/src/metrics/dialogs.ts index 0c9672e..241b0cb 100644 --- a/src/metrics/dialogs.ts +++ b/src/metrics/dialogs.ts @@ -1,10 +1,10 @@ import type { Dialog, TelegramClient } from "@mtcute/node"; -import { Registry, Summary } from "prom-client"; +import type { Registry } from "prom-client"; import process from "node:process"; import timers from "node:timers/promises"; +import { Gauge, Histogram, Summary } from "prom-client"; -import { Gauge, Histogram } from "prom-client"; import { config } from "../config.js"; import { peersConfigBoolFilter } from "../filters.js"; diff --git a/src/metrics/metrics.ts b/src/metrics/metrics.ts index 86db334..3e43575 100644 --- a/src/metrics/metrics.ts +++ b/src/metrics/metrics.ts @@ -9,6 +9,7 @@ import { peersConfigFilter } from "../filters.js"; import { collectDialogMetrics } from "./dialogs.js"; import { KeywordsCounter } from "./keywords.js"; import { collectNewMessageMetrics } from "./message.js"; +import { collectReactionsMetrics } from "./reactions.js"; function newWordsCounter(dp: Dispatcher) { const counter = new Counter({ @@ -32,6 +33,7 @@ function newWordsCounter(dp: Dispatcher) { export { collectDialogMetrics, collectNewMessageMetrics, + collectReactionsMetrics, KeywordsCounter, newWordsCounter, }; diff --git a/src/metrics/reactions.ts b/src/metrics/reactions.ts new file mode 100644 index 0000000..56e2076 --- /dev/null +++ b/src/metrics/reactions.ts @@ -0,0 +1,213 @@ +import type { Dispatcher } from "@mtcute/dispatcher"; +import type { TelegramClient, tl } from "@mtcute/node"; + +import type { Registry } from "prom-client"; +import { setTimeout } from "node:timers/promises"; +import { PropagationAction } from "@mtcute/dispatcher"; +import { Counter, Gauge } from "prom-client"; +import { config } from "../config.js"; +import { peersConfigBoolFilter, peersConfigFilter } from "../filters.js"; + +type ReactionsMap = Map; +type MessageReactionsMap = Map; +type PeerMessagesMap = Map; + +function getRawPeerId(peer: tl.TypePeer) { + switch (peer._) { + case "peerUser": { + return peer.userId; + } + case "peerChat": { + return peer.chatId; + } + case "peerChannel": { + return peer.channelId; + } + } +} + +function getRawReactionEmoji(reaction: tl.TypeReaction) { + let emojiId: string; + let emojiName: string; + switch (reaction._) { + case "reactionEmoji": { + emojiId = reaction.emoticon; + emojiName = reaction.emoticon; + break; + } + case "reactionCustomEmoji": { + emojiId = ``; + emojiName = ""; + break; + } + case "reactionPaid": { + emojiId = ""; + emojiName = "⭐ (Paid)"; + break; + } + case "reactionEmpty": { + emojiId = ""; + emojiName = ""; + break; + } + } + return { id: emojiId, name: emojiName }; +} + +function getEmojiNameFromId(id: string) { + if (id === "") { + return "⭐ (Paid)"; + } + if (id.startsWith(""; + } + return id; +} + +export async function collectReactionsMetrics(tg: TelegramClient, dp: Dispatcher, registry: Registry) { + const peers: PeerMessagesMap = new Map(); + + const set = new Counter({ + name: "messenger_dialog_reactions_set_count", + help: "Reactions set count since exporter startup", + labelNames: ["peerId", "emoji"], + }); + + const removed = new Counter({ + name: "messenger_dialog_reactions_removed_count", + help: "Reactions removed count since exporter startup", + labelNames: ["peerId", "emoji"], + }); + + const peersSize = new Gauge({ + name: "mtproto_exporter_reactions_collector_peers_cache_size", + help: "Size of peers cache map size in reactions collector", + collect: () => { + peersSize.set(peers.size); + }, + }); + const messagesSize = new Gauge({ + name: "mtproto_exporter_reactions_collector_messages_cache_size", + help: "Size of messages cache map size in reactions collector", + collect: () => { + messagesSize.reset(); + for (const m of peers.values()) { + messagesSize.inc(m.size); + } + }, + }); + const reactionsSize = new Gauge({ + name: "mtproto_exporter_reactions_collector_reactions_cache_size", + help: "Size of reactions cache map size in reactions collector", + collect: () => { + reactionsSize.reset(); + for (const m of peers.values()) { + for (const r of m.values()) { + reactionsSize.inc(r.size); + } + } + }, + }); + + registry.registerMetric(set); + registry.registerMetric(removed); + registry.registerMetric(peersSize); + registry.registerMetric(messagesSize); + registry.registerMetric(reactionsSize); + + if (config.reactionsCollectorLoadHistory) { + console.log("fetching dialogs history into reactions collector cache...."); + const historyIterOptions = { + limit: config.reactionsCollectorLoadHistorySize, + }; + for await (const dialog of tg.iterDialogs()) { + console.log("fetching dialog with peer id", dialog.peer.id); + if (!peersConfigBoolFilter(config, dialog.peer.id)) { + continue; + } + for await (const message of tg.iterHistory(dialog.peer.id, historyIterOptions)) { + await handleReactionsUpdate(message.id, dialog.peer.id, message.reactions?.raw.results ?? []); + } + await setTimeout(5000); + } + } + + // we need to count only new messages + // because we don't know true number of reactions before updates + dp.onNewMessage((message) => { + const messages: MessageReactionsMap = peers.get(message.chat.id) ?? new Map(); + const reactions: ReactionsMap = messages.get(message.id) ?? new Map(); + + reactions.clear(); + + messages.set(message.id, reactions); + peers.set(message.chat.id, messages); + + return PropagationAction.Continue; + }); + + tg.onRawUpdate.add(async (info) => { + if ("updates" in info) { + const updates = info.updates as tl.TypeUpdate[]; + const reactionsUpdates = updates.filter(u => u._ === "updateMessageReactions"); + for (const update of reactionsUpdates) { + await handleReactionsUpdate(update.msgId, getRawPeerId(update.peer), update.reactions.results); + } + } else if (info.update && info.update._ === "updateMessageReactions") { + await handleReactionsUpdate(info.update.msgId, getRawPeerId(info.update.peer), info.update.reactions.results); + } + }); + + dp.onEditMessage(peersConfigFilter(config), async (message) => { + if (!message.reactions || !message.reactions.reactions) { + return; + } + await handleReactionsUpdate(message.id, message.chat.id, message.reactions.raw.results); + return PropagationAction.Continue; + }); + + async function handleReactionsUpdate(messageId: number, peerId: number, reactions: tl.RawReactionCount[]) { + const peer: MessageReactionsMap = peers.get(peerId) ?? new Map(); + const oldReactions = peer.get(messageId); + + const newReactions = new Map(); + for (const r of reactions) { + const emoji = getRawReactionEmoji(r.reaction); + newReactions.set(emoji.id, r.count); + } + + if (!oldReactions) { + peer.set(messageId, newReactions); + peers.set(peerId, peer); + return; + } + + const allReactions = new Set([ + ...newReactions.keys(), + ...oldReactions.keys(), + ]); + + for (const r of allReactions) { + const countBefore = oldReactions.get(r) ?? 0; + const countAfter = newReactions.get(r) ?? 0; + const diff = countAfter - countBefore; + + if (diff > 0) { + set.inc({ + peerId, + emoji: getEmojiNameFromId(r), + }); + } else if (diff < 0) { + removed.inc({ + peerId, + emoji: getEmojiNameFromId(r), + }); + } + + oldReactions.set(r, countAfter); + } + + peer.set(messageId, oldReactions); + peers.set(peerId, peer); + } +}