同步
同步是在一個函數(shù)體內(nèi),
read,業(yè)務(wù)處理,等待IO可寫,write是阻塞的可能返回-1;
異步
異步不好理解,流程不清晰:
在一個函數(shù)體內(nèi),
read,業(yè)務(wù)處理,要進行write就直接注冊寫事件(epoll_ctl),在回調(diào)函數(shù)中epoll_wait來處理事件。(主從reactor?)
https 異步請求:
client :發(fā)送http請求,不用等待http的reponse返回,把等待事件加到epoll(進行epoll_wait),客戶端直接進行下一個http請求的發(fā)送
server :讀完fd之后,把事件加入epoll中,判斷是否可寫,如果可寫,就進行寫操作,不可行進行下一個read
async_read(fd, );
if (poll(fd))//如果有就緒
read;
else//如果沒有就緒
epoll ctl(epfd, fd);
async_recv_from :
判斷是否可讀
可讀:recv_from()
不可讀:加入到epoll中,就返回(還需要一個callback(){epoll_wait})
協(xié)程
有異步的性能,同步的編程方式(發(fā)起請求,等待結(jié)果)
在一個函數(shù)內(nèi)部commit
發(fā)送請求
等待結(jié)果
協(xié)程–> ntyco, libco:都是基于epoll做io調(diào)度。
1.為什么會要協(xié)程?
同步的編程方式,實行異步
2.異步的運行流程是什么
3.協(xié)程的原語操作
resume恢復(fù)
yield讓出
switch()實現(xiàn)方式;
- longjmp/setjmp
- ucontext;
- 匯編實現(xiàn)
switch()協(xié)程切換:
1、協(xié)程切換只涉及基本的CPU上下文切換。
當前協(xié)程的 CPU 寄存器狀態(tài)保存起來,然后將需要切換進來的協(xié)程的 CPU 寄存器狀態(tài)加載的 CPU 寄存器上就 ok 了。
2、而且完全在用戶態(tài)進行,
4.協(xié)程的定義?
5.調(diào)度器的定義?
管理所有協(xié)程
6.調(diào)度的策略?
如果兩個時間一樣,紅黑樹不支持同一個k有兩個v,所以當插入的時候,我們先看紅黑樹是否有,有的話,可以+1微妙再插 入;
7. api封裝, hook
Hook函數(shù)又可以叫做鉤子函數(shù),其本質(zhì)就是一種函數(shù)劫持調(diào)用。簡單來說,就是在用戶調(diào)用系統(tǒng)函數(shù)的過程中,劫持用戶的調(diào)用請求并注入一些自定義代碼,轉(zhuǎn)而再去調(diào)用目標系統(tǒng)函數(shù)。
也就是說,我們通過自定義系統(tǒng)同名函數(shù),劫持到用戶的調(diào)用,然后去執(zhí)行自定義的操作。
通過Hook技術(shù)可以在調(diào)用方無感知的情況下進行了自定義的操作。
例如read函數(shù),它的功能是從套接字中讀取一定字節(jié)的數(shù)據(jù),它的原型如下:
#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
然而read往往不能滿足我們對網(wǎng)絡(luò)性能的需求,因為它是同步的,也就是說,當調(diào)用read之后,我們就只能等待期返回結(jié)果,在這期間不能做任何事情了。
有沒有方法hook系統(tǒng)的read函數(shù),實現(xiàn)自定義的異步read呢。當然可以,結(jié)合協(xié)程的思想,我們可以這樣設(shè)計hook的read函數(shù):
當不滿足可讀條件時,先Yield當前協(xié)程,讓出CPU。
當read條件就緒時,主協(xié)程喚醒read協(xié)程,read協(xié)程再去調(diào)用系統(tǒng)本身的read函數(shù)。
注意到這里有一點,主協(xié)程怎么知道IO條件是否就緒呢。幸運的是,Linux上有個東西可以幫助我們監(jiān)聽套接字的讀寫條件。這當然就是Epoll了(當然poll、select也可以)。
因此自定義的read可以這樣實現(xiàn):
dlsym把read函數(shù)劫持,然后執(zhí)行read_f才是執(zhí)行內(nèi)核的read這個函數(shù)
ssize_t read(fd, buf, count) {
1. 注冊 fd 的可讀事件到 epoll上
2. yield當前協(xié)程
3. 等待yield返回后,說明該協(xié)程被喚醒,那么可讀條件一定滿足。直接調(diào)用系統(tǒng) read_f 函數(shù)即可。
}
整個hook過程中程序執(zhí)行流如下圖所示:
8.多核的模式?
9.協(xié)程的性能?
性能和非阻塞epoll差不多,只是業(yè)務(wù)邏輯按同步來寫,好排查、好寫
加入?yún)f(xié)程IO性能提高,只是因為異步性能高
管理io情況:協(xié)程(有上下文切換)< 非阻塞reactor/epoll
性能測試:
1.并發(fā)量,fd數(shù)量協(xié)程的數(shù)量
2.每秒接入量, fd --> coroutine create
3.斷開連接,coroutine destroy
4. 1G數(shù)據(jù), iperf
10.要有哪些案例?
nty_server
/*
* Author : WangBoJing , email : 1989wangbojing@gmail.com
*
* Copyright Statement:
* --------------------
* This software is protected by Copyright and the information contained
* herein is confidential. The software may not be copied and the information
* contained herein may not be used or disclosed except with the written
* permission of Author. (C) 2017
*
*
**** ***** *****
*** * ** ***
*** * * * **
* ** * * ** **
* ** * * ** *
* ** * ** ** *
* ** * *** **
* ** * *********** ***** ***** ** ****
* ** * ** ** ** ** ** **
* ** * ** ** * ** * **
* ** * ** * * ** ** **
* ** * ** ** * ** * **
* ** * ** * * ** ** **
* ** * ** ** * ** ** **
* ** * ** ** * ** ** **
* ** * ** * * ** ** **
* ** * ** ** * ** * ** **
* *** ** * * ** * ** **
* *** ** * ** * * ** **
* ** ** * ** ** * ** **
* ** ** * * ** * ** **
***** * **** * ***** ****
*
*
*****
****
*
*/
#include "nty_coroutine.h"
#include <arpa/inet.h>
#define MAX_CLIENT_NUM 1000000
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
void server_reader(void *arg) {
int fd = *(int *)arg;
int ret = 0;
struct pollfd fds;
fds.fd = fd;
fds.events = POLLIN;
while (1) {
char buf[1024] = {0};
ret = nty_recv(fd, buf, 1024, 0);
if (ret > 0) {
if(fd > MAX_CLIENT_NUM)
printf("read from server: %.*s\n", ret, buf);
ret = nty_send(fd, buf, strlen(buf), 0);
if (ret == -1) {
nty_close(fd);
break;
}
} else if (ret == 0) {
nty_close(fd);
break;
}
}
}
void server(void *arg) {
unsigned short port = *(unsigned short *)arg;
free(arg);
int fd = nty_socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) return ;
struct sockaddr_in local, remote;
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
bind(fd, (struct sockaddr*)&local, sizeof(struct sockaddr_in));
listen(fd, 20);
printf("listen port : %d\n", port);
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
while (1) {
socklen_t len = sizeof(struct sockaddr_in);
int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);
if (cli_fd % 1000 == 999) {
struct timeval tv_cur;
memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
gettimeofday(&tv_begin, NULL);
int time_used = TIME_SUB_MS(tv_begin, tv_cur);
printf("client fd : %d, time_used: %d\n", cli_fd, time_used);
}
printf("new client comming\n");
nty_coroutine *read_co;
nty_coroutine_create(&read_co, server_reader, &cli_fd);
}
}
int main(int argc, char *argv[]) {
nty_coroutine *co = NULL;
int i = 0;
unsigned short base_port = 9096;
for (i = 0;i < 100;i ++) {
unsigned short *port = calloc(1, sizeof(unsigned short));
*port = base_port + i;
nty_coroutine_create(&co, server, port); no run
}
nty_schedule_run(); //run
return 0;
}
調(diào)度器單線程
nty_ mysql_client.c
#include "nty_coroutine.h"
#include <stdio.h>
#include <string.h>
#include <mysql.h>
void func (void *arg) {
MYSQL* m_mysql = mysql_init(NULL);
if (!m_mysql) {
printf("mysql_init failed\n");
return ;
}
if (!mysql_real_connect(m_mysql,
"192.168.233.133", "abc", "123456",
"KING_DB", 3306,
NULL, CLIENT_FOUND_ROWS)) {
printf("mysql_real_connect failed: %s\n", mysql_error(m_mysql));
return ;
} else{
printf("mysql_real_connect success\n");
}
}
int main() {
#if 1
init_hook();
nty_coroutine *co = NULL;
nty_coroutine_create(&co, func, NULL);
nty_schedule_run(); //run
#else
func(NULL);
#endif
}
文章來源:http://www.zghlxwxcb.cn/news/detail-650894.html
協(xié)程使用:不用考慮用戶層的協(xié)議,我們只用把io改為異步的;(放大象到冰箱,打開,放,關(guān))文章來源地址http://www.zghlxwxcb.cn/news/detail-650894.html
nty_ mysql oper.c
#include "nty_coroutine.h"
#include <mysql.h>
#define KING_DB_SERVER_IP "192.168.233.133"
#define KING_DB_SERVER_PORT 3306
#define KING_DB_USERNAME "king"
#define KING_DB_PASSWORD "123456"
#define KING_DB_DEFAULTDB "KING_DB"
#define SQL_INSERT_TBL_USER "INSERT TBL_USER(U_NAME, U_GENDER) VALUES('King', 'man');"
#define SQL_SELECT_TBL_USER "SELECT * FROM TBL_USER;"
#define SQL_DELETE_TBL_USER "CALL PROC_DELETE_USER('King')"
#define SQL_INSERT_IMG_USER "INSERT TBL_USER(U_NAME, U_GENDER, U_IMG) VALUES('King', 'man', ?);"
#define SQL_SELECT_IMG_USER "SELECT U_IMG FROM TBL_USER WHERE U_NAME='King';"
#define FILE_IMAGE_LENGTH (64*1024)
// C U R D -->
//
int king_mysql_select(MYSQL *handle) { //
// mysql_real_query --> sql
if (mysql_real_query(handle, SQL_SELECT_TBL_USER, strlen(SQL_SELECT_TBL_USER))) {
printf("mysql_real_query : %s\n", mysql_error(handle));
return -1;
}
// store -->
MYSQL_RES *res = mysql_store_result(handle);
if (res == NULL) {
printf("mysql_store_result : %s\n", mysql_error(handle));
return -2;
}
// rows / fields
int rows = mysql_num_rows(res);
printf("rows: %d\n", rows);
int fields = mysql_num_fields(res);
printf("fields: %d\n", fields);
// fetch
MYSQL_ROW row;
while ((row = mysql_fetch_row(res))) {
int i = 0;
for (i = 0;i < fields;i ++) {
printf("%s\t", row[i]);
}
printf("\n");
}
mysql_free_result(res);
return 0;
}
// filename : path + file name
// buffer : store image data
int read_image(char *filename, char *buffer) {
if (filename == NULL || buffer == NULL) return -1;
FILE *fp = fopen(filename, "rb"); //
if (fp == NULL) {
printf("fopen failed\n");
return -2;
}
// file size
fseek(fp, 0, SEEK_END);
int length = ftell(fp); // file size
fseek(fp, 0, SEEK_SET);
int size = fread(buffer, 1, length, fp);
if (size != length) {
printf("fread failed: %d\n", size);
return -3;
}
fclose(fp);
return size;
}
// filename :
// buffer :
// length :
int write_image(char *filename, char *buffer, int length) {
if (filename == NULL || buffer == NULL || length <= 0) return -1;
FILE *fp = fopen(filename, "wb+"); //
if (fp == NULL) {
printf("fopen failed\n");
return -2;
}
int size = fwrite(buffer, 1, length, fp);
if (size != length) {
printf("fwrite failed: %d\n", size);
return -3;
}
fclose(fp);
return size;
}
int mysql_write(MYSQL *handle, char *buffer, int length) {
if (handle == NULL || buffer == NULL || length <= 0) return -1;
MYSQL_STMT *stmt = mysql_stmt_init(handle);
int ret = mysql_stmt_prepare(stmt, SQL_INSERT_IMG_USER, strlen(SQL_INSERT_IMG_USER));
if (ret) {
printf("mysql_stmt_prepare : %s\n", mysql_error(handle));
return -2;
}
MYSQL_BIND param = {0};
param.buffer_type = MYSQL_TYPE_LONG_BLOB;
param.buffer = NULL;
param.is_null = 0;
param.length = NULL;
ret = mysql_stmt_bind_param(stmt, ¶m);
if (ret) {
printf("mysql_stmt_bind_param : %s\n", mysql_error(handle));
return -3;
}
ret = mysql_stmt_send_long_data(stmt, 0, buffer, length);
if (ret) {
printf("mysql_stmt_send_long_data : %s\n", mysql_error(handle));
return -4;
}
ret = mysql_stmt_execute(stmt);
if (ret) {
printf("mysql_stmt_execute : %s\n", mysql_error(handle));
return -5;
}
ret = mysql_stmt_close(stmt);
if (ret) {
printf("mysql_stmt_close : %s\n", mysql_error(handle));
return -6;
}
return ret;
}
int mysql_read(MYSQL *handle, char *buffer, int length) {
if (handle == NULL || buffer == NULL || length <= 0) return -1;
MYSQL_STMT *stmt = mysql_stmt_init(handle);
int ret = mysql_stmt_prepare(stmt, SQL_SELECT_IMG_USER, strlen(SQL_SELECT_IMG_USER));
if (ret) {
printf("mysql_stmt_prepare : %s\n", mysql_error(handle));
return -2;
}
MYSQL_BIND result = {0};
result.buffer_type = MYSQL_TYPE_LONG_BLOB;
unsigned long total_length = 0;
result.length = &total_length;
ret = mysql_stmt_bind_result(stmt, &result);
if (ret) {
printf("mysql_stmt_bind_result : %s\n", mysql_error(handle));
return -3;
}
ret = mysql_stmt_execute(stmt);
if (ret) {
printf("mysql_stmt_execute : %s\n", mysql_error(handle));
return -4;
}
ret = mysql_stmt_store_result(stmt);
if (ret) {
printf("mysql_stmt_store_result : %s\n", mysql_error(handle));
return -5;
}
while (1) {
ret = mysql_stmt_fetch(stmt);
if (ret != 0 && ret != MYSQL_DATA_TRUNCATED) break; //
int start = 0;
while (start < (int)total_length) {
result.buffer = buffer + start;
result.buffer_length = 1;
mysql_stmt_fetch_column(stmt, &result, 0, start);
start += result.buffer_length;
}
}
mysql_stmt_close(stmt);
return total_length;
}
void coroutine_func(void *arg) {
MYSQL mysql;
printf("coroutine_func\n");
if (NULL == mysql_init(&mysql)) {
printf("mysql_init : %s\n", mysql_error(&mysql));
return ;
}
if (!mysql_real_connect(&mysql, KING_DB_SERVER_IP, KING_DB_USERNAME, KING_DB_PASSWORD,
KING_DB_DEFAULTDB, KING_DB_SERVER_PORT, NULL, 0)) {
printf("mysql_real_connect : %s\n", mysql_error(&mysql));
goto Exit;
}
// mysql --> insert
printf("case 1 : mysql --> insert \n");
#if 1
if (mysql_real_query(&mysql, SQL_INSERT_TBL_USER, strlen(SQL_INSERT_TBL_USER))) {
printf("mysql_real_query : %s\n", mysql_error(&mysql));
goto Exit;
}
#endif
king_mysql_select(&mysql);
// mysql --> delete
printf("case 2 : mysql --> delete \n");
#if 1
if (mysql_real_query(&mysql, SQL_DELETE_TBL_USER, strlen(SQL_DELETE_TBL_USER))) {
printf("mysql_real_query : %s\n", mysql_error(&mysql));
goto Exit;
}
#endif
king_mysql_select(&mysql);
printf("case 3 : mysql --> read image and write mysql\n");
char buffer[FILE_IMAGE_LENGTH] = {0};
int length = read_image("0voice.jpg", buffer);
if (length < 0) goto Exit;
mysql_write(&mysql, buffer, length); ///
printf("case 4 : mysql --> read mysql and write image\n");
memset(buffer, 0, FILE_IMAGE_LENGTH);
length = mysql_read(&mysql, buffer, FILE_IMAGE_LENGTH);
write_image("a.jpg", buffer, length);
Exit:
mysql_close(&mysql);
return ;
}
int main() {
#if 1
//init_hook();
nty_coroutine *co = NULL;
nty_coroutine_create(&co, coroutine_func, NULL);
nty_schedule_run(); //run
#else
coroutine_func(NULL);
#endif
}
nty_ rediscli.c
#include "nty_coroutine.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <hiredis.h>
void coroutine_func(void *arg) {
unsigned int j, isunix = 0;
redisContext *c;
redisReply *reply;
const char *hostname = "192.168.233.133";
int port = 6379;
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
if (isunix) {
c = redisConnectUnixWithTimeout(hostname, timeout);
} else {
c = redisConnectWithTimeout(hostname, port, timeout);
}
if (c == NULL || c->err) {
if (c) {
printf("Connection error: %s\n", c->errstr);
redisFree(c);
} else {
printf("Connection error: can't allocate redis context\n");
}
exit(1);
}
/* PING server */
reply = redisCommand(c,"PING");
printf("PING: %s\n", reply->str);
freeReplyObject(reply);
/* Set a key */
reply = redisCommand(c,"SET %s %s", "foo", "hello world");
printf("SET: %s\n", reply->str);
freeReplyObject(reply);
/* Set a key using binary safe API */
reply = redisCommand(c,"SET %b %b", "bar", (size_t) 3, "hello", (size_t) 5);
printf("SET (binary API): %s\n", reply->str);
freeReplyObject(reply);
/* Try a GET and two INCR */
reply = redisCommand(c,"GET foo");
printf("GET foo: %s\n", reply->str);
freeReplyObject(reply);
reply = redisCommand(c,"INCR counter");
printf("INCR counter: %lld\n", reply->integer);
freeReplyObject(reply);
/* again ... */
reply = redisCommand(c,"INCR counter");
printf("INCR counter: %lld\n", reply->integer);
freeReplyObject(reply);
/* Create a list of numbers, from 0 to 9 */
reply = redisCommand(c,"DEL mylist");
freeReplyObject(reply);
for (j = 0; j < 10; j++) {
char buf[64];
snprintf(buf,64,"%u",j);
reply = redisCommand(c,"LPUSH mylist element-%s", buf);
freeReplyObject(reply);
}
/* Let's check what we have inside the list */
reply = redisCommand(c,"LRANGE mylist 0 -1");
if (reply->type == REDIS_REPLY_ARRAY) {
for (j = 0; j < reply->elements; j++) {
printf("%u) %s\n", j, reply->element[j]->str);
}
}
freeReplyObject(reply);
/* Disconnects and frees the context */
redisFree(c);
return ;
}
int main(int argc, char **argv) {
#if 1
//init_hook();
nty_coroutine *co = NULL;
nty_coroutine_create(&co, coroutine_func, NULL);
nty_schedule_run(); //run
#else
coroutine_func(NULL);
}
到了這里,關(guān)于同步、異步、協(xié)程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!