数据中继 relay.c 两个设备(两个文件 两个用户 两个server) 数据交换 场景:

  • 输入网址,登录网站 下载资料or提交文档 ;
  • 流式套接字, 发送数据包-ack-下一个数据包
  • 域名拦截,访问baidu.com,广告页面先出现几秒,放大数据中继模型 中间人攻击?
  • 父进程 管理 两万对设备之间进行对话 负载重 fork 多个子进程 负责 一定数量设备对话,父进程管理子进程 子进程放在不同的主机上 负载均衡 滚雪球 模型: 设备l 设备r 方法:
  1. 读左 写右 读右 写左 rl-wr-rr-wl 成为一个任务 一个人完成
  2. 分成两个任务 第一个任务负责 rl-wr ;第二个任务 rr-wl mycpy 重构 : 多个 一起工作 不冲突

c是工具 用它 完成功能 讲的是机制,像字典


该目录下io/adv/nonblock

relay.c

  • 打开的动作,open ;打开方式 非阻塞io 用户打开方式不确定,main函数模拟用户的操作,在main函数之外保证是以非阻塞方式操作,使用fctl;中间调用函数
#include <stdio.h>
#include<stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
 
#define TTY1 "/dev/tty11"
#define TTY2 "/dev/tty12"
 
#define BUFSIZE 1024
 
enum
{
	STATE_R=1,
	STATE_W,
	STATE_Ex,
	STATE_T
};
struct fsm_st
{
	int state;
	int sfd;
	int dfd;
	char buf[BUFSIZE];
	int len;
	int pos;
	char *errstr;
};
 
 
static void fsm_driver(struct fsm_st *fsm)
{
	int ret;
	switch(fsm->state)
	{
		case STATE_R:
			fsm->len=read(fsm->sfd,fsm->buf,BUFSIZE);
			if(fsm->len==0)
				fsm->state=STATE_T;
			else if(fsm->len <0)
			{
				//真错与假错
				if(errno==EAGAIN)
					fsm->state=STATE_R;
				else
				{
					fsm->errstr="read()";
					fsm->state=STATE_Ex;
					//跳转同时记录出错原因
				}
			}
			else
			{
				fsm->pos=0;
				fsm->state=STATE_W;
			}
			break;
		case STATE_W:
			ret=wirte(fsm->dfd,fsm->buf+fsm->pos,fsm->len);
			if(ret<0)
			{
				if(errno=EAGAIN)
					fsm->state=STATE_W;
				else
				{
					fsm->errstr="write()"
					fsm->state=state_Ex;
				}
			}
			else
			{
				fsm->pos+=ret;
				fsm->len-=ret;
				if(fsm->len==0)
					fsm->state=STATE_R;
				else
					fsm->state=STATE_W;
			}
			break;
		case STATE_Ex:
			//报错
			perror(fsm->errstr);
			fsm->state=STATE_T;
			break;
		case STATE_T:
			//进程结束 但是结束不了 死循环 do sth
			break
		default:
			//do sth 信号
			abort();
			break;
	}
 
}
static void relay(int fd1,int fd2)
{
	struct fsm_st fsm12,fsm21; //读1写2&&读2写1
	int fd1_save,fd2_save;
	//两个文件描述符 以非阻塞打开
	fd1_save=fcntl(fd1,F_GETFL);
	fcntl(fd1,F_SETFL,fd1_save|O_NONBLOK);
	fd2_save=fcntl(fd2,F_GETFL);
	fcntl(fd2,F_SETFL,fd2_save|O_NONBLOCK);
 
	//状态初始化
	fsm12.state=STATE_R;
	fsm12.sfd=fd1;
	fsm12.dfd=fd2;
 
	fsm21.state=STATE_R;
	fsm21.sfd=fd2;
	fsm21.dfd=fd1;
 
	while(fsm12.state!=STATE_T||fsm21.state!=STATE_T)
	{
		fsm_driver(&fsm12);
		fsm_driver(&fsm21);
	}
 
 
	//文件状态恢复
	fcntl(fd1,F_SETFL,fd1_save);
	fcntl(fd2,F_SETFL,fd2_save);
}
 
int main()
{
	int fd1, fd2;
	fd1=open(TTY1,O_RDWR); //用户阻塞打开
 
	if(fd1<0)
	{
		perror("open()");
		exit(1);
	}
	//写提示性内容
	write(fd1,"TTY1\n",5);
 
 
	fd2=open(TTY2,O_RDWR|O_NONBLOCK)  //用户非阻塞打开
	if(fd2<0)
	{
		peeror
		exit
	}
	write(fd2,"TTY2\n",5)
 
	relay(fd1,fd2);
	close(fd2);
	close(fd1)
	exit(0);
}
  • 有限状态机,若改需求,在图上改圈 todo 截图
  • 两个状态机 一个 读左写右 一个读右写左;状态机数据结构的封装 copy 的封装,mycopy的现场
  • 确保进入与出去的状态是一致的, relay()前后 文件打开方式不变 1 阻塞 2 非阻塞。 relay 函数的最后 恢复文件状态 使用fcntl
  • 思路
    • main函数用来模拟用户的操作:打开两个设备,调用数据中继函数
    • relay函数中 在所有实现之前,保证两个文件是以非阻塞方式实现,结束时恢复之前的状态。中间建立两个状态机,初始化状态为读态,分别把源和目标指定好。不停推两个状态机直到T态,死循环。

make relay
root 用户执行 ctl alt f11 f12 一行的内容按ctl+c 放在缓冲区中不发出去


改成 中继 引擎

relayer文件夹

io/adv/nonblock/relayer

调用方 main.c 实现 relay.c makefile

CFLAGS+=-pthread
LDFLAGS+=-pthread
all:relayer
relayer:relayer.o main.o
	gcc $^ -o $@ $(CFLAGS) $(LDFLAGS)
 
clean:
	rm -rf *.o relayer
 

  • 两个文件描述符构成一个Job 。一万个job ,2万个 fd,状态机 12 和状态机21 。ulimit -a 需要先更改 open file 大小

  • 两对的实现 最多管理一万个任务,两万个文件描述符

#ifndef RELAYER_H__
#define RELAYER_H__
 
#define REL_JOBMAX         10000
 
enum
{
	STATE_RUNNING=1,
	STATE_CANCELLED,
	STATE_OVER
};
 
struct rel_state_st
{
	int state;
	int fd1;
	int fd2;
	int64_t coutn12,count21;
	struct timerval start,end;
};
 
 
//往数组里添加任务,返回数组下标
int rel_addjob(int fd1,int fd2);
/*
*  return >=0       成功,返回当前任务ID
*         ==-EINVAL 失败,参数非法
*         ==-ENOSPC 失败,任务数组满
*          ==-ENOMEM 失败,内存分配有误
**/
 
int rel_canceljob(int id);
/*
return == 0      成功,指定任务成功取消
       ==-EINVAL 失败,参数非法
       ==-EBUSY  失败,任务早已被取消
*/
 
//收尸 可以查看状态
int int rel_waitjob(int id,struct rel_sta_st*);
/*
	return == 0      成功,指定任务已终止并返回状态
	       ==-EINVAL 失败,参数非法
*/
 
int rel_stajob(int fd,struct rel_stat_st *)
/*
	return == 0        成功,指定任务状态已经返回
	       ==-EINVAL   失败,参数非法
 
*/
#endif
  • 对于 main.c 不再需要状态机 的定义 driver ,只需要调用方法,保留一个main函数。
#include <stdio.h>
#include<stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <relayer.h>
 
#define TTY1 "/dev/tty11"
 
#define TTY2 "/dev/tty12"
#define TTY3 "/dev/tty10"
#define TTY4 "/dev/tty9"
 
int main()
 
{
 
int fd1, fd2;
int job1,
 
fd1=open(TTY1,O_RDWR); //用户阻塞打开
 
if(fd1<0)
 
{
 
perror("open()");
 
exit(1);
 
}
 
//写提示性内容
 
write(fd1,"TTY1\n",5);
 
fd2=open(TTY2,O_RDWR|O_NONBLOCK) //用户非阻塞打开
 
if(fd2<0)
 
{
 
peeror
 
exit
 
}
 
write(fd2,"TTY2\n",5)
 
job1=rel_addjob(fd1,fd2);
 
if(job1<0)
{
	fprintf(strerr,"rel_addjob():%s\n",strerror(-job1));
	exit(1);
}
 
fd3=open(TTY3,O_RDWR);
if(fd3<0)
{
	perror("open()");
	exit(1);
}
write(fd3,"TTY3\n",5);
 
fd4=open(TTY4,O_RDWR);
if(fd4<0)
{
	perror("open()");
	exit(1);
}
write(fd4,"TTY4\n",5);
 
 
job2=rel_addjob(fd3,fd4);
if(job2<0)
{
	fprintf(stderr,"rel_addjob():%s\n",strerror(-job2));
}
 
while(1)
	pause();
close(fd2);
close(fd1);
close(fd3);
close(fd4);
 
exit(0);
 
}
 
  • addjob的实现。添加任务,会用到当前任务的属性。在当前最大值中找空位。上限暴露给用户。在头文件中
#include <pthread.h>
#include<string.h>
#include<relayer.h>
 
//临界资源来使用,多线程实现
static struct rel_job_st* rel_job[REL_JOBMAX];
static pthread_mutex_t mut_rel_job = PTHREAD_MUTEX_INITIALIZER;
static pthread_once_t init_once=PTHREAD_ONCE_INIT;
 
struct fsm_st
 
{
 
	int state;
	int sfd;
	int dfd;
	char buf[BUFSIZE];
	int len;
	int pos;
	char *errstr;
	int64_t count;
 
};
 
...
 
struct rel_job_st
{
	int job_state;
	int fd1;
	int fd2;
	struct rel_fsm_st fsm12,fsm21;
	//struct timerval start,end;
	int fd1_save,fd2_save
};
 
static void *thr_relayer(void* p)
{
 
	while(1)
	{
	pthread_mutex_lock(&mut_rel_job);
	for(i=0;i<REL_JOBMAX;i++)
	{
		if(rel_job[i]!=NULL)
		{
			if(rel_job[i]->job_state==STATE_RUNNING)
			{
 
				fsm_driver(&rel_job[i]->fsm12);
				fsm_driver(&rel_job[i]->fsm21);
				if(rel_job[i]->fsm12.state==STATE_T&&rel_job[i]->fsm21.state==STATE_T)
				rel_job[i]->job_state= STATE_OVER;
			}
		}
	}
	pthread_mutex_unlock(&mut_rel_job);
	}
}
 
//module_unload
 
static void module_load(void)
{
	//创建一个线程,永远推状态机,类似构造函数
	pthread_t tid_relayer;
err=pthread_create(&tid_relayer,NULL,thr_relayer,NULL
	if(err)
	{
		fprintf(stderr,"pthread_create():%s\n",strerror(err));
		exit(1);
	}
}
 
static get_free_pos_unlocked()
{
	for(i=0;i<REL_JOBMAX;i++)
	{
		if(rel_job[i]==NULL)
			return i;
	}
	//没有空位
	return -1;
}
 
 
int rel_addjob(int fd1,int fd2)
{
	struct rel_job_st*me;
 
	pthread_once(&init_once,module_load);//动态模块单次初始化加载
 
	me=malloc(sizeof(*me));
	if(me == NULL)
		return -ENOMEM;
	//成员初始化
	me->fd1=fd1;
	me->fd2=fd2;
	me->job_state=STATE_RUNNING;
 
	//保证非阻塞
	me->fd1_save=fcntl(me->fd1,F_GETFL);
	fcntl(me->fd1_save,F_SETFL,me->fd1_save|O_NONBLOCK);
	me->fd2_save=fcntl(me->fd2,F_GETFL);
	fcntl(me->fd2_save,F_SETFL,me->fd2_save|O_NONBLOCK);
 
	me->fsm12.sfd=me->fd1;
	me->fsm12.dfd=me->fd2;
	me->fsm12.state=STATE_R;
 
	me->fsm21.sfd=me->fd2;
	me->fsm21.dfd=me->fd1;
	me->fsm12.state=STATE_R;
 
	//找空位;
 
 
	pthread_mutex_lock(&mut_rel_job);
	pos=get_free_pos_unlocked();
	if(pos<0)
	{
		pthread_mutex_unlock(&mut_rel_job);
		//各种恢复
		fcntl(me->fd1,F_SETFL,me->fd1_save);
		fcntl(me->fd2.F_SETFL,me->fd2_save);
		free(me);
		return -ENOSPC;
	}
 
	rel_job[pos]=me;
 
	pthread_mutex_unlock(&mut_rel_job);
}
 
int rel_canclejob
 
 
  • module_load

io密集型任务,非重负载; 忙等