SSE客户端C++实现(使用libcurl)

  1、SSE数据包格式

  如下所示,一条SSE消息中可以有一个或多个message,每个message由\n\n分隔,一个message也可以由一个或多个filed组成,每个filed由\n分隔,filed有data、id、event、retry四种。

  data表示消息数据,如”data:value\n”。

  id表示数据包编号,当连接断开重连的时候,客户端应该发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,值为收到的最后一条数据的id。

  event表示自定义的事件类型。

  retry表示浏览器重新发起连接的间隔时间,当时间间隔到期,客户端应该重新发起连接。

  还可以有仅以冒号开头的行,表示注释。通常服务器每隔一段时间就会向浏览器发送一个注释,保持连接不中断。

  包含一条message的消息:

data: {"username": "bobby", "time": "02:33:48"}

  包含两条message的消息:

data:this is message A

data:this is
data:message B

  包含三条message的消息:

:explan text

data:this is message A

data:this is
data:message B

2、SSE客户端实现

#include <thread>
#include <functional>

size_t dataCallback(void* ptr, size_t size, size_t nmemb, void* param)
{
    if (ptr == NULL || size == 0 || param == NULL)
        return 0;

    int s = size * nmemb;
    std::string strMsg((char*)ptr, s);
    CSSEClient* client = static_cast<CSSEClient*>(param);
    if (client && client->onMessage)
        client->onMessage(strMsg);

    return s;
}

int progressCallback(void* param, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
    CSSEClient* client = static_cast<CSSEClient*>(param);
    return client->isExit() ? 1 : 0;  // 返回非0值会中断curl_easy_perform()
}

class CSSEClient
{
public:
    CSSEClient(const std::string& strURL) {
        curl_global_init(CURL_GLOBAL_DEFAULT);
        _strURL = strURL;
        _thread = std::thread(&CSSEClient::run, this);
    }
    virtual ~CSSEClient() {
        _bExit = true;
        _thread.join();
        curl_global_cleanup();
    }
    bool isExit() { return _bExit; }
    std::function<void(const std::string&)> onMessage;
private:
    void run() {
        while (!_bExit) {
            subscribeSSE();
            if (_bExit)
                break;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
    void subscribeSSE() {
        CURL* curl = curl_easy_init();
        if (!curl) return;

        struct curl_slist* headers = NULL;
        headers = curl_slist_append(headers, "Accept: text/event-stream");
        headers = curl_slist_append(headers, "Cache-Control: no-cache");
        curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);

        curl_easy_setopt(curl, CURLOPT_URL, _strURL.c_str());
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dataCallback);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10L);
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);

        curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L); //开启TCP心跳
        curl_easy_setopt(curl, CURLOPT_TCP_KEEPIDLE, 60L); //空闲60秒后探测
        curl_easy_setopt(curl, CURLOPT_TCP_KEEPINTVL, 10L); //探测间隔10秒

        curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);  // 开启传输进度(已下载/上传数据量、总数据量等)回调
        curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, progressCallback); //设置传输进度回调方法,即使没有数据传输该方法也会被调用(通常每秒1次),所以可以在该方法中判断是否需要退出
        curl_easy_setopt(curl, CURLOPT_XFERINFODATA, this); //设置传输回调方法的第一个参数

        CURLcode res = curl_easy_perform(curl);
        if (res == CURLE_ABORTED_BY_CALLBACK) {
            //SSE connection aborted by user;
        }
        else {
            //error
        }

        curl_easy_cleanup(curl);
    }

    std::thread _thread;
    std::string _strURL;
    std::atomic_bool _bExit = false;
};

  比如,客户端订阅用户相关数据的变化:

CSSEClient _sseClient("http://test-api.baidu.com.cn:8080/SSE/subscribe/userID")

  对于需要认证的SSE服务,需添加CURLOPT_USERPWD或OAuth头。

  

 

来源链接:https://www.cnblogs.com/milanleon/p/18995770

© 版权声明
THE END
支持一下吧
点赞6 分享
评论 抢沙发
头像
请文明发言!
提交
头像

昵称

取消
昵称表情代码快捷回复

    暂无评论内容