Compare commits
No commits in common. "c48222d4e53bec932c58bdeca159c7c3822f1381" and "dab53c68571af3552dbfe790ebd0494825dc518a" have entirely different histories.
c48222d4e5
...
dab53c6857
7 changed files with 6 additions and 234 deletions
|
|
@ -1,3 +1,4 @@
|
||||||
|
version: "3.9"
|
||||||
services:
|
services:
|
||||||
bot:
|
bot:
|
||||||
build:
|
build:
|
||||||
|
|
@ -12,6 +13,5 @@ services:
|
||||||
- "--keywords-file"
|
- "--keywords-file"
|
||||||
- "/app/keywords.yml"
|
- "/app/keywords.yml"
|
||||||
- "--watch-file"
|
- "--watch-file"
|
||||||
- "--reactions-collector-load-history"
|
|
||||||
ports:
|
ports:
|
||||||
- 0.0.0.0:9669:9669
|
- 127.0.0.1:9669:9669
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
{
|
{
|
||||||
"name": "mtproto_exporter",
|
"name": "mtproto_exporter",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"version": "1.4.0",
|
"version": "1.3.0",
|
||||||
"packageManager": "pnpm@10.6.5",
|
"packageManager": "pnpm@10.6.5",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,6 @@ export interface Configuration {
|
||||||
includePeers?: number[];
|
includePeers?: number[];
|
||||||
excludePeers?: number[];
|
excludePeers?: number[];
|
||||||
keywords?: RawKeywordLike[];
|
keywords?: RawKeywordLike[];
|
||||||
reactionsCollectorLoadHistory: boolean;
|
|
||||||
reactionsCollectorLoadHistorySize: number;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const optionDefinitions: OptionDefinition[] = [
|
const optionDefinitions: OptionDefinition[] = [
|
||||||
|
|
@ -25,8 +23,6 @@ const optionDefinitions: OptionDefinition[] = [
|
||||||
{ name: "watch-file", alias: "w", type: Boolean, defaultValue: false },
|
{ name: "watch-file", alias: "w", type: Boolean, defaultValue: false },
|
||||||
{ name: "include-peers", alias: "i", type: String, multiple: true },
|
{ name: "include-peers", alias: "i", type: String, multiple: true },
|
||||||
{ name: "exclude-peers", alias: "x", type: String, multiple: true },
|
{ name: "exclude-peers", alias: "x", type: String, multiple: true },
|
||||||
{ name: "reactions-collector-load-history", type: Boolean, defaultValue: false },
|
|
||||||
{ name: "reactions-collector-load-history-size", type: Number, defaultValue: 1000 },
|
|
||||||
];
|
];
|
||||||
|
|
||||||
const cli = cmdline(optionDefinitions);
|
const cli = cmdline(optionDefinitions);
|
||||||
|
|
@ -38,8 +34,6 @@ const config: Configuration = {
|
||||||
keywordsFile: cli["keywords-file"],
|
keywordsFile: cli["keywords-file"],
|
||||||
watchFile: cli["watch-file"],
|
watchFile: cli["watch-file"],
|
||||||
keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined,
|
keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined,
|
||||||
reactionsCollectorLoadHistory: cli["reactions-collector-load-history"],
|
|
||||||
reactionsCollectorLoadHistorySize: cli["reactions-collector-load-history-size"],
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (cli["include-peers"] && cli["exclude-peers"]) {
|
if (cli["include-peers"] && cli["exclude-peers"]) {
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
import fs from "node:fs";
|
import fs from "node:fs";
|
||||||
import { Dispatcher } from "@mtcute/dispatcher";
|
import { Dispatcher } from "@mtcute/dispatcher";
|
||||||
import { TelegramClient } from "@mtcute/node";
|
import { TelegramClient } from "@mtcute/node";
|
||||||
|
|
||||||
import { collectDefaultMetrics, Registry } from "prom-client";
|
import { collectDefaultMetrics, Registry } from "prom-client";
|
||||||
|
|
||||||
import { config, readKeywords } from "./config.js";
|
import { config, readKeywords } from "./config.js";
|
||||||
import * as env from "./env.js";
|
import * as env from "./env.js";
|
||||||
import { rawToPatterns } from "./metrics/keywords.js";
|
import { rawToPatterns } from "./metrics/keywords.js";
|
||||||
|
|
@ -34,7 +34,6 @@ console.log("Logged in as", user.username);
|
||||||
|
|
||||||
metrics.collectDialogMetrics(tg, registry);
|
metrics.collectDialogMetrics(tg, registry);
|
||||||
metrics.collectNewMessageMetrics(dp, registry);
|
metrics.collectNewMessageMetrics(dp, registry);
|
||||||
metrics.collectReactionsMetrics(tg, dp, registry);
|
|
||||||
|
|
||||||
if (config.keywords) {
|
if (config.keywords) {
|
||||||
const counter = new metrics.KeywordsCounter(dp, rawToPatterns(config.keywords));
|
const counter = new metrics.KeywordsCounter(dp, rawToPatterns(config.keywords));
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,8 @@ import type { Dialog, TelegramClient } from "@mtcute/node";
|
||||||
import type { Registry } from "prom-client";
|
import type { Registry } from "prom-client";
|
||||||
import process from "node:process";
|
import process from "node:process";
|
||||||
import timers from "node:timers/promises";
|
import timers from "node:timers/promises";
|
||||||
import { Gauge, Histogram, Summary } from "prom-client";
|
|
||||||
|
|
||||||
|
import { Gauge, Histogram } from "prom-client";
|
||||||
import { config } from "../config.js";
|
import { config } from "../config.js";
|
||||||
import { peersConfigBoolFilter } from "../filters.js";
|
import { peersConfigBoolFilter } from "../filters.js";
|
||||||
|
|
||||||
|
|
@ -53,7 +53,6 @@ class DialogsHolder {
|
||||||
private ttl: bigint;
|
private ttl: bigint;
|
||||||
|
|
||||||
private dialogsIterDurationHistogram: Histogram;
|
private dialogsIterDurationHistogram: Histogram;
|
||||||
private dialogsIterDurationSummary: Summary;
|
|
||||||
|
|
||||||
constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) {
|
constructor(private tg: TelegramClient, ttl: number, private timeout: number, private pollInterval = 10) {
|
||||||
this.ttl = BigInt(ttl) * 1000000n;
|
this.ttl = BigInt(ttl) * 1000000n;
|
||||||
|
|
@ -61,15 +60,10 @@ class DialogsHolder {
|
||||||
name: "telegram_api_dialogs_iter_duration",
|
name: "telegram_api_dialogs_iter_duration",
|
||||||
help: "Duration of iteration over telegram dialogs",
|
help: "Duration of iteration over telegram dialogs",
|
||||||
});
|
});
|
||||||
this.dialogsIterDurationSummary = new Summary({
|
|
||||||
name: "telegram_api_dialogs_iter_duration_summary",
|
|
||||||
help: "Duration of iteration over telegram dialogs",
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public registerMetrics(registry: Registry) {
|
public registerMetrics(registry: Registry) {
|
||||||
registry.registerMetric(this.dialogsIterDurationHistogram);
|
registry.registerMetric(this.dialogsIterDurationHistogram);
|
||||||
registry.registerMetric(this.dialogsIterDurationSummary);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async get() {
|
public async get() {
|
||||||
|
|
@ -91,7 +85,7 @@ class DialogsHolder {
|
||||||
}
|
}
|
||||||
this.dialogs.push(d);
|
this.dialogs.push(d);
|
||||||
}
|
}
|
||||||
this.dialogsIterDurationSummary.observe(end());
|
end();
|
||||||
this.lastUpdate = process.hrtime.bigint();
|
this.lastUpdate = process.hrtime.bigint();
|
||||||
this.isUpdating = false;
|
this.isUpdating = false;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ import { peersConfigFilter } from "../filters.js";
|
||||||
import { collectDialogMetrics } from "./dialogs.js";
|
import { collectDialogMetrics } from "./dialogs.js";
|
||||||
import { KeywordsCounter } from "./keywords.js";
|
import { KeywordsCounter } from "./keywords.js";
|
||||||
import { collectNewMessageMetrics } from "./message.js";
|
import { collectNewMessageMetrics } from "./message.js";
|
||||||
import { collectReactionsMetrics } from "./reactions.js";
|
|
||||||
|
|
||||||
function newWordsCounter(dp: Dispatcher) {
|
function newWordsCounter(dp: Dispatcher) {
|
||||||
const counter = new Counter({
|
const counter = new Counter({
|
||||||
|
|
@ -33,7 +32,6 @@ function newWordsCounter(dp: Dispatcher) {
|
||||||
export {
|
export {
|
||||||
collectDialogMetrics,
|
collectDialogMetrics,
|
||||||
collectNewMessageMetrics,
|
collectNewMessageMetrics,
|
||||||
collectReactionsMetrics,
|
|
||||||
KeywordsCounter,
|
KeywordsCounter,
|
||||||
newWordsCounter,
|
newWordsCounter,
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -1,213 +0,0 @@
|
||||||
import type { Dispatcher } from "@mtcute/dispatcher";
|
|
||||||
import type { TelegramClient, tl } from "@mtcute/node";
|
|
||||||
|
|
||||||
import type { Registry } from "prom-client";
|
|
||||||
import { setTimeout } from "node:timers/promises";
|
|
||||||
import { PropagationAction } from "@mtcute/dispatcher";
|
|
||||||
import { Counter, Gauge } from "prom-client";
|
|
||||||
import { config } from "../config.js";
|
|
||||||
import { peersConfigBoolFilter, peersConfigFilter } from "../filters.js";
|
|
||||||
|
|
||||||
type ReactionsMap = Map<string, number>;
|
|
||||||
type MessageReactionsMap = Map<number, ReactionsMap>;
|
|
||||||
type PeerMessagesMap = Map<number, MessageReactionsMap>;
|
|
||||||
|
|
||||||
function getRawPeerId(peer: tl.TypePeer) {
|
|
||||||
switch (peer._) {
|
|
||||||
case "peerUser": {
|
|
||||||
return peer.userId;
|
|
||||||
}
|
|
||||||
case "peerChat": {
|
|
||||||
return peer.chatId;
|
|
||||||
}
|
|
||||||
case "peerChannel": {
|
|
||||||
return peer.channelId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function getRawReactionEmoji(reaction: tl.TypeReaction) {
|
|
||||||
let emojiId: string;
|
|
||||||
let emojiName: string;
|
|
||||||
switch (reaction._) {
|
|
||||||
case "reactionEmoji": {
|
|
||||||
emojiId = reaction.emoticon;
|
|
||||||
emojiName = reaction.emoticon;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "reactionCustomEmoji": {
|
|
||||||
emojiId = `<custom:${reaction.documentId.toString()}>`;
|
|
||||||
emojiName = "<custom>";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "reactionPaid": {
|
|
||||||
emojiId = "<star_paid>";
|
|
||||||
emojiName = "⭐ (Paid)";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "reactionEmpty": {
|
|
||||||
emojiId = "<empty>";
|
|
||||||
emojiName = "<empty>";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return { id: emojiId, name: emojiName };
|
|
||||||
}
|
|
||||||
|
|
||||||
function getEmojiNameFromId(id: string) {
|
|
||||||
if (id === "<star_paid>") {
|
|
||||||
return "⭐ (Paid)";
|
|
||||||
}
|
|
||||||
if (id.startsWith("<custom:")) {
|
|
||||||
return "<custom>";
|
|
||||||
}
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function collectReactionsMetrics(tg: TelegramClient, dp: Dispatcher, registry: Registry) {
|
|
||||||
const peers: PeerMessagesMap = new Map();
|
|
||||||
|
|
||||||
const set = new Counter({
|
|
||||||
name: "messenger_dialog_reactions_set_count",
|
|
||||||
help: "Reactions set count since exporter startup",
|
|
||||||
labelNames: ["peerId", "emoji"],
|
|
||||||
});
|
|
||||||
|
|
||||||
const removed = new Counter({
|
|
||||||
name: "messenger_dialog_reactions_removed_count",
|
|
||||||
help: "Reactions removed count since exporter startup",
|
|
||||||
labelNames: ["peerId", "emoji"],
|
|
||||||
});
|
|
||||||
|
|
||||||
const peersSize = new Gauge({
|
|
||||||
name: "mtproto_exporter_reactions_collector_peers_cache_size",
|
|
||||||
help: "Size of peers cache map size in reactions collector",
|
|
||||||
collect: () => {
|
|
||||||
peersSize.set(peers.size);
|
|
||||||
},
|
|
||||||
});
|
|
||||||
const messagesSize = new Gauge({
|
|
||||||
name: "mtproto_exporter_reactions_collector_messages_cache_size",
|
|
||||||
help: "Size of messages cache map size in reactions collector",
|
|
||||||
collect: () => {
|
|
||||||
messagesSize.reset();
|
|
||||||
for (const m of peers.values()) {
|
|
||||||
messagesSize.inc(m.size);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
});
|
|
||||||
const reactionsSize = new Gauge({
|
|
||||||
name: "mtproto_exporter_reactions_collector_reactions_cache_size",
|
|
||||||
help: "Size of reactions cache map size in reactions collector",
|
|
||||||
collect: () => {
|
|
||||||
reactionsSize.reset();
|
|
||||||
for (const m of peers.values()) {
|
|
||||||
for (const r of m.values()) {
|
|
||||||
reactionsSize.inc(r.size);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
registry.registerMetric(set);
|
|
||||||
registry.registerMetric(removed);
|
|
||||||
registry.registerMetric(peersSize);
|
|
||||||
registry.registerMetric(messagesSize);
|
|
||||||
registry.registerMetric(reactionsSize);
|
|
||||||
|
|
||||||
if (config.reactionsCollectorLoadHistory) {
|
|
||||||
console.log("fetching dialogs history into reactions collector cache....");
|
|
||||||
const historyIterOptions = {
|
|
||||||
limit: config.reactionsCollectorLoadHistorySize,
|
|
||||||
};
|
|
||||||
for await (const dialog of tg.iterDialogs()) {
|
|
||||||
console.log("fetching dialog with peer id", dialog.peer.id);
|
|
||||||
if (!peersConfigBoolFilter(config, dialog.peer.id)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
for await (const message of tg.iterHistory(dialog.peer.id, historyIterOptions)) {
|
|
||||||
await handleReactionsUpdate(message.id, dialog.peer.id, message.reactions?.raw.results ?? []);
|
|
||||||
}
|
|
||||||
await setTimeout(5000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// we need to count only new messages
|
|
||||||
// because we don't know true number of reactions before updates
|
|
||||||
dp.onNewMessage((message) => {
|
|
||||||
const messages: MessageReactionsMap = peers.get(message.chat.id) ?? new Map();
|
|
||||||
const reactions: ReactionsMap = messages.get(message.id) ?? new Map();
|
|
||||||
|
|
||||||
reactions.clear();
|
|
||||||
|
|
||||||
messages.set(message.id, reactions);
|
|
||||||
peers.set(message.chat.id, messages);
|
|
||||||
|
|
||||||
return PropagationAction.Continue;
|
|
||||||
});
|
|
||||||
|
|
||||||
tg.onRawUpdate.add(async (info) => {
|
|
||||||
if ("updates" in info) {
|
|
||||||
const updates = info.updates as tl.TypeUpdate[];
|
|
||||||
const reactionsUpdates = updates.filter(u => u._ === "updateMessageReactions");
|
|
||||||
for (const update of reactionsUpdates) {
|
|
||||||
await handleReactionsUpdate(update.msgId, getRawPeerId(update.peer), update.reactions.results);
|
|
||||||
}
|
|
||||||
} else if (info.update && info.update._ === "updateMessageReactions") {
|
|
||||||
await handleReactionsUpdate(info.update.msgId, getRawPeerId(info.update.peer), info.update.reactions.results);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
dp.onEditMessage(peersConfigFilter(config), async (message) => {
|
|
||||||
if (!message.reactions || !message.reactions.reactions) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
await handleReactionsUpdate(message.id, message.chat.id, message.reactions.raw.results);
|
|
||||||
return PropagationAction.Continue;
|
|
||||||
});
|
|
||||||
|
|
||||||
async function handleReactionsUpdate(messageId: number, peerId: number, reactions: tl.RawReactionCount[]) {
|
|
||||||
const peer: MessageReactionsMap = peers.get(peerId) ?? new Map();
|
|
||||||
const oldReactions = peer.get(messageId);
|
|
||||||
|
|
||||||
const newReactions = new Map<string, number>();
|
|
||||||
for (const r of reactions) {
|
|
||||||
const emoji = getRawReactionEmoji(r.reaction);
|
|
||||||
newReactions.set(emoji.id, r.count);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!oldReactions) {
|
|
||||||
peer.set(messageId, newReactions);
|
|
||||||
peers.set(peerId, peer);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const allReactions = new Set<string>([
|
|
||||||
...newReactions.keys(),
|
|
||||||
...oldReactions.keys(),
|
|
||||||
]);
|
|
||||||
|
|
||||||
for (const r of allReactions) {
|
|
||||||
const countBefore = oldReactions.get(r) ?? 0;
|
|
||||||
const countAfter = newReactions.get(r) ?? 0;
|
|
||||||
const diff = countAfter - countBefore;
|
|
||||||
|
|
||||||
if (diff > 0) {
|
|
||||||
set.inc({
|
|
||||||
peerId,
|
|
||||||
emoji: getEmojiNameFromId(r),
|
|
||||||
});
|
|
||||||
} else if (diff < 0) {
|
|
||||||
removed.inc({
|
|
||||||
peerId,
|
|
||||||
emoji: getEmojiNameFromId(r),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
oldReactions.set(r, countAfter);
|
|
||||||
}
|
|
||||||
|
|
||||||
peer.set(messageId, oldReactions);
|
|
||||||
peers.set(peerId, peer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Add table
Reference in a new issue