[flask]集成Prometheus

前言

在服务监控中,大致可分为日志监控和指标监控。日志监控一般由类似ELK这样的日志系统去收集分析,而指标监控一般是由Prometheus收集服务的一些可量化的指标数据,比如服务响应码、响应时间。

安装sdk

python -m pip install prometheus-client

示例

示例1

使用 prometheus_client 自带的 make_wsgi_app,通过 DispatcherMiddleware 挂载到 Flask 实例的 /metrics 路径上。以下示例没有显式指定 registry,使用的是 prometheus_client 自带的默认 REGISTRY。

from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from prometheus_client import make_wsgi_app, Info

# Static build metadata, exported as the `my_build_version_info` metric.
# No registry argument is given, so prometheus_client's default registry is used.
i = Info("my_build_version", "Description of info")
i.info({"version": "1.2.3", "buildhost": "foo@bar"})


app = Flask(__name__)

# Mount the Prometheus WSGI app under /metrics; all other paths still reach Flask.
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {"/metrics": make_wsgi_app()})


@app.get("/")
def hello():
    """Return a fixed greeting for GET /."""
    return "Hello World!"


# Start Flask's built-in server only when this file is executed as a script.
if __name__ == "__main__":
    app.run(debug=False)

访问http://127.0.0.1:5000/metrics可以看到类似以下的输出,其中包含了自定义的metric my_build_version_info

# HELP python_gc_objects_collected_total Objects collected during gc
# TYPE python_gc_objects_collected_total counter
python_gc_objects_collected_total{generation="0"} 527.0
python_gc_objects_collected_total{generation="1"} 124.0
python_gc_objects_collected_total{generation="2"} 0.0
# HELP python_gc_objects_uncollectable_total Uncollectable objects found during GC
# TYPE python_gc_objects_uncollectable_total counter
python_gc_objects_uncollectable_total{generation="0"} 0.0
python_gc_objects_uncollectable_total{generation="1"} 0.0
python_gc_objects_uncollectable_total{generation="2"} 0.0
# HELP python_gc_collections_total Number of times this generation was collected
# TYPE python_gc_collections_total counter
python_gc_collections_total{generation="0"} 112.0
python_gc_collections_total{generation="1"} 10.0
python_gc_collections_total{generation="2"} 0.0
# HELP python_info Python platform information
# TYPE python_info gauge
python_info{implementation="CPython",major="3",minor="11",patchlevel="2",version="3.11.2"} 1.0
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 5.11332352e+08
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 4.4941312e+07
# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1.74637360939e+09
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 0.25
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 7.0
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 1.048576e+06
# HELP my_build_version_info Description of info
# TYPE my_build_version_info gauge
my_build_version_info{buildhost="foo@bar",version="1.2.3"} 1.0

如果不想要默认输出的这些 gc 等信息,或者想要手动注册 collector,也可以自定义 registry。下面的示例仍然手动注册了进程、GC、平台三个内置 collector,不需要哪一个,去掉对应的 register 调用即可:

from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from prometheus_client import make_wsgi_app, Info, CollectorRegistry, PROCESS_COLLECTOR, GC_COLLECTOR, PLATFORM_COLLECTOR

# A dedicated registry, used instead of prometheus_client's default one.
registry = CollectorRegistry(auto_describe=True)
# Explicitly register the built-in process / GC / platform collectors;
# remove a line to leave that group of metrics out.
registry.register(PROCESS_COLLECTOR)
registry.register(GC_COLLECTOR)
registry.register(PLATFORM_COLLECTOR)

# Static build metadata, attached to the custom registry.
i = Info("my_build_version", "Description of info", registry=registry)
i.info({"version": "1.2.3", "buildhost": "foo@bar"})


app = Flask(__name__)

# Mount a WSGI app that serializes the custom registry under /metrics.
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {"/metrics": make_wsgi_app(registry=registry)})


@app.get("/")
def hello():
    """Return a fixed greeting for GET /."""
    return "Hello World!"


# Start Flask's built-in server only when this file is executed as a script.
if __name__ == "__main__":
    app.run(debug=False)

示例2-通过蓝图(Blueprint)暴露自定义registry的指标

目录结构如下:

.
├── flaskapp.py
├── pkg
│   ├── __init__.py
│   └── metrics
│       ├── __init__.py
│       └── metrics.py
└── router
    ├── __init__.py
    └── metrics.py
  • flaskapp.py
from flask import Flask, request, Response
from pkg.metrics import request_counter
from router import metrics
from random import randint
import json

