add peers filtering options, live keywords file reload

This commit is contained in:
soffee 2025-04-08 13:09:09 +03:00
parent 29d6c752ce
commit d32ed657a1
8 changed files with 198 additions and 75 deletions

View file

@ -40,6 +40,16 @@ This metric can be enabled with command line flag `--words-counter`
`--keywords-file`, `-k` - path to yaml file with keywords and patterns (see [keywords.yml.example](./keywords.yml.example))
`--watch-file`, `-w` - watch for keywords file updates and reload keywords configuration in runtime
`--include-peers`, `-i` - comma-separated list of `peer.id`s to gather metrics from.
if set, only specified peers will be exposed in metrics.
can be specified multiple times. can not be used along with `--exclude-peers`
`--exclude-peers` `x` - comma-separated list of `peer.id`s to exclude from metrics.
if set, specified peers will not be exposed in metrics.
can be specified multiple times. can not be used along with `--include-peers`
## Environment Variables
`API_ID` - Telegram api id used for mtproto connection (see [mtcute.dev](https://mtcute.dev/guide/intro/sign-in.html))

View file

@ -8,3 +8,10 @@ services:
- .env
volumes:
- ./bot-data:/app/bot-data
- ./keywords.yml:/app/keywords.yml
command:
- "--keywords-file"
- "/app/keywords.yml"
- "--watch-file"
ports:
- 127.0.0.1:9669:9669

View file

@ -1,7 +1,7 @@
{
"name": "mtproto_exporter",
"type": "module",
"version": "1.0.1",
"version": "1.1.0",
"packageManager": "pnpm@10.6.5",
"license": "MIT",
"scripts": {

78
src/config.ts Normal file
View file

@ -0,0 +1,78 @@
import type { OptionDefinition } from "command-line-args";
import type { KeywordLike } from "./keywords.js";
import { readFile } from "node:fs/promises";
import cmdline from "command-line-args";
import yaml from "js-yaml";
export interface Configuration {
bindHost: string;
port: number;
wordsCounter: boolean;
keywordsFile: string;
watchFile: boolean;
includePeers?: number[];
excludePeers?: number[];
keywords?: KeywordLike[];
}
const optionDefinitions: OptionDefinition[] = [
{ name: "bind-host", alias: "b", type: String, defaultValue: "0.0.0.0" },
{ name: "port", alias: "p", type: Number, defaultValue: 9669 },
{ name: "words-counter", type: Boolean, defaultValue: false },
{ name: "keywords-file", alias: "k", type: String },
{ name: "watch-file", alias: "w", type: Boolean, defaultValue: false },
{ name: "include-peers", alias: "i", type: String, multiple: true },
{ name: "exclude-peers", alias: "x", type: String, multiple: true },
];
const cli = cmdline(optionDefinitions);
const config: Configuration = {
bindHost: cli["bind-host"],
port: cli.port,
wordsCounter: cli["words-counter"],
keywordsFile: cli["keywords-file"],
watchFile: cli["watch-file"],
keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined,
};
if (cli["include-peers"] && cli["exclude-peers"]) {
throw new Error("Conflicting configuration: --include-peers and --exclude-peers can't be both specified at the same time.");
}
if (cli["include-peers"]) {
config.includePeers = [];
for (const o of cli["include-peers"] as string[]) {
config.includePeers.push(...o.split(",").map(i => parseInt(i)).filter(i => !isNaN(i)));
}
}
if (cli["exclude-peers"]) {
config.excludePeers = [];
for (const o of cli["exclude-peers"] as string[]) {
config.excludePeers.push(...o.split(",").map(i => parseInt(i)).filter(i => !isNaN(i)));
}
}
export async function readKeywords(filePath: string): Promise<KeywordLike[]> {
const doc = yaml.load(await readFile(filePath, "utf8")) as { keywords?: any[] };
if (doc.keywords && doc.keywords.constructor.name === "Array") {
const keywords: KeywordLike[] = [];
for (const item of doc.keywords) {
if (typeof item === "string") {
keywords.push(item);
} else if (typeof item === "object" && item.name && item.pattern) {
keywords.push({
name: item.name,
pattern: new RegExp(item.pattern, "gi"),
});
}
}
return keywords;
} else {
throw new Error("Keywords file format error: no 'keywords' property, or not an array.");
}
}
export { config };

21
src/filters.ts Normal file
View file

@ -0,0 +1,21 @@
import { filters } from "@mtcute/dispatcher";
import { Configuration } from "./config.js";
export function peersConfigBoolFilter(conf: Configuration, peerId: number) {
if(conf.excludePeers && conf.excludePeers.indexOf(peerId) !== -1) {
return false;
}
if(conf.includePeers && conf.includePeers.indexOf(peerId) === -1) {
return false;
}
return true;
}
export function peersConfigFilter(conf: Configuration) {
if(conf.excludePeers) {
return filters.not(filters.chatId(conf.excludePeers));
} else if (conf.includePeers) {
return filters.chatId(conf.includePeers);
}
return filters.any;
}

View file

@ -1,6 +1,8 @@
import type { Dispatcher } from "@mtcute/dispatcher";
import { PropagationAction } from "@mtcute/dispatcher";
import { Counter } from "prom-client";
import { peersConfigFilter } from "./filters.js";
import { config } from "./config.js";
interface KeywordPattern {
name: string;
@ -15,7 +17,7 @@ export function newWordsCounter(dp: Dispatcher) {
help: "Number of words in messages since exporter startup",
labelNames: ["peerId", "word"],
});
dp.onNewMessage(async (msg) => {
dp.onNewMessage(peersConfigFilter(config), async (msg) => {
const words = msg.text.toLowerCase().split(" ");
for (const w of words) {
counter.inc({
@ -28,34 +30,47 @@ export function newWordsCounter(dp: Dispatcher) {
return counter;
}
export function newKeywordsCounter(dp: Dispatcher, keywords: KeywordLike[]) {
const counter = new Counter({
name: "messenger_dialog_keywords_count",
help: "Number of keywords found in messages since exporter startup",
labelNames: ["peerId", "keyword"],
});
dp.onNewMessage(async (msg) => {
for (const kw of keywords) {
let count;
let kwname;
if (typeof kw === "string") {
const words = msg.text.toLowerCase().split(" ");
count = words.filter(w => w === kw).length;
kwname = kw;
} else {
count = (msg.text.match(kw.pattern) || []).length;
kwname = kw.name;
}
if (count === 0) {
continue;
}
counter.inc({
peerId: msg.chat.id,
keyword: kwname,
}, count);
}
return PropagationAction.Continue;
});
export class KeywordsCounter extends Counter {
private _dp: Dispatcher;
private _keywords: KeywordLike[];
constructor(dp: Dispatcher, keywords: KeywordLike[] = []) {
super({
name: "messenger_dialog_keywords_count",
help: "Number of keywords found in messages since exporter startup",
labelNames: ["peerId", "keyword"],
});
this._dp = dp;
this._keywords = keywords;
return counter;
dp.onNewMessage(peersConfigFilter(config), async (msg) => {
for (const kw of this._keywords) {
let count;
let kwname;
if (typeof kw === "string") {
const words = msg.text.toLowerCase().split(" ");
count = words.filter(w => w === kw).length;
kwname = kw;
} else {
count = (msg.text.match(kw.pattern) || []).length;
kwname = kw.name;
}
if (count === 0) {
continue;
}
this.inc({
peerId: msg.chat.id,
keyword: kwname,
}, count);
}
return PropagationAction.Continue;
});
}
public get keywords() {
return this._keywords;
}
public setKeywords(keywords: KeywordLike[]) {
this._keywords = keywords;
}
}

View file

@ -1,54 +1,19 @@
import type { OptionDefinition } from "command-line-args";
import type { KeywordLike } from "./keywords.js";
import fs from "node:fs";
import { Dispatcher } from "@mtcute/dispatcher";
import { TelegramClient } from "@mtcute/node";
import cmdline from "command-line-args";
import yaml from "js-yaml";
import { collectDefaultMetrics, Registry } from "prom-client";
import { config, readKeywords } from "./config.js";
import * as env from "./env.js";
import * as metrics from "./metrics.js";
import MetricsServer from "./server.js";
const optionDefinitions: OptionDefinition[] = [
{ name: "bind-host", alias: "b", type: String, defaultValue: "0.0.0.0" },
{ name: "port", alias: "p", type: Number, defaultValue: 9669 },
{ name: "words-counter", type: Boolean, defaultValue: false },
{ name: "keywords-file", alias: "k", type: String },
];
const cli = cmdline(optionDefinitions);
const keywords: KeywordLike[] = [];
if (cli["keywords-file"]) {
if (!fs.existsSync(cli["keywords-file"])) {
throw new Error("--keywords-file set, but file does not exist.");
}
const doc = yaml.load(fs.readFileSync(cli["keywords-file"], "utf8")) as { keywords?: any[] };
if (doc.keywords && doc.keywords.constructor.name === "Array") {
for (const item of doc.keywords) {
if (typeof item === "string") {
keywords.push(item);
} else if (typeof item === "object" && item.name && item.pattern) {
keywords.push({
name: item.name,
pattern: new RegExp(item.pattern, "gi"),
});
}
}
} else {
throw new Error("Keywords file format error: no 'keywords' property, or not an array.");
}
}
const registry = new Registry();
collectDefaultMetrics({ register: registry });
const server = new MetricsServer(registry);
server.listen(cli["bind-host"], cli.port);
server.listen(config.bindHost, config.port);
const tg = new TelegramClient({
apiId: env.API_ID,
@ -70,10 +35,26 @@ registry.registerMetric(metrics.newStaticPeerInfoGauge(tg));
registry.registerMetric(metrics.newUnreadCountGauge(tg));
registry.registerMetric(metrics.newMessagesCounter(dp));
if (keywords.length > 0) {
registry.registerMetric(metrics.newKeywordsCounter(dp, keywords));
if (config.keywords) {
const counter = new metrics.KeywordsCounter(dp, config.keywords);
registry.registerMetric(counter);
if (config.watchFile) {
fs.watchFile(config.keywordsFile, async (curr, prev) => {
if (curr.mtimeMs === prev.mtimeMs) {
return;
}
console.log("[watch-file] Keywords file was updated. Re-reading keywords configuration...");
try {
config.keywords = await readKeywords(config.keywordsFile);
counter.setKeywords(config.keywords);
} catch (e) {
console.error("Failed to read keywords file", config.keywordsFile, e);
}
});
}
}
if (cli["words-counter"]) {
if (config.wordsCounter) {
registry.registerMetric(metrics.newWordsCounter(dp));
}

View file

@ -3,7 +3,9 @@ import type { TelegramClient } from "@mtcute/node";
import { PropagationAction } from "@mtcute/dispatcher";
import { Counter, Gauge } from "prom-client";
import { newKeywordsCounter, newWordsCounter } from "./keywords.js";
import { KeywordsCounter, newWordsCounter } from "./keywords.js";
import { config } from "./config.js";
import { peersConfigBoolFilter, peersConfigFilter } from "./filters.js";
function newMessagesCounter(dp: Dispatcher) {
const counter = new Counter({
@ -11,7 +13,8 @@ function newMessagesCounter(dp: Dispatcher) {
help: "Messages count since exporter startup",
labelNames: ["peerId"],
});
dp.onNewMessage(async (msg) => {
dp.onNewMessage(peersConfigFilter(config), async (msg) => {
counter.inc({
peerId: msg.chat.id,
});
@ -28,6 +31,10 @@ function newStaticPeerInfoGauge(tg: TelegramClient) {
collect: async () => {
gauge.reset();
for await (const d of tg.iterDialogs()) {
if(!peersConfigBoolFilter(config, d.peer.id)) {
continue;
}
gauge.set({
peerId: d.peer.id,
peerType: d.peer.type,
@ -47,6 +54,10 @@ function newUnreadCountGauge(tg: TelegramClient) {
collect: async () => {
gauge.reset();
for await (const d of tg.iterDialogs()) {
if(!peersConfigBoolFilter(config, d.peer.id)) {
continue;
}
gauge.set({
peerId: d.peer.id,
}, d.unreadCount);
@ -57,7 +68,7 @@ function newUnreadCountGauge(tg: TelegramClient) {
}
export {
newKeywordsCounter,
KeywordsCounter,
newMessagesCounter,
newStaticPeerInfoGauge,
newUnreadCountGauge,