add reactions collector
This commit is contained in:
parent
92be3ebb5e
commit
c48222d4e5
7 changed files with 228 additions and 6 deletions
|
|
@ -1,4 +1,3 @@
|
||||||
version: "3.9"
|
|
||||||
services:
|
services:
|
||||||
bot:
|
bot:
|
||||||
build:
|
build:
|
||||||
|
|
@ -13,5 +12,6 @@ services:
|
||||||
- "--keywords-file"
|
- "--keywords-file"
|
||||||
- "/app/keywords.yml"
|
- "/app/keywords.yml"
|
||||||
- "--watch-file"
|
- "--watch-file"
|
||||||
|
- "--reactions-collector-load-history"
|
||||||
ports:
|
ports:
|
||||||
- 127.0.0.1:9669:9669
|
- 0.0.0.0:9669:9669
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
{
|
{
|
||||||
"name": "mtproto_exporter",
|
"name": "mtproto_exporter",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"version": "1.3.1",
|
"version": "1.4.0",
|
||||||
"packageManager": "pnpm@10.6.5",
|
"packageManager": "pnpm@10.6.5",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,8 @@ export interface Configuration {
|
||||||
includePeers?: number[];
|
includePeers?: number[];
|
||||||
excludePeers?: number[];
|
excludePeers?: number[];
|
||||||
keywords?: RawKeywordLike[];
|
keywords?: RawKeywordLike[];
|
||||||
|
reactionsCollectorLoadHistory: boolean;
|
||||||
|
reactionsCollectorLoadHistorySize: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
const optionDefinitions: OptionDefinition[] = [
|
const optionDefinitions: OptionDefinition[] = [
|
||||||
|
|
@ -23,6 +25,8 @@ const optionDefinitions: OptionDefinition[] = [
|
||||||
{ name: "watch-file", alias: "w", type: Boolean, defaultValue: false },
|
{ name: "watch-file", alias: "w", type: Boolean, defaultValue: false },
|
||||||
{ name: "include-peers", alias: "i", type: String, multiple: true },
|
{ name: "include-peers", alias: "i", type: String, multiple: true },
|
||||||
{ name: "exclude-peers", alias: "x", type: String, multiple: true },
|
{ name: "exclude-peers", alias: "x", type: String, multiple: true },
|
||||||
|
{ name: "reactions-collector-load-history", type: Boolean, defaultValue: false },
|
||||||
|
{ name: "reactions-collector-load-history-size", type: Number, defaultValue: 1000 },
|
||||||
];
|
];
|
||||||
|
|
||||||
const cli = cmdline(optionDefinitions);
|
const cli = cmdline(optionDefinitions);
|
||||||
|
|
@ -34,6 +38,8 @@ const config: Configuration = {
|
||||||
keywordsFile: cli["keywords-file"],
|
keywordsFile: cli["keywords-file"],
|
||||||
watchFile: cli["watch-file"],
|
watchFile: cli["watch-file"],
|
||||||
keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined,
|
keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined,
|
||||||
|
reactionsCollectorLoadHistory: cli["reactions-collector-load-history"],
|
||||||
|
reactionsCollectorLoadHistorySize: cli["reactions-collector-load-history-size"],
|
||||||
};
|
};
|
||||||
|
|
||||||
if (cli["include-peers"] && cli["exclude-peers"]) {
|
if (cli["include-peers"] && cli["exclude-peers"]) {
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
import fs from "node:fs";
|
import fs from "node:fs";
|
||||||
import { Dispatcher } from "@mtcute/dispatcher";
|
import { Dispatcher } from "@mtcute/dispatcher";
|
||||||
import { TelegramClient } from "@mtcute/node";
|
import { TelegramClient } from "@mtcute/node";
|
||||||
import { collectDefaultMetrics, Registry } from "prom-client";
|
|
||||||
|
|
||||||
|
import { collectDefaultMetrics, Registry } from "prom-client";
|
||||||
import { config, readKeywords } from "./config.js";
|
import { config, readKeywords } from "./config.js";
|
||||||
import * as env from "./env.js";
|
import * as env from "./env.js";
|
||||||
import { rawToPatterns } from "./metrics/keywords.js";
|
import { rawToPatterns } from "./metrics/keywords.js";
|
||||||
|
|
@ -34,6 +34,7 @@ console.log("Logged in as", user.username);
|
||||||
|
|
||||||
metrics.collectDialogMetrics(tg, registry);
|
metrics.collectDialogMetrics(tg, registry);
|
||||||
metrics.collectNewMessageMetrics(dp, registry);
|
metrics.collectNewMessageMetrics(dp, registry);
|
||||||
|
metrics.collectReactionsMetrics(tg, dp, registry);
|
||||||
|
|
||||||
if (config.keywords) {
|
if (config.keywords) {
|
||||||
const counter = new metrics.KeywordsCounter(dp, rawToPatterns(config.keywords));
|
const counter = new metrics.KeywordsCounter(dp, rawToPatterns(config.keywords));
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
import type { Dialog, TelegramClient } from "@mtcute/node";
|
import type { Dialog, TelegramClient } from "@mtcute/node";
|
||||||
|
|
||||||
import { Registry, Summary } from "prom-client";
|
import type { Registry } from "prom-client";
|
||||||
import process from "node:process";
|
import process from "node:process";
|
||||||
import timers from "node:timers/promises";
|
import timers from "node:timers/promises";
|
||||||
|
import { Gauge, Histogram, Summary } from "prom-client";
|
||||||
|
|
||||||
import { Gauge, Histogram } from "prom-client";
|
|
||||||
import { config } from "../config.js";
|
import { config } from "../config.js";
|
||||||
import { peersConfigBoolFilter } from "../filters.js";
|
import { peersConfigBoolFilter } from "../filters.js";
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import { peersConfigFilter } from "../filters.js";
|
||||||
import { collectDialogMetrics } from "./dialogs.js";
|
import { collectDialogMetrics } from "./dialogs.js";
|
||||||
import { KeywordsCounter } from "./keywords.js";
|
import { KeywordsCounter } from "./keywords.js";
|
||||||
import { collectNewMessageMetrics } from "./message.js";
|
import { collectNewMessageMetrics } from "./message.js";
|
||||||
|
import { collectReactionsMetrics } from "./reactions.js";
|
||||||
|
|
||||||
function newWordsCounter(dp: Dispatcher) {
|
function newWordsCounter(dp: Dispatcher) {
|
||||||
const counter = new Counter({
|
const counter = new Counter({
|
||||||
|
|
@ -32,6 +33,7 @@ function newWordsCounter(dp: Dispatcher) {
|
||||||
export {
|
export {
|
||||||
collectDialogMetrics,
|
collectDialogMetrics,
|
||||||
collectNewMessageMetrics,
|
collectNewMessageMetrics,
|
||||||
|
collectReactionsMetrics,
|
||||||
KeywordsCounter,
|
KeywordsCounter,
|
||||||
newWordsCounter,
|
newWordsCounter,
|
||||||
};
|
};
|
||||||
|
|
|
||||||
213
src/metrics/reactions.ts
Normal file
213
src/metrics/reactions.ts
Normal file
|
|
@ -0,0 +1,213 @@
|
||||||
|
import type { Dispatcher } from "@mtcute/dispatcher";
|
||||||
|
import type { TelegramClient, tl } from "@mtcute/node";
|
||||||
|
|
||||||
|
import type { Registry } from "prom-client";
|
||||||
|
import { setTimeout } from "node:timers/promises";
|
||||||
|
import { PropagationAction } from "@mtcute/dispatcher";
|
||||||
|
import { Counter, Gauge } from "prom-client";
|
||||||
|
import { config } from "../config.js";
|
||||||
|
import { peersConfigBoolFilter, peersConfigFilter } from "../filters.js";
|
||||||
|
|
||||||
|
type ReactionsMap = Map<string, number>;
|
||||||
|
type MessageReactionsMap = Map<number, ReactionsMap>;
|
||||||
|
type PeerMessagesMap = Map<number, MessageReactionsMap>;
|
||||||
|
|
||||||
|
function getRawPeerId(peer: tl.TypePeer) {
|
||||||
|
switch (peer._) {
|
||||||
|
case "peerUser": {
|
||||||
|
return peer.userId;
|
||||||
|
}
|
||||||
|
case "peerChat": {
|
||||||
|
return peer.chatId;
|
||||||
|
}
|
||||||
|
case "peerChannel": {
|
||||||
|
return peer.channelId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function getRawReactionEmoji(reaction: tl.TypeReaction) {
|
||||||
|
let emojiId: string;
|
||||||
|
let emojiName: string;
|
||||||
|
switch (reaction._) {
|
||||||
|
case "reactionEmoji": {
|
||||||
|
emojiId = reaction.emoticon;
|
||||||
|
emojiName = reaction.emoticon;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "reactionCustomEmoji": {
|
||||||
|
emojiId = `<custom:${reaction.documentId.toString()}>`;
|
||||||
|
emojiName = "<custom>";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "reactionPaid": {
|
||||||
|
emojiId = "<star_paid>";
|
||||||
|
emojiName = "⭐ (Paid)";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "reactionEmpty": {
|
||||||
|
emojiId = "<empty>";
|
||||||
|
emojiName = "<empty>";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return { id: emojiId, name: emojiName };
|
||||||
|
}
|
||||||
|
|
||||||
|
function getEmojiNameFromId(id: string) {
|
||||||
|
if (id === "<star_paid>") {
|
||||||
|
return "⭐ (Paid)";
|
||||||
|
}
|
||||||
|
if (id.startsWith("<custom:")) {
|
||||||
|
return "<custom>";
|
||||||
|
}
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function collectReactionsMetrics(tg: TelegramClient, dp: Dispatcher, registry: Registry) {
|
||||||
|
const peers: PeerMessagesMap = new Map();
|
||||||
|
|
||||||
|
const set = new Counter({
|
||||||
|
name: "messenger_dialog_reactions_set_count",
|
||||||
|
help: "Reactions set count since exporter startup",
|
||||||
|
labelNames: ["peerId", "emoji"],
|
||||||
|
});
|
||||||
|
|
||||||
|
const removed = new Counter({
|
||||||
|
name: "messenger_dialog_reactions_removed_count",
|
||||||
|
help: "Reactions removed count since exporter startup",
|
||||||
|
labelNames: ["peerId", "emoji"],
|
||||||
|
});
|
||||||
|
|
||||||
|
const peersSize = new Gauge({
|
||||||
|
name: "mtproto_exporter_reactions_collector_peers_cache_size",
|
||||||
|
help: "Size of peers cache map size in reactions collector",
|
||||||
|
collect: () => {
|
||||||
|
peersSize.set(peers.size);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const messagesSize = new Gauge({
|
||||||
|
name: "mtproto_exporter_reactions_collector_messages_cache_size",
|
||||||
|
help: "Size of messages cache map size in reactions collector",
|
||||||
|
collect: () => {
|
||||||
|
messagesSize.reset();
|
||||||
|
for (const m of peers.values()) {
|
||||||
|
messagesSize.inc(m.size);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const reactionsSize = new Gauge({
|
||||||
|
name: "mtproto_exporter_reactions_collector_reactions_cache_size",
|
||||||
|
help: "Size of reactions cache map size in reactions collector",
|
||||||
|
collect: () => {
|
||||||
|
reactionsSize.reset();
|
||||||
|
for (const m of peers.values()) {
|
||||||
|
for (const r of m.values()) {
|
||||||
|
reactionsSize.inc(r.size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
registry.registerMetric(set);
|
||||||
|
registry.registerMetric(removed);
|
||||||
|
registry.registerMetric(peersSize);
|
||||||
|
registry.registerMetric(messagesSize);
|
||||||
|
registry.registerMetric(reactionsSize);
|
||||||
|
|
||||||
|
if (config.reactionsCollectorLoadHistory) {
|
||||||
|
console.log("fetching dialogs history into reactions collector cache....");
|
||||||
|
const historyIterOptions = {
|
||||||
|
limit: config.reactionsCollectorLoadHistorySize,
|
||||||
|
};
|
||||||
|
for await (const dialog of tg.iterDialogs()) {
|
||||||
|
console.log("fetching dialog with peer id", dialog.peer.id);
|
||||||
|
if (!peersConfigBoolFilter(config, dialog.peer.id)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for await (const message of tg.iterHistory(dialog.peer.id, historyIterOptions)) {
|
||||||
|
await handleReactionsUpdate(message.id, dialog.peer.id, message.reactions?.raw.results ?? []);
|
||||||
|
}
|
||||||
|
await setTimeout(5000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we need to count only new messages
|
||||||
|
// because we don't know true number of reactions before updates
|
||||||
|
dp.onNewMessage((message) => {
|
||||||
|
const messages: MessageReactionsMap = peers.get(message.chat.id) ?? new Map();
|
||||||
|
const reactions: ReactionsMap = messages.get(message.id) ?? new Map();
|
||||||
|
|
||||||
|
reactions.clear();
|
||||||
|
|
||||||
|
messages.set(message.id, reactions);
|
||||||
|
peers.set(message.chat.id, messages);
|
||||||
|
|
||||||
|
return PropagationAction.Continue;
|
||||||
|
});
|
||||||
|
|
||||||
|
tg.onRawUpdate.add(async (info) => {
|
||||||
|
if ("updates" in info) {
|
||||||
|
const updates = info.updates as tl.TypeUpdate[];
|
||||||
|
const reactionsUpdates = updates.filter(u => u._ === "updateMessageReactions");
|
||||||
|
for (const update of reactionsUpdates) {
|
||||||
|
await handleReactionsUpdate(update.msgId, getRawPeerId(update.peer), update.reactions.results);
|
||||||
|
}
|
||||||
|
} else if (info.update && info.update._ === "updateMessageReactions") {
|
||||||
|
await handleReactionsUpdate(info.update.msgId, getRawPeerId(info.update.peer), info.update.reactions.results);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
dp.onEditMessage(peersConfigFilter(config), async (message) => {
|
||||||
|
if (!message.reactions || !message.reactions.reactions) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await handleReactionsUpdate(message.id, message.chat.id, message.reactions.raw.results);
|
||||||
|
return PropagationAction.Continue;
|
||||||
|
});
|
||||||
|
|
||||||
|
async function handleReactionsUpdate(messageId: number, peerId: number, reactions: tl.RawReactionCount[]) {
|
||||||
|
const peer: MessageReactionsMap = peers.get(peerId) ?? new Map();
|
||||||
|
const oldReactions = peer.get(messageId);
|
||||||
|
|
||||||
|
const newReactions = new Map<string, number>();
|
||||||
|
for (const r of reactions) {
|
||||||
|
const emoji = getRawReactionEmoji(r.reaction);
|
||||||
|
newReactions.set(emoji.id, r.count);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!oldReactions) {
|
||||||
|
peer.set(messageId, newReactions);
|
||||||
|
peers.set(peerId, peer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const allReactions = new Set<string>([
|
||||||
|
...newReactions.keys(),
|
||||||
|
...oldReactions.keys(),
|
||||||
|
]);
|
||||||
|
|
||||||
|
for (const r of allReactions) {
|
||||||
|
const countBefore = oldReactions.get(r) ?? 0;
|
||||||
|
const countAfter = newReactions.get(r) ?? 0;
|
||||||
|
const diff = countAfter - countBefore;
|
||||||
|
|
||||||
|
if (diff > 0) {
|
||||||
|
set.inc({
|
||||||
|
peerId,
|
||||||
|
emoji: getEmojiNameFromId(r),
|
||||||
|
});
|
||||||
|
} else if (diff < 0) {
|
||||||
|
removed.inc({
|
||||||
|
peerId,
|
||||||
|
emoji: getEmojiNameFromId(r),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
oldReactions.set(r, countAfter);
|
||||||
|
}
|
||||||
|
|
||||||
|
peer.set(messageId, oldReactions);
|
||||||
|
peers.set(peerId, peer);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Reference in a new issue