# The Flask application; routes and the metrics blueprint are attached below.
app = Flask(__name__)


@app.get("/")
def hello():
    """Increment the request counter with a random status label and greet.

    The status label is drawn at random from a fixed list purely to produce
    varied sample data; it is unrelated to the response actually returned.
    """
    # Demo-only pool of status codes used for the `status` label.
    fake_statuses = [101, 200, 401, 403, 404, 499, 500, 503, 504]

    label_values = {
        "method": request.method,
        "path": request.path,
        "status": fake_statuses[randint(0, len(fake_statuses) - 1)],
    }
    request_counter.labels(**label_values).inc()

    return {"message": "Hello World!"}

@app.post("/webhook")
def jsontest():
    """Print the webhook request's headers and raw body, then reply 200.

    The body is printed exactly as returned by ``request.get_data()``; it is
    not parsed as JSON here.
    """
    # Dump the request headers.
    print(request.headers)

    # Dump the raw request body.
    payload = request.get_data()
    print(payload)

    # Reply with a bare 200 response.
    return Response(status=200)

# Attach the metrics blueprint exposed by the router.metrics module.
app.register_blueprint(metrics.bp)

# Listen on all interfaces, port 8000, when executed as a script.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
  • pkg/metrics/__init__.py
from .metrics import registry, request_counter

# Names re-exported as the public API of this package.
__all__ = ["registry", "request_counter"]
  • pkg/metrics/metrics.py
from prometheus_client import Counter, CollectorRegistry, Info
import socket
from functools import lru_cache
import sys

# Dedicated registry that holds this application's metrics.
registry = CollectorRegistry(auto_describe=True)

# Counter of HTTP requests, labelled by method, path and status.
request_counter = Counter(
    "http_requests_total",
    "Total HTTP Requests",
    ["method", "path", "status"],
    registry=registry,
)

@lru_cache(maxsize=128)
def get_selfip():
    """Return the local IPv4 address the OS would use for outbound traffic.

    A UDP socket is connected to 8.8.8.8:53 so that the OS selects an outgoing
    interface; the socket's own address is then read back. This function sends
    no payload itself. The result is cached, so the probe runs at most once
    per process.

    Returns:
        The local address as a string, or ``"127.0.0.1"`` when the socket
        cannot be created or connected (for example, no route is available).
    """
    try:
        # The context manager closes the socket on every path, so the file
        # descriptor is not leaked until garbage collection.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 53))
            return probe.getsockname()[0]
    except OSError:
        # Every socket-level failure is an OSError; fall back to loopback.
        return "127.0.0.1"

@lru_cache(maxsize=None)
def get_pyversion():
    """Return the part of ``sys.version`` that precedes its first space."""
    version_token, _, _ = sys.version.partition(" ")
    return version_token

# Static application metadata, exported on the custom registry.
app_info = Info(
    "app_info",
    "Application info",
    registry=registry,
)

# Filled in once, when this module is imported: version, host IP, Python version.
app_info.info({
    "version": "1.0.0",
    "host": get_selfip(),
    "python_version": get_pyversion(),
})
  • router/__init__.py内容为空
  • router/metrics.py
from flask import Blueprint, make_response
from pkg.metrics import registry
from prometheus_client import generate_latest

# Blueprint whose routes are all served under the /metrics URL prefix.
bp = Blueprint("metrics", __name__, url_prefix="/metrics")

@bp.get("/prometheus")
def get_metrics():
    """Serve the current metrics of ``registry`` for Prometheus to scrape.

    Returns:
        A Flask response whose body is ``generate_latest(registry)`` and whose
        ``Content-Type`` is the exposition-format content type published by
        ``prometheus_client``.
    """
    # Imported here so the handler is self-contained with respect to the
    # module's import block.
    from prometheus_client import CONTENT_TYPE_LATEST

    resp = make_response(generate_latest(registry))
    # A bare "text/plain" drops the format version and charset parameters;
    # use the constant that matches what generate_latest() emits.
    resp.headers["Content-Type"] = CONTENT_TYPE_LATEST
    return resp

示例3-在中间件中采集http请求

from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from prometheus_client import (
    Counter,
    Histogram,
    make_wsgi_app,
    Info,
)
import time
from random import random, randint


class Metrics:
    """Holder for the Prometheus metrics exported by this demo service.

    No ``registry`` argument is passed to any metric, so they are created
    against prometheus_client's default registry.
    """

    def __init__(self):
        """Create the metrics and populate the info metric."""
        # Static service metadata.
        self.app_info = Info("python_service_basic_info", "Description of info")
        # Request count, labelled by method, path and status.
        self.request_counter = Counter(
            "http_request_total",
            "Total HTTP requests",
            ["method", "path", "status"],
        )
        # Response-time distribution, labelled by method and path.
        self.response_time = Histogram(
            "http_response_time", "HTTP response time", ["method", "path"]
        )
        self._initialize()

    def _initialize(self):
        """Set the key/value pairs carried by the info metric."""
        info_labels = {
            "version": self._get_app_info_version(),
            "name": "myapp",
        }
        self.app_info.info(info_labels)

    def _get_app_info_version(self) -> str:
        """Return the hard-coded application version string."""
        return "0.1.0"


# Module-level instance shared by the middleware below; created once at import.
collector = Metrics()


class MetricsMiddleware:
    """WSGI middleware that records a request count and response time.

    Requests whose path is in ``white_list`` are passed through without
    being recorded.
    """

    def __init__(self, app):
        """Wrap *app*, the WSGI callable that actually handles requests."""
        self.app = app
        # Paths excluded from recording.
        self.white_list = frozenset(
            [
                "/metrics",
                "/health",
            ]
        )

    def __call__(self, environ, start_response):
        """Handle one WSGI request and record metrics for it.

        Args:
            environ: The WSGI environment of the request.
            start_response: The server's WSGI ``start_response`` callable.

        Returns:
            The response iterable returned by the wrapped application,
            unchanged.
        """
        method = environ.get("REQUEST_METHOD", "NaN")
        path = environ.get("PATH_INFO", "NaN")
        resp_status_code = None

        def catching_start_response(status, headers, exc_info=None):
            # Capture the first token of the status line (e.g. "200" from
            # "200 OK") before delegating to the real start_response.
            nonlocal resp_status_code
            resp_status_code = status.split(" ")[0]
            return start_response(status, headers, exc_info)

        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed or made negative by wall-clock adjustments.
        # Only the call into the wrapped app is timed, not the later
        # iteration of the response body.
        start_time = time.perf_counter()
        response = self.app(environ, catching_start_response)
        response_time = round(time.perf_counter() - start_time, 4)

        if path not in self.white_list:
            # NOTE(review): the raw request path is used as a label value, so
            # every distinct URL creates a new time series — confirm this is
            # acceptable for the routes being served.
            collector.request_counter.labels(method, path, resp_status_code).inc()
            collector.response_time.labels(method, path).observe(response_time)

        return response


app = Flask(__name__)
# Mount the Prometheus WSGI app under /metrics; other paths still reach Flask.
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {"/metrics": make_wsgi_app()})
# Wrap the dispatcher so that MetricsMiddleware is the outermost WSGI layer.
app.wsgi_app = MetricsMiddleware(app.wsgi_app)


@app.get("/a1")
def a1():
    """Return "a1" immediately."""
    return "a1"


@app.get("/a2")
def a2():
    """Sleep for a random fraction of a second, then return "a2"."""
    delay_seconds = random()
    time.sleep(delay_seconds)
    return "a2"


@app.get("/a3")
def a3():
    """Sleep for a random whole number of seconds (1 to 3), then return "a3"."""
    delay_seconds = randint(1, 3)
    time.sleep(delay_seconds)
    return "a3"


@app.get("/a4")
def a4():
    """Sleep for a random whole number of seconds (1 to 3), then raise.

    Raises:
        Exception: Always, with the message "a4".
    """
    delay_seconds = randint(1, 3)
    time.sleep(delay_seconds)
    raise Exception("a4")


# Serve on 127.0.0.1:5000 when this file is executed as a script.
if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5000, debug=False)

prometheus配置(对应示例2:服务监听8000端口,指标路径为/metrics/prometheus)

scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: "python"
    metrics_path: '/metrics/prometheus'
    file_sd_configs:
    - files: ['sd_configs/python/*.yaml']
      refresh_interval:  10s

sd_configs/python/target.yaml

- targets: ['192.168.1.112:8000']
  labels:
    instance: 192.168.1.112

参考

  • prometheus client-python 官方文档
  • Prometheus监控的4个黄金指标及示例

来源链接:https://www.cnblogs.com/XY-Heruo/p/18860646

© 版权声明
THE END
支持一下吧
点赞13 分享
评论 抢沙发
头像
请文明发言!
提交
头像

昵称

取消
昵称表情代码快捷回复

    暂无评论内容