一、概述
線性流水線與非線性流水線是CPU中指令處理流水線的一種分類標準。線性流水線很好理解,就是一條路走到黑的流水線;非線性流水線則不同,它可能存在前饋與反饋,每個部件可能使用一次或多次,它就沒法像線性流水線那么一個一個部件按部就班的走。因此出現了一個問題,如果我第一個任務第二次使用部件A,第二個任務恰好第一次也使用部件A,這會怎么樣?
出現矛盾了,流水線卡住了。這不好,因此需要流水線調度算法來安排好每一個任務,在讓它們不沖突的同時,最大可能提高流水線的效率。
二、分析
1、非線性流水線的描述
線性流水線能夠用流水線連接圖唯一表示,非線性流水線則不行,它的描述需要流水線連接圖和預約表的協同作用。一個流水線連接圖可能有好幾種執行順序,也就對應好幾個可能的預約表,一個預約表也可能對應好幾張流水線連接圖。只有同時看這兩者,才能確定最終的處理順序。下圖就是一張預約表。
這是一個有四個組件,七個流水段的流水線。橫軸代表時間,縱軸代表組件。×則代表在某一時刻當前使用的組件。因此我們可以得知,每個框中只能有一個×,否則會出現沖突。
2、調度算法的推導
首先,我們要提出三個定義:
啟動距離、禁止向量、沖突向量。
啟動距離指的是第一個任務進入流水線后,第二個任務進入時,不發生沖突的時間。顯然,啟動距離又長又短,我們的目的就是找出平均啟動距離最短的一個任務載入序列。
禁止向量指的是預約表中每一行任意兩個×之間距離的集合。例如上圖中,禁止向量為(2,4,6)。
沖突向量的指的是如下一個向量:(Cm,Cm-1......C2C1),其中C為0或1;m為禁止向量中的最大值。對于Ci,如果禁止向量在i中,則Ci=1,否則Ci=0。
上圖預約表對應的沖突向量指的是(101010)。
我們可以這樣理解沖突向量:對于一個任務x,其耗時為nk,n為流水段數量,k為每一流水段耗費的時間。假設各流水段好費時間相同。那么,在x進入后的第k,2k...nk時刻,均可以加載下一個任務y。然而可能出現沖突。可以通過計算沖突向量來規避這種沖突:
若沖突向量的Cm=1,則x進入后的第mk時刻不能加載y,否則會沖突。如圖,C1=0,C2=1。如果在x進入后2時刻,即橫軸為3的時候加載y,那么在橫軸為5的時候,x與y爭用S3部件,出現沖突。也就是說,y可以在C1、C3、C5也就是橫軸為2、4、6的時候載入。
現在我們假設y在C3載入,也就是在橫軸為4處載入。對于y來說,在它沒結束之前,有哪些時刻不能載入任務呢?很容易想的一點是:在C2、C4、C6不能載入。因為y是和x一樣的任務,因此x中不能載入的時刻,y也不能載入。但這夠了么?還不夠,x還會對y之后的任務產生影響,如何表達這種影響呢?用x的沖突向量來表達。
接著拿y在C3載入舉例,這時,從x的沖突向量可以知道,橫軸為3、5、7的時候不能載入,從y的沖突向量可以知道,橫軸為6、8、10的時候無法載入。做一下并集,在3、5、6、7、8、10的時候不能載入。由于y在4的時候載入,因此僅需要注意5、6、7、8、10即可。由5、6、7、8、10可以得出y的沖突向量101111。這是我們手動算出來的。如何通過數學方法計算呢?
通過右移與按位取并操作進行。
如上圖,右移3代表選擇x的非沖突時刻C3,右移3得到的000101代表對于y來說,x的影響,101010是y自身的影響。二者疊加可得所有不能載入的時刻。
再如上圖,初始任務x的沖突向量為1010100,第二個任務y若選擇在2時間后載入,那么x對其的影響可以寫為0010101;兩個影響疊加為1010101,與最初的影響相同;第二個任務y若選擇在1時間后載入,那么x對其的影響為0101010,兩個影響合并為1111110,這是y的沖突向量;第三個任務z若選擇在y后1時間載入,那么y對其的影響為0111111,兩個影響合并為1111111。
也可以將1111111看成是xyz三個任務共同影響的結果,因為1111111是x的沖突向量右移兩位并上y右移一位并上z得到的。
如此,我們需要得到所有沖突向量右移所有的0的位置得到的所有的結果,然后按位取并,繼續右移繼續取并,直到結果與之前的相同。這樣我們就會得到一張圖。
上圖是101010所得到的的圖。這其中的所有循環,就是我們要找的非沖突的載入序列。也就是我們算法最后需要的結果。
三、代碼實現
由1、2可以得知,算法的整體思路可以按如下幾步:
第一,輸入預約表;
第二,得到初始沖突向量;
第三,生成狀態圖;
第四,找到所有循環。
其中,第三和第四步可以轉化為如下問題:生成一張有向圖,記錄其中所有循環;但是我的算法可以按另一種形式描述:生成一棵樹,當根節點對應的葉子節點與之前的節點相同是,記錄之前的節點到根節點的路徑。
以下是我的代碼。
1、輸入函數
為便于輸入,選擇從外部文件讀取預約表,函數如下:
int readTable(int a,int b)
{
FILE*fp = NULL;//需要注意
fp = fopen(F_PATH, "r");
if (NULL == fp) return -1;//要返回錯誤代碼
for (int i = 0; i
< b; j++)
{
int tmp;
fscanf(fp,"%d", &tmp);
ResTable[i][j] = tmp;
}
fclose(fp);
fp = NULL;//需要指向空,否則會指向原打開文件地址
return 0;
}
使用fopen讀入文件流,用fp儲存文件流,然后生成預約表對應的二維數組即可。
2、得到初始沖突向量
為得到沖突向量,首先要得到禁止向量。由于禁止向量中沒有重復元素,選擇使用set保存。然后遍歷各行,計算出預約表中不為零的元素對應的列坐標的差的絕對值,保存在dis中。
然后遍歷dis,使用string生成沖突向量,dis中的值作為string中元素下標,這些下標對應的值為1,其余為0。從而生成初始沖突向量。
void Create_Initial_Conflict_Vector(int a,int b)
{
set
dis;//不為零之間的距離
for (int i = 0; i
tmp;
for (int j = 0; j < b; j++)
{
if (ResTable[i][j] != 0)
{
tmp.push_back(j);
}
for (int m = 0; m < tmp.size(); m++)
{
for (int n = m + 1; n < tmp.size(); n++)
{
dis.insert(abs(tmp[m] - tmp[n]));
}
}
}
}
//printSet(dis);
for (int i = 1; i < b; i++)
{
if (dis.find(i) != dis.end())
Initial_Conflict_Vector = '1' + Initial_Conflict_Vector;
else
Initial_Conflict_Vector = '0' + Initial_Conflict_Vector;
}
}
3、生成狀態圖并找出所有循環
重頭戲,算法的精髓就在這里。我選擇使用遞歸生成狀態圖并保存循環。
首先需要寫兩個工具函數,string右移以及string按位取并。如下:
string StringRightMove(string s, int k)
{
while (k != 0)
{
s = '0' + s;
s.pop_back();
k--;
}
return s;
}
string StringAnd(string s1, string s2)
{
for (int i = 0; i < s1.size(); i++)
{
if (s1[i] == '0'&&s2[i] == '0')
s1[i] = '0';
else
s1[i] = '1';
}
return s1;
}
然后開始寫遞歸函數。代碼如下:
void Find_real_Circle(string ConVector, unordered_set
CVTable, vector
Now_Start_Cycle, int len)
{
if (CVTable.find(ConVector) != CVTable.end())
{
StartCycle.push_back(Now_Start_Cycle);
unordered_set
::iterator it = CVTable.begin();
int numhead = 0;
while (*it != ConVector)
{
it++;
numhead++;
}
vector
RealCycle(Now_Start_Cycle.begin() + numhead, Now_Start_Cycle.end());
real_StartCycle.push_back(RealCycle);
//CVTable.erase(ConVector);
return;
}
else
{
CVTable.insert(ConVector);
int num_1 = 0;
for (int i = len - 1; i >= 0; i--)
{
if (ConVector[i] == '0')
{
string tmpVector, tmpResult;
tmpVector = StringRightMove(ConVector, len - i);
tmpResult = StringAnd(tmpVector, Initial_Conflict_Vector);
Now_Start_Cycle.push_back(len - i);
Find_real_Circle(tmpResult, CVTable, Now_Start_Cycle, len);
Now_Start_Cycle.pop_back();
}
else
num_1++;
}
Now_Start_Cycle.push_back(len + 1);
//VectorPrint(Now_Start_Cycle);
real_StartCycle.push_back(Now_Start_Cycle);
StartCycle.push_back(Now_Start_Cycle);
return;
}
}
參數如下:
string ConVector:上一任務的沖突向量
unordered_set
?vector
int len:沖突向量長度
首先,要設計遞歸出口:什么時候退出遞歸?當然是右移取并得到的結果發現之前已經得到過,這時候退出遞歸,即找到了一個循環。如何發現本次得到的結果之前已經得到過?用set可以,但是考慮到存在如下的循環:
42222222,循環體為2,但是要先經過4才能開始循環。因此只用set只能“發現循環”,而不能“發現循環體”。這不好,unordered_set更好:一旦我在unordered_set中發現結果,由于unordered_set相比于set保留了壓入時的順序,通過定位到結果的位置,那么結果到unordered_set結尾便是循環體。
使用全局變量vector
然后設計遞歸體。對于遞歸體,我們要對ConVector的所有為0的位都照顧到,即所有為0的位都要右移然后取并得到新的沖突向量。另外,進入遞歸體說明還沒找到循環,那么當前的沖突向量要保存在CVTable中,當前右移的值要保存在Now_Start_Cycle中。在保存并形成新的沖突向量后,繼續調用函數開始遞歸。
在所有的0都右移之后,要將len+1保存在Now_Start_Cycle中,因為所有載入的任務在len+1之后都必定可以開始下一個任務而不產生沖突。
這樣就得到了所有循環。
4、輸出結果
這就是按部就班的寫就可以了。沒什么好說的。
for (int i = 0; i < StartCycle.size(); i++)
{
cout << "初始循環為:";
for (int j = 0; j < StartCycle[i].size(); j++)
{
cout << StartCycle[i][j] << ' ';
}
cout << "循環體為:";
for (int j = 0; j < real_StartCycle[i].size(); j++)
{
cout << real_StartCycle[i][j] << ' ';
}
cout << '\n';
}
int try_num = 0;
while (try_num<1)
{
cout << "請選擇循環序號:\n";
int Cycle_x;
cin >> Cycle_x;
int Display_table[10][200] = { 0 };
int CycleNum[10] = { 0 };
int len1, len2;
len1 = StartCycle[Cycle_x].size();
len2 = real_StartCycle[Cycle_x].size();
CycleNum[0] = StartCycle[Cycle_x][0];
for (int i = 1; i < len1; i++)
CycleNum[i] = CycleNum[i - 1] + StartCycle[Cycle_x][i];
for (int i = len1; i
< b; j++)
{
int p = 1;
if (ResTable[i][j] != 0)
{
Display_table[i][j] = p;
for (int k = 0; k < len1 + len2 * 2; k++)
{
Display_table[i][j + CycleNum[k]] = p + k + 1;
}
}
}
printTable(a, CycleNum[len1 + len2 * 2 - 1], Display_table);
try_num++;
}
5、效果
6、流水線生成圖片
由于使用C++生成圖片太過困難,而利用python的matplotlib包生成圖片又很簡單。因此我不得不寫了另外一個python版本。
python版本使用了numpy和matplotlib兩個常用庫。其算法思想與C++如出一轍,唯一要注意的就是在迭代的時候的深拷貝與淺拷貝的問題,迭代中許多變量要使用深拷貝,否則會出錯,這里很難通過debug得出。
效果如下:
效果比只有0和1還是好不少的。
四、總結
在實現方面,調度算法的核心就在于查找循環,查找循環的核心就是遞歸算法。遞歸算法寫好,整個問題便迎刃而解。
在理論方面,理解沖突向量是如何生成的是關鍵,而這需要理解沖突向量的含義,理解其含義,配合預約表中多個任務走一遍流程,就可以知道算法的原理了。
PS:代碼如下:
C++版:
#include
#include
#include
#include
#include
python版:
import matplotlib.pyplot as plt
import pylab
import pandas as pd
import copy
StartCycle=list()
RealStartCycle=list()
InitialConflictVector=str()
def Create_Initial_Conflict_Vector(a:int ,b:int ,Table:list)->str:
dis=set()
for i in range(a):
tmp=[]
for j in range(b):
if (Table[i][j]==1):
tmp.append(j)
for m in range(0,len(tmp)):
for n in range(m+1,len(tmp)):
dis.add(abs(tmp[m]-tmp[n]))
result=str()
for i in range(1,b):
if i in dis:
result='1'+result
else:
result='0'+result
return result
def String_Right_Move(s:str, k:int)->str:
while (k != 0):
s = '0' + s
s = s[:-1]
k = k-1
return s
def String_And(s1:str,s2:str):
s=str()
for i in range(len(s1)):
if s1[i] == '0'and s2[i] == '0':
s = s + '0';
else:
s = s + '1';
return s;
def Find_Real_Circle(ConVector:str, CVTable:set, CVVector:list, NowStartCycle:list, lenth:int)->None:
if ConVector in CVTable:
StartCycle.append(copy.deepcopy(NowStartCycle))
numhead=0
while(ConVector!=CVVector[numhead]):
numhead=numhead+1
RealStartCycle.append(copy.deepcopy(NowStartCycle[numhead:len(NowStartCycle)]))
CVTable.remove(ConVector)
CVVector.pop()
return
else:
CVTable.add(copy.deepcopy(ConVector))
CVVector.append(copy.deepcopy(ConVector))
for i in range((lenth-1),-1,-1):
if (ConVector[i]=='0'):
TmpVector = String_Right_Move(ConVector,lenth-i)
TmpResult = String_And(TmpVector,ConVector)
NowStartCycle.append(lenth-i)
Find_Real_Circle(TmpResult, copy.deepcopy(CVTable), copy.deepcopy(CVVector), NowStartCycle, lenth)
NowStartCycle.pop()
NowStartCycle.append(lenth+1)
RealStartCycle.append(copy.deepcopy(NowStartCycle))
StartCycle.append(copy.deepcopy(NowStartCycle))
NowStartCycle.pop()
return
def Create_Cycle_Using_List(Cycle1:list,Cycle2:list,kth:int)->list:
CycleUsingList=list()
CycleUsingList.append(Cycle1[kth][0])
for i in range(1,len(Cycle1[kth])):
CycleUsingList.append(CycleUsingList[i-1]+Cycle1[kth][i])
for i in range(len(Cycle2[kth])):
CycleUsingList.append(CycleUsingList[len(Cycle1[kth])+i-1]+Cycle2[kth][i])
for i in range(len(Cycle2[kth])):
CycleUsingList.append(CycleUsingList[len(Cycle1[kth])+len(Cycle2[kth])+i-1]+Cycle2[kth][i])
for i in range(len(Cycle2[kth])):
CycleUsingList.append(CycleUsingList[len(Cycle1[kth])+len(Cycle2[kth])*2+i-1]+Cycle2[kth][i])
return CycleUsingList
def Create_Matrix(InitialMatrix:list,CycleList:list,a:int)->list:
Resultlist=[[0 for col in range(CycleList[len(CycleList)-1]+len(InitialMatrix[0]))] for row in range(a)]
TmpList=[0]*len(CycleList)
for i in range(a):
for j in range(len(InitialMatrix[0])):
if InitialMatrix[i][j]==1:
Resultlist[i][j]=1
for k in range(1,len(CycleList)):
Resultlist[i][j+CycleList[k]]=1+k
return Resultlist
#預約表
ResTable_DataFrame=pd.read_csv(r'D:\LeetCode\Nonlinear Pipelining Python\file.csv',header=None)
ResTable_list=ResTable_DataFrame.values.tolist()
ResTable=tuple(ResTable_list)
a=int(input("請輸入流水線的功能部件數量:"))
b=int(input("請輸入流水段數量:"))
InitialConflictVector=Create_Initial_Conflict_Vector(a,b,ResTable)
s1="初始沖突向量為: %s"%(InitialConflictVector)
print(s1)
RealCVTable=set()
RealCVVector=list()
RealNowStartCycle=list()
Find_Real_Circle(InitialConflictVector,RealCVTable,RealCVVector,RealNowStartCycle,b-1)
for i in range(len(StartCycle)):
s2 = "進入循環的序列為: %s 循環體為: %s" %(StartCycle[i],RealStartCycle[i])
print(s2)
for i in range(3):
n=int(input("請選擇查看第幾個循環:"))
CycleUsingList=Create_Cycle_Using_List(StartCycle,RealStartCycle,n)
CycleUsingMatrix=Create_Matrix(ResTable_list,CycleUsingList,a)
plt.imshow(CycleUsingMatrix, interpolation='nearest')
pylab.show()
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
