fix lint issues; use polling instead of fs.watchFile for keywords reloading; add error handling in dialogs collector
This commit is contained in:
parent
f6af162ce6
commit
10f430592d
6 changed files with 51 additions and 23 deletions
|
|
@ -1,7 +1,7 @@
|
||||||
{
|
{
|
||||||
"name": "mtproto_exporter",
|
"name": "mtproto_exporter",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"version": "1.5.0",
|
"version": "1.5.1",
|
||||||
"packageManager": "pnpm@10.6.5",
|
"packageManager": "pnpm@10.6.5",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ export interface Configuration {
|
||||||
wordsCounter: boolean;
|
wordsCounter: boolean;
|
||||||
keywordsFile: string;
|
keywordsFile: string;
|
||||||
watchFile: boolean;
|
watchFile: boolean;
|
||||||
|
watchFileIntervalSeconds: number;
|
||||||
includePeers?: number[];
|
includePeers?: number[];
|
||||||
excludePeers?: number[];
|
excludePeers?: number[];
|
||||||
keywords?: RawKeywordLike[];
|
keywords?: RawKeywordLike[];
|
||||||
|
|
@ -17,22 +18,25 @@ export interface Configuration {
|
||||||
dialogs: boolean;
|
dialogs: boolean;
|
||||||
messages: boolean;
|
messages: boolean;
|
||||||
reactions: boolean;
|
reactions: boolean;
|
||||||
}
|
};
|
||||||
reactionsCollector: {
|
reactionsCollector: {
|
||||||
loadHistory: boolean,
|
loadHistory: boolean;
|
||||||
loadHistorySize: number,
|
loadHistorySize: number;
|
||||||
},
|
};
|
||||||
messagesCollector: {
|
messagesCollector: {
|
||||||
includeSender: boolean,
|
includeSender: boolean;
|
||||||
},
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* eslint-disable style/no-multi-spaces, style/key-spacing */
|
||||||
|
|
||||||
const optionDefinitions: OptionDefinition[] = [
|
const optionDefinitions: OptionDefinition[] = [
|
||||||
{ name: "bind-host", alias: "b", type: String, defaultValue: "0.0.0.0" },
|
{ name: "bind-host", alias: "b", type: String, defaultValue: "0.0.0.0" },
|
||||||
{ name: "port", alias: "p", type: Number, defaultValue: 9669 },
|
{ name: "port", alias: "p", type: Number, defaultValue: 9669 },
|
||||||
{ name: "words-counter", type: Boolean, defaultValue: false },
|
{ name: "words-counter", type: Boolean, defaultValue: false },
|
||||||
{ name: "keywords-file", alias: "k", type: String },
|
{ name: "keywords-file", alias: "k", type: String },
|
||||||
{ name: "watch-file", alias: "w", type: Boolean, defaultValue: false },
|
{ name: "watch-file", alias: "w", type: Boolean, defaultValue: false },
|
||||||
|
{ name: "watch-file-interval-seconds", alias: "W", type: Number, defaultValue: 60 },
|
||||||
{ name: "include-peers", alias: "i", type: String, multiple: true },
|
{ name: "include-peers", alias: "i", type: String, multiple: true },
|
||||||
{ name: "exclude-peers", alias: "x", type: String, multiple: true },
|
{ name: "exclude-peers", alias: "x", type: String, multiple: true },
|
||||||
{ name: "reactions-collector", type: Boolean, defaultValue: false },
|
{ name: "reactions-collector", type: Boolean, defaultValue: false },
|
||||||
|
|
@ -43,6 +47,8 @@ const optionDefinitions: OptionDefinition[] = [
|
||||||
{ name: "messages-collector-include-sender", type: Boolean, defaultValue: false },
|
{ name: "messages-collector-include-sender", type: Boolean, defaultValue: false },
|
||||||
];
|
];
|
||||||
|
|
||||||
|
/* eslint-enable style/no-multi-spaces, style/key-spacing */
|
||||||
|
|
||||||
const cli = cmdline(optionDefinitions);
|
const cli = cmdline(optionDefinitions);
|
||||||
|
|
||||||
const config: Configuration = {
|
const config: Configuration = {
|
||||||
|
|
@ -51,6 +57,7 @@ const config: Configuration = {
|
||||||
wordsCounter: cli["words-counter"],
|
wordsCounter: cli["words-counter"],
|
||||||
keywordsFile: cli["keywords-file"],
|
keywordsFile: cli["keywords-file"],
|
||||||
watchFile: cli["watch-file"],
|
watchFile: cli["watch-file"],
|
||||||
|
watchFileIntervalSeconds: cli["watch-file-interval-seconds"],
|
||||||
keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined,
|
keywords: cli["keywords-file"] ? await readKeywords(cli["keywords-file"]) : undefined,
|
||||||
collectors: {
|
collectors: {
|
||||||
dialogs: cli["dialogs-collector"],
|
dialogs: cli["dialogs-collector"],
|
||||||
|
|
@ -94,7 +101,7 @@ export async function readKeywords(filePath: string): Promise<RawKeywordLike[]>
|
||||||
keywords.push(item);
|
keywords.push(item);
|
||||||
} else if (typeof item === "object" && typeof item.name === "string") {
|
} else if (typeof item === "object" && typeof item.name === "string") {
|
||||||
if (typeof item.pattern === "string") {
|
if (typeof item.pattern === "string") {
|
||||||
let result = {
|
const result = {
|
||||||
name: item.name,
|
name: item.name,
|
||||||
pattern: item.pattern,
|
pattern: item.pattern,
|
||||||
word: Boolean(item.word ?? false),
|
word: Boolean(item.word ?? false),
|
||||||
|
|
@ -102,7 +109,7 @@ export async function readKeywords(filePath: string): Promise<RawKeywordLike[]>
|
||||||
global: true,
|
global: true,
|
||||||
multi_line: false,
|
multi_line: false,
|
||||||
insensitive: true,
|
insensitive: true,
|
||||||
}
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
if (typeof item.flags === "object") {
|
if (typeof item.flags === "object") {
|
||||||
|
|
|
||||||
31
src/main.ts
31
src/main.ts
|
|
@ -41,27 +41,44 @@ if (config.collectors.messages) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.collectors.reactions) {
|
if (config.collectors.reactions) {
|
||||||
console.log("[WARN] reactions-collector is enabled, but it is very experimental and almost does not work. i don't recommend enabling it especially for production use.")
|
console.log("[WARN] reactions-collector is enabled, but it is very experimental and almost does not work. i don't recommend enabling it especially for production use.");
|
||||||
metrics.collectReactionsMetrics(tg, dp, registry);
|
metrics.collectReactionsMetrics(tg, dp, registry);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.keywords) {
|
if (config.keywords) {
|
||||||
const counter = new metrics.KeywordsCounter(dp, rawToPatterns(config.keywords));
|
const counter = new metrics.KeywordsCounter(dp, rawToPatterns(config.keywords));
|
||||||
|
console.log("[keywords] Initialized keywords counter with", counter.keywords.length, "keywords/patterns.");
|
||||||
|
|
||||||
registry.registerMetric(counter);
|
registry.registerMetric(counter);
|
||||||
|
|
||||||
if (config.watchFile) {
|
if (config.watchFile) {
|
||||||
fs.watchFile(config.keywordsFile, async (curr, prev) => {
|
const reloadConfig = async () => {
|
||||||
if (curr.mtimeMs === prev.mtimeMs) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
console.log("[watch-file] Keywords file was updated. Reloading keywords configuration...");
|
|
||||||
try {
|
try {
|
||||||
config.keywords = await readKeywords(config.keywordsFile);
|
config.keywords = await readKeywords(config.keywordsFile);
|
||||||
counter.setKeywords(rawToPatterns(config.keywords));
|
counter.setKeywords(rawToPatterns(config.keywords));
|
||||||
|
console.log(`Loaded ${counter.keywords.length} keywords/patterns.`);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error("Failed to read keywords file", config.keywordsFile, e);
|
console.error("Failed to read keywords file", config.keywordsFile, e);
|
||||||
}
|
}
|
||||||
});
|
};
|
||||||
|
|
||||||
|
let lastMtimeMs = 0;
|
||||||
|
setInterval(async () => {
|
||||||
|
const stat = await fs.promises.stat(config.keywordsFile);
|
||||||
|
if (lastMtimeMs === stat.mtimeMs) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lastMtimeMs === 0 && stat.mtimeMs !== 0) {
|
||||||
|
lastMtimeMs = stat.mtimeMs;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
lastMtimeMs = stat.mtimeMs;
|
||||||
|
|
||||||
|
console.log("[watch-file] Keywords file was updated. Reloading keywords configuration...");
|
||||||
|
await reloadConfig();
|
||||||
|
}, config.watchFileIntervalSeconds * 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,6 @@ export function collectDialogMetrics(tg: TelegramClient, registry: Registry) {
|
||||||
for (const d of await dialogs.get()) {
|
for (const d of await dialogs.get()) {
|
||||||
if (d.peer.type !== "chat") continue;
|
if (d.peer.type !== "chat") continue;
|
||||||
if (d.peer.membersCount === null) continue;
|
if (d.peer.membersCount === null) continue;
|
||||||
|
|
||||||
members.set({
|
members.set({
|
||||||
peerId: d.peer.id,
|
peerId: d.peer.id,
|
||||||
}, d.peer.membersCount);
|
}, d.peer.membersCount);
|
||||||
|
|
@ -103,12 +102,17 @@ class DialogsHolder {
|
||||||
this.isUpdating = true;
|
this.isUpdating = true;
|
||||||
this.dialogs = [];
|
this.dialogs = [];
|
||||||
const end = this.dialogsIterDurationHistogram.startTimer();
|
const end = this.dialogsIterDurationHistogram.startTimer();
|
||||||
for await (const d of this.tg.iterDialogs()) {
|
try {
|
||||||
if (!peersConfigBoolFilter(config, d.peer.id)) {
|
for await (const d of this.tg.iterDialogs()) {
|
||||||
continue;
|
if (!peersConfigBoolFilter(config, d.peer.id)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
this.dialogs.push(d);
|
||||||
}
|
}
|
||||||
this.dialogs.push(d);
|
} catch (e) {
|
||||||
|
console.error("Failed to iterate over telegram dialogs:", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.dialogsIterDurationSummary.observe(end());
|
this.dialogsIterDurationSummary.observe(end());
|
||||||
this.lastUpdate = process.hrtime.bigint();
|
this.lastUpdate = process.hrtime.bigint();
|
||||||
this.isUpdating = false;
|
this.isUpdating = false;
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ export interface RawKeywordPattern {
|
||||||
global: boolean;
|
global: boolean;
|
||||||
multi_line: boolean;
|
multi_line: boolean;
|
||||||
insensitive: boolean;
|
insensitive: boolean;
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export type RawKeywordLike = string | RawKeywordPattern;
|
export type RawKeywordLike = string | RawKeywordPattern;
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ export default class MetricsServer {
|
||||||
|
|
||||||
private async _requestHandler(req: http.IncomingMessage, res: http.ServerResponse) {
|
private async _requestHandler(req: http.IncomingMessage, res: http.ServerResponse) {
|
||||||
const url = new URL(`http://${req.headers.host ?? "localhost"}${req.url}`);
|
const url = new URL(`http://${req.headers.host ?? "localhost"}${req.url}`);
|
||||||
console.log(`[HTTP] ${req.method} - ${url.href} from ${req.socket.remoteAddress}:${req.socket.remotePort}`);
|
console.log(`[HTTP] ${req.method} - ${req.socket.localAddress}:${req.socket.localPort} (${url.href}) from ${req.socket.remoteAddress}:${req.socket.remotePort}`);
|
||||||
|
|
||||||
if (req.method === "GET" && url.pathname === "/metrics") {
|
if (req.method === "GET" && url.pathname === "/metrics") {
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue