C++ 从大数据SPARK框架的DAG引擎,再论有向无环图(DAG)的拓扑排序
  8JXh2nINxLkX 2023年12月12日 22 0

公众号:编程驿站 更多好文等你来

给大学生讲解SPARK时,说spark相比其它的大数据框架,其运行速度更快,是其显著的特点之一。之所以运行速度快,其原因之一因其使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎SPARK提供了名为RDD(弹性分布式数据集(Resilient Distributed Dataset)的简称)抽象的数据集。DAG引擎用来保证RDD数据集之间依赖的有序性、可靠性。

不理解DAG具体为何物以及其底层原理,并不妨碍使用SPARK,使用者只需要调用其提供的API,用于分析处理不同领域的数据便可。但是,如果能理解DAG的底层结构,对理解和学习SPARK将会有质的提升。

对高层建筑和底层地基之间的逻辑关系的理解能建立对事物的新认知。

2.DAG

2.1 基本概念

什么是DAG

DAG是图结构中的一种,称为有向无环图。有向说明图中节点之间是有方向的,无环指图中没有环(回路),意味着从任一顶点出发都不可能回到顶点本身。如下图:

1.png

DAG往往用来描述事物之间的依赖关系或工作流中子流程之间的顺序,所以DAG中一定存在入度为0和出度为0的节点。入度为0的节点表示流程的开始,出度为0的节点表示流程的结束。根据工作流的特点,入度为0和出度为0的节点可能不只有一个。

如上图,可以理解为对于整个工作流而言,只有当编号为1的子流程完成后,才可以开始2号和3号子流程,当2号完成后,才能是4号,3号完成后才能545号完成后才能是6号。最终可以用线性结构描述出来。

2.png

这个过程称为DAG的线性化过程,也称为DAG的拓扑排序,这里的排序并不是指大小上的有序,而是指时间上的有序。因有可能子流程间不存在时间上的依赖性,如上图的23以及45节点,不存在相互的依赖,所以DAG的拓扑排序并不只有一种可能。如下图中的所有线性化都认为是合法。

3.png

一旦有了工作流的DAG结构图,在设计工作流进程时,则可以引入并行(并发)方案。如上图的2->43->5进程可以使用多线程或多进程方案,加快工作流的执行速度,这也是SPARkDAG引擎能加快处理速度的底层原理。

因是描述工作流中子流程的顺序,显然整个工作流中不能出现环,环的出现,标志着循坏依赖。如下图,2号工作流依赖1号工作流的完成,4号依赖2号工作流的完成,从传递性上讲,4号也依赖1。从结构图中可以看得出1号又依赖4号 ,这便形成了一个引用循环链,从现实角度和实现角度都是违背常规认知和基本逻辑的。

Tips: 环意味着存在循环依赖,会导致系统死锁。

4.png

所以,在对DAG线性化之前,务必先要检查图中是否存在环。

2.2 环的检查

SPARk为了保证RDD的有序性,在进程初始时也需要检查其中是否存在。下面讲解几种环的检查算法思想。

2.2.1 入度和出度

先检查图中节点之间的连通性,在一个连通分量上,如果边的数量大于或等于节点数,存在至少一个入度和一个出度的所有节点必然会构成一个环。下图左边的结构符合每一个节点都有一个入度和出度;右图中的1-2-4-6中的6号节点有2个入度,一个出度,其它节点都至少有一个入度和出度。如果一个节点只能有一个度,要么是入度,要么是出度。

5.png

连通性的检查可能使用并查集或者Floyd算法,或者直接使用DFS、BFS搜索算法。这里就不过多解释。入度和出度的检查也很简单,只需要构建图时记录一下节点的度数。

2.2.2 检查回边

所谓回边,指从一个节点出发,然后又能回到此节点的边。如下图,从1号节点开始搜索,经过如下图中的3->16->1又回到1号节点,称3->16->1为回边。

6.png

如果能证明回边的存在,则可以证明图结构中有环。回边的检查可以直接使用DFS搜索算法,其间有两个小技巧性。

搜索某一个节点时,检查节点的祖先节点是否和某一个子节点重合。如上图中,从1号节点(祖先节点)开始搜索,当搜索到6号节点时,发现1号子节点即是6号节点祖先节点又是子节点,显然6->1就是回边。

实现逻辑较简单,标记每一个访问过的节点,当从一个节点访问其子节点时,如果子节点已经被访问且不是直接父节点,可以断定回边的存在。

#include <bits/stdc++.h>
using namespace std;
//图
int graph[100][100];
//是否访问过
int vis[100];
//节点的父节点
int parent[100];
int INF=999;
//节点数、边数
int n,m;
//初始化图,自己和自己的距离为0,和其它节点距离为 INF
void init() {
	for(int i=1; i<=n; i++) {
		for(int j=1; j<=n; j++) {
			if(i==j)graph[i][j]=0;
			else graph[i][j]=INF;
		}
		vis[i]=0;
		parent[i]=0;
	}
}
//交互式得到节点之间关系
void read() {
	int f,t,w;
	for(int i=1; i<=m; i++) {
		cin>>f>>t>>w;
		graph[f][t]=w;
	}
}
/*
*有向无环图中找环
* s:节点编号
* f:父节点编号
*/
int  findCircle(int s,int f) {
	//标记为已经访问
	vis[s]=true;
	parent[s]=f;
	//查找其子节点
	for(int i=1; i<=n; i++) {
		if( graph[s][i]!=INF && graph[s][i]!=0  ) {
			if( vis[i]==1 &&  i!=f ) { //找到回边
			    parent[i]=s;
				return i;
			}
			return findCircle( i,s );
		}
	}
	return -1;
}
/*
*
*找出环上的所有点
*/
void findCircle(int s) {
	int p=parent[s];
	while(p!=s) {
		cout<<p<<"\t";
		p=parent[p];
	}
}

int main(int argc, char** argv) {
	cin>>n>>m;
	init();
	read();
	int res= findCircle(1,0);
	findCircle(res);
	cout<<res;
	return 0;
}

//测试数据
6 6
1 2 1
6 1 1
2 4 1
4 6 1
3 5 1
5 6 1

测试结果:

7.png

另一个技巧就是为每一个节点设置一个开关变量,访问时(入栈)设置为true、访问结束(出栈)后设置为false。如果在还没有结束(出栈)时又重新访问到了此节点,可说明此节点有回边。

以下图为例。根据出栈入栈顺序做标记。

10.png

绿色虚线表示DFS时的递进线,递进时设置节点为访问状态(用 1 表示)。黄色虚线表示DFS时的回溯线,回溯时,设置节点访问结束状态(用 0 表示)。节点1的特殊在于会被两次标记为1。也就在第二次标记为1,表示它曾经被访问。

为什么要在回溯时设置节点为0,恢复原始状态。有可能出现如下图的情况。如果仅通过节点2是否被访问过确定此处有回边,是不正确的。

11.png

编码实现:

/*
*有向无环图中找环
* s:节点编号
*/
int findCircle(int s,int f) {
	if(vis[s]) {
		parent[s]=f;
		//如果进入栈时标记为 1,说明已经被访问过,有环存在
		return s;
	}
	//入栈时标记为已经访问
	vis[s]=true;
	parent[s]=f;
	//查找其子节点
	for(int i=1; i<=n; i++) {
		if( graph[s][i]!=INF && graph[s][i]!=0  ) {
			return findCircle( i,s );
		}
	}
	//出栈时恢复状态
	vis[s]=false;
	return -1;
}

/*
*
*找出环上的所有点
*/
void findCircle(int s) {
	int p=parent[s];
	while(p!=s) {
		cout<<p<<"\t";
		p=parent[p];
	}
}

int main(int argc, char** argv) {
	cin>>n>>m;
	init();
	read();
	int res= findCircle(1,0);
	findCircle(res);
	cout<<res;
	return 0;
}

是否有环检查后,便可进入拓扑排序过程。

2.3 拓扑排序

拓扑排序过程即为检查节点之间的依赖性的过程(通俗而言,就是谁依赖谁的问题)。

设计一个工作流时,往往会把整个工作流分解成几个子工作流,有些子工作流是可以同时进行的,有些子工作流需要等其它子工作流完毕后才能工作(一个子工作流的开始条件是另一个工作流的结束结果)。从多线程(进程)的角度而言,即存在并发时刻也存在互斥时刻。通过把子工作流建模成DAG结构,借助拓扑排序算法,能帮助建立稳定、健全、快速的工作流系统。

拓扑排序算法的两种实现。

广度搜索

遍历图结构,从入度为0的节点开始搜索,找到后删除与相邻节点之间的出度。重复这个过程,至到最后一个节点。如下图:

1.png

  • 找到入度为0的节点1。入度为0的节点从工作流而言,表示不存在对其它任何子工作流的依赖,自然是要先执行的。遍历出来,并删除与其邻接的2号和3号节点相连接的边,表示23的所依赖的1号目标已经完成。

12.png

  • 此时2号和3号节点入度变为0,均可以遍历出来。至于先遍历那一个,可以随机选择。也说明这两个节点表示的子工作流可以并行运行,同时删除与相邻节点的边。依次重复直到遍历出所有节点。

编码实现:

#include <bits/stdc++.h>
using namespace std;
//图
int graph[100][100];
//是否访问过
int vis[100];
//节点的父节点
int parent[100];
int INF=999;
//节点数、边数
int n,m;
//栈,存储拓扑排序结果
queue<int> myq;
//计数器
int count=0;

//初始化图,自己和自己的距离为0,和其它节点距离为 INF
void init() {
	for(int i=1; i<=n; i++) {
		for(int j=1; j<=n; j++) {
			graph[i][j]=0;
		}
		vis[i]=0;
		parent[i]=0;
	}
}
//交互式得到节点之间关系
void read() {
	int f,t,w;
	for(int i=1; i<=m; i++) {
		cin>>f>>t>>w;
		graph[f][t]=w;
	}
}

//查找入度为 0 的节点且删除与之相邻的出边
int findNode(int i) {
	bool is=true;
	for(int j=1; j<=n; j++) {
		if( graph[j][i]!=0) {
			is=false;
			break;
		}
	}
	if(is) {
		for(int j=1; j<=n; j++) {
			graph[i][j]=0;
		}
	}
	return is;
}

//找到入度为0 的节点压入队列
void pushQueue() {
	for(int i=1; i<=n; i++) {
		if( findNode(i) && !vis[i] ) {
			//找到,入队列
			vis[i]=true;
			myq.push(i);
		}
	}
}

/*
*拓扑排序
*/
void tp() {
	//初始化入度为 0 的节点入队列
	pushQueue();
	while( !myq.empty() ) {
		int t=  myq.front();
		count++;
		cout<<t<<"\t";
		myq.pop();
		pushQueue();
	}
    //如果出队列的节点数量和原节点数量不相同,说明有环
	if( count!=n )cout<<"有环";
}

int main(int argc, char** argv) {
	cin>>n>>m;
	init();
	read();
	tp();
	return 0;
}

深度搜索

DAG看成有向树,在后序遍历位置遍历节点,最后就能得到DAG的拓扑排序。如下图,表示对一棵二叉树后序遍历后的结果。

13.png

观察可知,把后序遍历的结果再逆输出,就能得到拓扑排序的结果1、3、7、9、8、6、2、5、4

#include <bits/stdc++.h>
using namespace std;
//图
int graph[100][100];
//是否访问过
int vis[100];
//节点的父节点
int parent[100];
int INF=999;
//节点数、边数
int n,m;
//是否有环
int isCircle=0;
//栈,存储拓扑排序结果
stack<int> stk;
//自己和自己的距离为0
void init() {
	for(int i=1; i<=n; i++) {
		for(int j=1; j<=n; j++) {
			if(i==j)graph[i][j]=0;
			else graph[i][j]=INF;
		}
		vis[i]=0;
		parent[i]=0;
	}
}
//交互式得到节点之间关系
void read() {
	int f,t,w;
	for(int i=1; i<=m; i++) {
		cin>>f>>t>>w;
		graph[f][t]=w;
	}
}
/*
*有向无环图中找环
* s:节点编号
*/
void findCircle(int s,int f) {
	if(vis[s]) {
		parent[s]=f;
		isCircle=1;
		//如果进入栈时标记为 1,说明已经被访问过
		return;
	}
	//入栈时标记为已经访问
	vis[s]=true;
	parent[s]=f;
	//查找其子节点
	for(int i=1; i<=n; i++) {
		if( graph[s][i]!=INF && graph[s][i]!=0  ) {
			findCircle( i,s );
		}
	}
	//出栈时恢复状态
	vis[s]=false;
	//存储后序遍历结果
	stk.push(s);
}

/*
*拓扑排序
*/
void tp(int s,int f) {
	findCircle(s,f);
	if(isCircle) {
		return;
	}
	cout<<"tp sort:"<<endl;
	while(!stk.empty()) {
		cout<<stk.top()<<"\t";
		stk.pop();
	}
}
int main(int argc, char** argv) {
	cin>>n>>m;
	init();
	read();
	tp(1,0);
	return 0;
}
//测试数据
6 6
1 2 1
1 3 1
2 4 1
3 5 1
4 6 1
5 6 1

3. 总结

如果你不懂得DAG的底层结构以及拓扑排序算法相关知识,并不妨碍你去使用SPARK。如果你没有用过SPARk,也不会影响你学习DAG。但是如果你懂得了DAG,又学会使用了SPARK,对高级应用和低级算法之间的关系会有更高层面的感悟。有一天,SPARk会死,但底层结构和算法思想却会永存。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年12月12日 0

暂无评论

推荐阅读
8JXh2nINxLkX