精华内容
下载资源
问答
  • GPS信号FFT捕获的GPU实现.pdf
  • 上篇讲述了一维FFTGPU实现FFT算法实现——基于GPU的基2快速傅里叶变换),后来我又由于需要做了一下二维FFT,大概思路如下。 首先看的肯定是公式: 如上面公式所描述的,2维FFT只需要拆分成行FFT,和列FFT就...

    2维FFT算法实现——基于GPU的基2快速二维傅里叶变换

    上篇讲述了一维FFT的GPU实现(FFT算法实现——基于GPU的基2快速傅里叶变换),后来我又由于需要做了一下二维FFT,大概思路如下。

    首先看的肯定是公式:

    如上面公式所描述的,2维FFT只需要拆分成行FFT,和列FFT就行了,其中我在下面的实现是假设原点在F(0,0),由于我的代码需要原点在中心,所以在最后我将原点移动到了中心。

    下面是原点F(0,0)的2维FFT的伪代码:

    复制代码
        //C2DFFT
        //被执行2DFFT的是一个N*N的矩阵,在source_2d中按行顺序储存
        //水平方向FFT
        for (int i=0;i<N;i++)
        {
            fft1(&source_2d[i*N],&source_2d_1[i*N],N);
        }
        //转置列成行
        for (int i=0;i<N*N;i++)
        {
            int x = i%N;
            int y = i/N;
            int index = x*N+y;
            source_2d[index] = source_2d_1[i];
        }
        //垂直FFT
        for(int i=0;i<N;i++)
        {
            fft1(&source_2d[i*N],&source_2d_1[i*N],N);
        }
        //转置回来
        for (int i=0;i<N*N;i++)
        {
            int x = i%N;
            int y = i/N;
            int index = x*N+y;
            source_2d[index] = source_2d_1[i];
        }
    复制代码

    GPU实现无非把这些东西转换到GPU上。

    我基于OpenGL的fragment shader来计算fft;数据都存放在纹理或者FBO里面。和1维fft不同的是,NXN的数据里面,只是对当前列或者当前排做一维FFT,所以bit反转表只需要一个1*N的buffer就可以了。对应的蝴蝶图数据也只需要1*N即可。所以我们有如下的分配:

    static ofFbo _fbo_bitrev_table;
    static ofFbo _origin_butterfly_2d;
    
    _fbo_bitrev_table.allocate(N,1,GL_RGBA32F);
    _origin_butterfly_2d.allocate(N,1,GL_RGBA32F);

    首先要做的是把长度为N的bit反转表求出来,这个只需要求一次,所以在最开始的时候就用CPU求出来:

    复制代码
        for(int i=0;i<N;i++)
        {
              _bitrev_index_2d.setColor(i,0,ofFloatColor(bit_rev(i,N-1),0,0,0));
        }
    
        _bitrev_index_2d.update();
    
        //翻转后的索引
        _fbo_bitrev_table.begin();
        _bitrev_index_2d.draw(0,0,N,1);
        _fbo_bitrev_table.end();
    复制代码

    然后初始化最初的蝴蝶图,这个和1维FFT是一样的,只是长度不同而已:

    复制代码
    for(int i=0;i<N;i++)
        {
            //初始化二维蝴蝶图
            if(i%2==0)
            {
                _data_2d.setColor(i,0,ofFloatColor(0.f,2.f,0,i+1));
            }
            else
            {
                _data_2d.setColor(i,0,ofFloatColor(1.f,2.f,0,i-1));
            }
            
        }
    
        _data_2d.update();
    
        /////2D初始化/////
        //初始化2D蝴蝶图
        _weight_index_2d[0].begin();
        _data_2d.draw(0,0,N,1);
        _weight_index_2d[0].end();
        //备份2D初始蝴蝶图,用于下一次新的计算
        _origin_butterfly_2d.begin();
        _data_2d.draw(0,0,N,1);
        _origin_butterfly_2d.end();
    复制代码

    辅助函数:

    复制代码
        static unsigned int bit_rev(unsigned int v, unsigned int maxv)
        {
            unsigned int t = log(maxv + 1)/log(2);
            unsigned int ret = 0;
            unsigned int s = 0x80000000>>(31);
            for (unsigned int i = 0; i < t; ++i)
            {
                unsigned int r = v&(s << i);
                ret |= (r << (t-i-1)) >> (i);    
            }
            return ret;
        }
    
        static void bit_reverse_copy(RBVector2 src[], RBVector2 des[], int len)
        {
            for (int i = 0; i < len;i++)
            {
                des[bit_rev(i, len-1)] = src[i];
            }
        }
    复制代码

    下面定义计算2维IFFT的函数:

    void GPUFFT::ifft_2d(ofFbo& in,ofFbo& out,int size);

    其中in是输入,out是输出,size就是N,由初始化的时候传入了一次,在这里写是为了方便调试的时候临时改变尺寸。

    Ifft本身的代码和上面形式一样,内容变成了各种shader计算:

    复制代码
    void GPUFFT::ifft_2d(ofFbo& in,ofFbo& out,int size)
    {
        //禁用Alpha混合,否则绘制到FBO会混合Alpha,造成数据丢失
        ofDisableAlphaBlending();
    
        //水平FFT
        _weight_index_2d[_cur_2d].begin();
        _origin_butterfly_2d.draw(0,0,N,1);
        _weight_index_2d[_cur_2d].end();
    
        
        _fbo_in_bitreved_2d.begin();
        _bit_reverse_shader_2d.begin();
        _bit_reverse_shader_2d.setUniform3f("iResolution",N,N,0);
        _bit_reverse_shader_2d.setUniform1i("N",N);
        _bit_reverse_shader_2d.setUniform1i("dir",1);
        _bit_reverse_shader_2d.setUniformTexture("tex_origin",in.getTextureReference(),1);
        _bit_reverse_shader_2d.setUniformTexture("tex_bitreverse_table",_fbo_bitrev_table.getTextureReference(),2);
        ofRect(0,0,N,N);
        _bit_reverse_shader_2d.end();
        _fbo_in_bitreved_2d.end();
    
        //翻转后的数据
        _res_back_2d[_cur_2d].begin();
        _fbo_in_bitreved_2d.draw(0,0,N,N);
        _res_back_2d[_cur_2d].end();
    
    
        for(int i = 1;i<N;i*=2)
        {
            _res_back_2d[1-_cur_2d].begin();
            ofClear(0,0,0,0);
            _gpu_fft_shader_2d.begin();
            _gpu_fft_shader_2d.setUniform1i("size",N);
            _gpu_fft_shader_2d.setUniform1i("n_step",i);
            _gpu_fft_shader_2d.setUniform3f("iResolution",N,N,0);
            _gpu_fft_shader_2d.setUniform1i("dir",1);
            _gpu_fft_shader_2d.setUniformTexture("tex_index_weight",_weight_index_2d[_cur_2d].getTextureReference(),1);
            _gpu_fft_shader_2d.setUniformTexture("tex_res_back",_res_back_2d[_cur_2d].getTextureReference(),2);
            //_gpu_fft_shader_2d.setUniformTexture("test",imag_test.getTextureReference(),4);
    
            ofRect(0,0,N,N);
    
            _gpu_fft_shader_2d.end();
    
            _res_back_2d[1-_cur_2d].end();
    
    
            _weight_index_2d[1-_cur_2d].begin();
            ofClear(0,0,0,0);
    
            _weight_index_shader_2d.begin();
            _weight_index_shader_2d.setUniform1i("size",N);
            _weight_index_shader_2d.setUniform1i("n_step",i);
            _weight_index_shader_2d.setUniform3f("iResolution",N,1,0);
            _weight_index_shader_2d.setUniform1i("dir",1);
            _weight_index_shader_2d.setUniformTexture("tex_input",_weight_index_2d[_cur_2d].getTextureReference(),1);
    
            ofRect(0,0,N,1);
    
            _weight_index_shader_2d.end();
    
            _weight_index_2d[1-_cur_2d].end();
    
    
    
            _cur_2d = 1 - _cur_2d;
        }
    
        //for ifft
        _res_back_2d[1-_cur_2d].begin();
        _res_back_2d[_cur_2d].draw(0,0,N,N);
        _res_back_2d[1-_cur_2d].end();
    
        _res_back_2d[_cur_2d].begin();
        _ifft_div_shader_2d.begin();
        _ifft_div_shader_2d.setUniform1i("N",N);
        _ifft_div_shader_2d.setUniform3f("iResolution",N,N,0);
        _ifft_div_shader_2d.setUniformTexture("tex_rgb",_res_back_2d[1-_cur_2d].getTextureReference(),1);
        ofRect(0,0,N,N);
        _ifft_div_shader_2d.end();
        _res_back_2d[_cur_2d].end();
    
        //垂直FFT
        //垂直方向的所有都是计算都按照垂直方向来
        _weight_index_2d[_cur_2d].begin();
        _origin_butterfly_2d.draw(0,0,N,1);
        _weight_index_2d[_cur_2d].end();
    
        //这一步不会将垂直水平化
        _fbo_in_bitreved_2d.begin();
        _bit_reverse_shader_2d.begin();
        _bit_reverse_shader_2d.setUniform3f("iResolution",N,N,0);
        _bit_reverse_shader_2d.setUniform1i("N",N);
        _bit_reverse_shader_2d.setUniform1i("dir",2);
        _bit_reverse_shader_2d.setUniformTexture("tex_origin",_res_back_2d[_cur_2d].getTextureReference(),1);
        _bit_reverse_shader_2d.setUniformTexture("tex_bitreverse_table",_fbo_bitrev_table.getTextureReference(),2);
        ofRect(0,0,N,N);
        _bit_reverse_shader_2d.end();
        _fbo_in_bitreved_2d.end();
    
    
    
        //翻转后的数据
        _res_back_2d[_cur_2d].begin();
        _fbo_in_bitreved_2d.draw(0,0,N,N);
        _res_back_2d[_cur_2d].end();
    
    
        for(int i = 1;i<N;i*=2)
        {
            _res_back_2d[1-_cur_2d].begin();
            ofClear(0,0,0,0);
            _gpu_fft_shader_2d.begin();
            _gpu_fft_shader_2d.setUniform1i("size",N);
            _gpu_fft_shader_2d.setUniform1i("n_step",i);
            _gpu_fft_shader_2d.setUniform3f("iResolution",N,N,0);
            _gpu_fft_shader_2d.setUniform1i("dir",2);
            _gpu_fft_shader_2d.setUniformTexture("tex_index_weight",_weight_index_2d[_cur_2d].getTextureReference(),1);
            _gpu_fft_shader_2d.setUniformTexture("tex_res_back",_res_back_2d[_cur_2d].getTextureReference(),2);
            //_gpu_fft_shader_2d.setUniformTexture("test",imag_test.getTextureReference(),4);
    
            ofRect(0,0,N,N);
    
            _gpu_fft_shader_2d.end();
    
            _res_back_2d[1-_cur_2d].end();
    
    
    
            _weight_index_2d[1-_cur_2d].begin();
            ofClear(0,0,0,0);
    
            _weight_index_shader_2d.begin();
            _weight_index_shader_2d.setUniform1i("size",N);
            _weight_index_shader_2d.setUniform1i("n_step",i);
            _weight_index_shader_2d.setUniform3f("iResolution",N,1,0);
            _weight_index_shader_2d.setUniform1i("dir",2);
            _weight_index_shader_2d.setUniformTexture("tex_input",_weight_index_2d[_cur_2d].getTextureReference(),1);
    
            ofRect(0,0,N,1);
    
            _weight_index_shader_2d.end();
    
            _weight_index_2d[1-_cur_2d].end();
    
            _cur_2d = 1 - _cur_2d;
        }
    
        //for ifft
        _res_back_2d[1-_cur_2d].begin();
        _res_back_2d[_cur_2d].draw(0,0,N,N);
        _res_back_2d[1-_cur_2d].end();
    
        _res_back_2d[_cur_2d].begin();
        _ifft_div_shader_2d.begin();
        _ifft_div_shader_2d.setUniform1i("N",N);
        _ifft_div_shader_2d.setUniform3f("iResolution",N,N,0);
        _ifft_div_shader_2d.setUniformTexture("tex_rgb",_res_back_2d[1-_cur_2d].getTextureReference(),1);
        ofRect(0,0,N,N);
        _ifft_div_shader_2d.end();
        _res_back_2d[_cur_2d].end();
    
        out.begin();
        _res_back_2d[_cur_2d].draw(0,0,N,N);
        out.end();
    
        //恢复Alpha混合
        //ofEnableAlphaBlending();
    }
    复制代码

    现在来看shader内容:

     _bit_reverse_shader_2d

    这个shader用于将整个N*N的数据全部按照行或者按照列进行翻装,使之满足执行fft的条件:

    复制代码
    #version 130
    uniform sampler2D tex_origin;
    //1xN查找表,用于查找索引对应的bitreverse数
    uniform sampler2D tex_bitreverse_table;
    //1 for x direction,2 for y direction
    uniform int dir;
    uniform int N;
    uniform vec3 iResolution;
    
    out vec4 outColor;
    
    void main()                                      
    {   
        vec2 tex_coord = gl_FragCoord.xy/iResolution.xy;
        
        vec2 table_index;
        table_index.y = 0.5;
        if(dir==1)
            table_index.x = tex_coord.x;
        else
            table_index.x = tex_coord.y;
        float bitreverse = texture(tex_bitreverse_table,table_index).r;
        
        vec2 origin_index;
        if(dir==1)
        {
            //x方向
            origin_index.y = tex_coord.y;
            origin_index.x = (bitreverse+0.5)/N;
        }
        else
        {
            //y方向
            origin_index.x = tex_coord.x;
            origin_index.y = (bitreverse+0.5)/N;
        }
        vec2 param = texture(tex_origin,origin_index).xy;
        
        outColor = vec4(param,0,1);
    }
    复制代码

    _gpu_fft_shader_2d

    这是fft执行计算的部分,同样分为按行和按列:

    复制代码
    #version 130
    //NX1
    uniform sampler2D tex_index_weight;
    //NXN
    uniform sampler2D tex_res_back;
    uniform sampler2D test;
    uniform int size;
    uniform int n_step;
    //1 for x direction,2 for y direction
    uniform int dir;
    
    uniform vec3 iResolution;
    
    out vec4 outColor;
    
    void main()                                      
    {   
        vec2 tex_coord = gl_FragCoord.xy/iResolution.xy;
        
        vec2 first_index;
        if(dir==1)
        {
            first_index.y = 0.5;
            first_index.x = tex_coord.x;
        }
        else
        {
            first_index.y = 0.5;
            first_index.x = tex_coord.y;
        }
        
        float cur_x = gl_FragCoord.x - 0.5;
        float cur_y = gl_FragCoord.y - 0.5;
        
        vec2 outv;
        
        vec4 temp = texture(tex_index_weight,first_index);
        //ifft
        vec2 weight = vec2(cos(temp.r/temp.g*2*3.141592653),-sin(temp.r/temp.g*2*3.141592653));
        //fft
        //vec2 weight = vec2(cos(temp.r/temp.g*2*3.141592653),sin(temp.r/temp.g*2*3.141592653));
        vec2 _param2_index;
    
        if(dir==1)
        {
            _param2_index.x = (temp.a + 0.5)/size;
            _param2_index.y = tex_coord.y;
        }
        else
        {
            _param2_index.x = tex_coord.x;
            _param2_index.y = (temp.a + 0.5)/size;
        }
        
        
        vec2 param1 = texture(tex_res_back,tex_coord).rg;
        vec2 param2 = texture(tex_res_back,_param2_index).rg;
        
        float tex_coord_n1;
        float tex_coord_n2;
        if(dir==1)
        {
            tex_coord_n1 = cur_x;
        }
        else
        {
            tex_coord_n1 = cur_y;
        }
        
        tex_coord_n2 = temp.a;
        
        if(tex_coord_n1<tex_coord_n2)
        {
            outv.r = param1.r + param2.r*weight.r-weight.g*param2.g;
            outv.g = param1.g +weight.r*param2.g + weight.g*param2.r;
        }
        else
        {
            outv.r = param2.r + param1.r*weight.r-weight.g*param1.g;
            outv.g = param2.g +weight.r*param1.g + weight.g*param1.r;
        }
        
        
        outColor = vec4(outv,0,1);
        
    }
    复制代码

    _weight_index_shader_2d

    更新蝴蝶图索引:

    复制代码
    #version 130
    
    uniform sampler2D tex_input;
    uniform int size;
    uniform int n_total;
    //start with 2
    uniform int n_step;
    //1 for x direction,2 for y direction
    uniform int dir;
    
    uniform vec3 iResolution;
    out vec4 outColor;
    
    void main()                                      
    {      
        vec2 tex_coord = gl_FragCoord.xy/iResolution.xy;
        vec4 fetch = texture(tex_input,tex_coord);
        float cur_x = gl_FragCoord.x - 0.5;
        float cur_y = gl_FragCoord.y - 0.5;
        
        vec4 outv;
        float tex_coord_n;
        if(dir==1)
        {
            //x dir
            tex_coord_n = cur_x;
        }
        else
        {
            //y dir
            tex_coord_n = cur_x;
        }
        
        //updata weight
        vec2 pre_w = fetch.rg;
        float i = pre_w.r;
        float n = pre_w.g;
        float new_i;
        float new_n;
        new_i = i;
        new_n = n*2;
        if(int(tex_coord_n)%(n_step*4) > n_step*2-1)
        {
            new_i += n_step*2;
        }
        outv.r = new_i;
        outv.g = new_n;
        //outv.rg = tex_coord;
        
        //updata index
        vec2 pre_index = fetch.ba;
        int x = int(pre_index.x);
        int y = int(pre_index.y);
        int ni = n_step*2;
        float new_tex_coord_n = tex_coord_n;
        if((int(tex_coord_n)/ni)%2==0)
        {
            new_tex_coord_n += ni;
        }
        else
        {
            new_tex_coord_n -= ni;
        }
        
        outv.b = 0;
        outv.a = new_tex_coord_n;
        outColor = outv;
        //outColor = vec4(tex_coord_n,tex_coord_n%n_step,tex_coord_n%n_step,tex_coord_n%n_step);
    } 
    复制代码

    最后的

    _ifft_div_shader_2d

    是为了计算ifft,将每个计算结果除以一个N:

    复制代码
    #version 130
    uniform sampler2D tex_rgb;
    uniform int N;
    uniform vec3 iResolution;
    
    out vec4 outColor;
    
    void main()                                      
    {   
        vec2 tex_coord = gl_FragCoord.xy/iResolution.xy;
        
        vec2 outv;
            
        vec4 c = texture(tex_rgb,tex_coord);
    
        outv.r = c.r/N;
        outv.g = c.g/N;
        outColor = vec4(outv,0,1);
    }
    复制代码

    最后,out里面就是结果了。

    对于将原点移动到中心多了以下shader:

    复制代码
            vec4 c;
            if(tex_coord.x>0.5&&tex_coord.y>0.5)
            {
                c = texture(tex_rgb,tex_coord-vec2(0.5,0.5));
                
            }
            if(tex_coord.x>0.5&&tex_coord.y<0.5)
            {
                c = texture(tex_rgb,tex_coord+vec2(-0.5,0.5));
            }
            if(tex_coord.x<0.5&&tex_coord.y>0.5)
            {
                c = texture(tex_rgb,tex_coord+vec2(0.5,-0.5));
            }
            if(tex_coord.x<0.5&&tex_coord.y<0.5)
            {
                c = texture(tex_rgb,tex_coord+vec2(0.5,0.5));
            }
            outColor = c;
    复制代码

     

    展开全文
  • 将我在学习利用cufft库实现多次一维FFT实现过程中的总结,步骤清晰,注释详细,可供参考。
  • 首先看一下什么叫做快速傅里叶变换(FFT)(来自Wiki): 快速傅里叶变换(英语:Fast Fourier Transform, FFT),是离散傅里叶变换的快速算法,也可用于计算离散傅里叶变换的逆变换。快速傅里叶变换有广泛的应用,...

    最近做一个东西,要用到快速傅里叶变换,抱着蛋疼的心态,自己尝试写了一下,遇到一些问题。

    首先看一下什么叫做快速傅里叶变换(FFT)(来自Wiki):
    快速傅里叶变换(英语:Fast Fourier Transform, FFT),是离散傅里叶变换的快速算法,也可用于计算离散傅里叶变换的逆变换。快速傅里叶变换有广泛的应用,如数字信号处理、计算大整数乘法、求解偏微分方程等等。

    离散傅里叶变换公式:

    y j = ∑ k = 0 n − 1 e − 2 π i n j k x k , j = 0 , 1 , . . . , n − 1 y_j=\sum_{k=0}^{n-1}e^{-\frac{2\pi i}{n}j^k}x_k,j=0,1,...,n-1 yj=k=0n1en2πijkxk,j=0,1,...,n1

    直接变换的计算复杂度是 O ( n 2 ) O(n^2) O(n2).快速傅里叶变换可以计算出与直接计算相同的结果,但只需要 O ( n log ⁡ n ) O(n\log n) O(nlogn)的计算复杂度。

    快速傅里叶变换Wiki

    首先,看一个概念叫做N次单位复根:复数 W N n = 1 WN^n = 1 WNn=1。N次单位复根一共有N个,分别为: W ( k , N ) = c i s ( 2 ∗ P I ∗ k / N ) , k = 0 , 1 , . . . , n − 1 W(k,N) = cis(2*PI*k/N),k=0,1,...,n-1 W(k,N)=cis(2PIk/N),k=0,1,...,n1

    其中 c i s ( u ) = e i ∗ u = cos ⁡ ( u ) + i sin ⁡ ( u ) cis(u) = e^i*u = \cos(u) + i\sin(u) cis(u)=eiu=cos(u)+isin(u)(欧拉公式)

    N次单位复根有如下性质:

    1. W ( k + N , N ) = W ( k , N ) W(k+N,N) = W(k,N) W(k+N,N)=W(k,N)
    2. W ( k + N / 2 , N ) = − W ( k , N ) W(k+N/2,N) = -W(k,N) W(k+N/2,N)=W(k,N)
    3. W ( d k , d N ) = W ( k , N ) ( d > 0 ) W(dk,dN) = W(k,N) (d>0) W(dk,dN)=W(k,N)(d>0)

    库利-图基算法:

    将长度为N的序列分成两个N/2的子序列N1和N2,其中N1是原来序列的奇数项,N2为偶数项。

    将离散傅里叶变换写成:

    N ( x ) = ∑ j = 0 n − 1 a j x j x = W ( k , N ) N(x)=\sum_{j=0}^{n-1}a_jx_jx=W(k,N) N(x)=j=0n1ajxjx=W(k,N)

    分解后有:

    N 1 ( x ) = a 0 + a 2 x + a 4 x 2 + . . . + a ( n − 2 ) ∗ x ( n / 2 − 1 ) . . . 1 N_1(x)=a_0+a_2x+a_4x^2+...+a(n-2)*x^{(n/2-1)}...1 N1(x)=a0+a2x+a4x2+...+a(n2)x(n/21)...1
    N 1 ( x ) = a 1 + a 3 x + a 5 x 2 + . . . + a ( n − 1 ) ∗ x ( n / 2 − 1 ) . . . 2 N_1(x)=a_1+a_3x+a_5x^2+...+a(n-1)*x^{(n/2-1)}...2 N1(x)=a1+a3x+a5x2+...+a(n1)x(n/21)...2
    N ( x ) = N 1 ( x 2 ) + N 2 ( x 2 ) . . . 3 N(x)=N_1(x^2)+N_2(x^2)...3 N(x)=N1(x2)+N2(x2)...3

    就这样将 ( W ( 0 , N ) ) 2 , ( W ( 1 , N ) ) 2 , . . . , ( W ( N − 1 , N ) ) 2 (W(0,N))^2,(W(1,N))^2,...,(W(N-1,N))^2 (W(0,N))2,(W(1,N))2,...,(W(N1,N))2一个一个往1,2式代,然后根据3式组合就可以得到结果了。

    按照相同的办法,将此式递归下去,最终就可以得到结果。这就是库利-图基算法的大概思想。

    语言描述起来很抽象,使用蝴蝶网络来表示,就可以一目了然:

    在这里插入图片描述
    在这里插入图片描述
    如图,展示了一个N=8的快速傅里叶变换执行流程。

    按照这种算法,可以很快写出递归算法类C伪代码:

    Array recursive_fft(a)
    {
        int n =  a.lengh();
        if(n=1)
            return a;
        W wn = e^2*PI*i/n;
        W w = 1;
        Array a0 = {a0,a2,a3……a(n-2)};
        Array a1 = {a1,a3,a5……a(n-1)};
        Array y0 = recursive(a[0]);
        Array y1 = recursive(a[1]);
        Array y;
        for(int k = 0;k<=n/2-1;++k)
        {
            y[k] = y0[k]+wy1[k];
            y[k+n/2] = y0[k]-wy1[0];
            w *= wn
        }
        return y;
    }
    

    然而,FFT在实际应用中往往需求很高的效率,虽然都是O(n*logn)d的时间复杂度,但是在递归的实现算法中,隐含的常数更大(这点可以参考《算法导论》的相关内容,这里不做具体讨论)。并且,递归的方案也不好用GPU实现。
    所以需要实现迭代的FFT,参考算法导论相代码实现如下:

    #pragma once
    #include "RBVector2"
    
    class RBFFTTools
    {
    public:
        static void iterative_fft(RBVector2 *v,RBVector2 *out,int len)
        {
            bit_reverse_copy(v,out,len);
            int n = len;
            for (int s = 1; s <= log(n)/log(2);s++)
            {
                int  m = pow(2, s);
                RBVector2 wm(cos(2 * PI / m), sin(2 * PI / m));
                for (int k = 0; k < n ;k+=m)
                {
                    RBVector2 w = RBVector2(1,0);
                    for (int j = 0; j < m / 2;++j)
                    {
                        RBVector2 vv = out[k + j + m / 2];
                        RBVector2 t = RBVector2(w.x*vv.x - w.y*vv.y,w.x*vv.y+w.y*vv.x);
                        RBVector2 u = out[k + j];
                        out[k + j] = RBVector2(u.x+t.x,u.y+t.y);
                        out[k + j + m / 2] = RBVector2(u.x-t.x,u.y-t.y);
                        w = RBVector2(w.x*wm.x - w.y*wm.y, w.x*wm.y + w.y*wm.x);
                    }
                }
            }
        }
        //逆变换
        static void iterative_ifft(RBVector2 *v, RBVector2 *out, int len)
        {
            bit_reverse_copy(v, out, len);
            int n = len;
            for (int s = 1; s <= log(n)/log(2); s++)
            {
                int  m = pow(2, s);
                RBVector2 wm(cos(2 * PI / m), sin(2 * PI / m));
                for (int k = 0; k < n; k += m)
                {
                    RBVector2 w = RBVector2(1, 0);
                    for (int j = 0; j < m / 2; ++j)
                    {
                        RBVector2 vv = out[k + j + m / 2];
                        RBVector2 t = RBVector2(w.x*vv.x - w.y*vv.y, w.x*vv.y + w.y*vv.x);
                        RBVector2 u = out[k + j];
                        out[k + j] = RBVector2(u.x + t.x, u.y + t.y);
                        out[k + j + m / 2] = RBVector2(u.x - t.x, u.y - t.y);
                        w = RBVector2(w.x*wm.x + w.y*wm.y, -w.x*wm.y + w.y*wm.x);
                    }
                }
            }
    
            for (int i = 0; i < n;++i)
            {
                out[i].x /= n;
                out[i].y /= n;
            }
    
        }
    private:
        static void bit_reverse_copy(RBVector2 src[], RBVector2 des[], int len)
        {
            for (int i = 0; i < len;i++)
            {
                des[bit_rev(i, len-1)] = src[i];
            }
        }
    
    public:
        static unsigned int bit_rev(unsigned int v, unsigned int maxv)
        {
            unsigned int t = log(maxv + 1)/log(2);
            unsigned int ret = 0;
            unsigned int s = 0x80000000>>(31);
            for (unsigned int i = 0; i < t; ++i)
            {
                unsigned int r = v&(s << i);
                ret |= (r << (t-i-1)) >> (i);    
            }
            return ret;
        }
    };
    

    上面的代码出现了两个函数:bit_reverse_copy()和bit_rev(),这两个函数用于将初始数据按照bit反转顺序进行排序。

    bit反转看下面例子就可了然:

    正序逆序
    000000
    001100
    010010
    011110
    100001
    101101
    110011
    111111

    即把所有bit位冲头到尾颠倒一次。

    上面iterative_ifft()是逆变换,逆变换很简单就可以修改出来,只需要根据W的性质进行修改W的更新,以及除以一个N就可以了。

    运行结果如下:
    在这里插入图片描述

    算法导论 第30章

    基于GPU的FFT

    这里使用OpenGL的Fragment Shader来实现FFT,我使用了两个浮点纹理作为输入,一个纹理表示蝴蝶网络,一个表示当前的输入数据:

    一个N(2的幂)的FFT需要迭代 l o g 2 ( N ) log_2(N) log2(N)次,蝴蝶网络和输入数据每次迭代都会更新,更新的数据用于下次迭代输入,输出的数据使用FBO来暂存,效率还可以。

    蝴蝶网络的单元结构如下:

    在这里插入图片描述
    输入数据仅用了四个通道中的前两个,用来存放数据的实部和虚部。

    GPU算法可以通过蝴蝶网络图很容易的得出:

    在这里插入图片描述
    观察此图,可以发现:

    每次迭代都需要输出数据,输出的数据由下图计算方案决定:

    在这里插入图片描述
    W ( r , N ) W(r,N) W(r,N)就储存在对应的蝴蝶网络纹理中,取出来就可以了。

    另外,每次迭代后都要对于蝴蝶网络更新,更新的内容包括:

    更新W和更新对应项的索引x,y值,由上图可以知道, W W W只需下标乘以2得到 N n e w N_{new} Nnew,上标依次从零到 N n e w N_new Nnew,以此2循环即可。因为转换一下就可以知道,W的变换其实是下面这个规律:

    W(0,2) W(0,4) W(0,8) W(0,16) ……
    W(1,2) W(1,4) W(1,8) W(1,16) ……
    W(2,4) W(2,8) W(2,16) ……
           W(3,4) W(3,8) W(3,16) ……
           W(4,8) W(4,16) ……
           W(5,8) .................
           W(6,8) .................
           W(7,8) .................
           .................
    

    而索引的变换就更加简单了,加上当前的迭代次数即可。

    思路很简单,但是调试的时候很蛋疼,给出的代码使用了OpenFrameworks提供的OpenGL框架:

    #include "ofApp.h"
    #include "time.h"
    
    time_t t_s1,t_e1;
    time_t t1,t2;
    
    void ofApp::setup()
    {
        ofDisableArbTex();
        imag_test.loadImage("1.jpg");
        
        first = true;
        N = 1024;
        cur = 0;
        
        source = new RBVector2[N*N];
        rev_source = new RBVector2[N*N];
        out = new RBVector2[N*N];
    
        //禁用Alpha混合,否则绘制到FBO会混合Alpha,造成数据丢失
        ofDisableAlphaBlending();
        
        
        for(int i=0;i<2;i++)
        {
            weight_index[i].allocate(N,N,GL_RGBA32F);
            res_back[i].allocate(N,N,GL_RGBA32F);
            data[i].allocate(N,N,ofImageType::OF_IMAGE_COLOR_ALPHA);
        }
        //初始化蝴蝶网络
        for(int i=0;i<N;i++)
        {
            for(int j=0;j<N;j++)
            {
                if(j%2==0)
                    data[0].setColor(j,i,ofFloatColor(0.f,2.f,(float)((i*N+j+1)%N),(float)((i*N+j+1)/N)));
                else
                    data[0].setColor(j,i,ofFloatColor(1.f,2.f,(float)((i*N+j-1)%N),(float)((i*N+j-1)/N)));
            }
        }
        //初始化原始数据,这里使用随机数
        for(int i=0;i<N*N;i++)
        {
            float s1 = ofRandomf();
            float s2 = ofRandomf();
            source[i] = RBVector2(s1,s2);
        }
        t_s1 = time(0);
        bit_reverse_copy(source,rev_source,N*N);
        t_e1= time(0);
        t2 = t_e1 - t_s1;
        //初始化到纹理数据
        for(int i=0;i<N;i++)
        {
            for(int j=0;j<N;j++)
            {
                data[1].setColor(j,i,ofFloatColor(rev_source[i*N+j].x,rev_source[i*N+j].y,0,1));
            }
        }
        
        data[0].update();
        data[1].update();
    
        weight_index[0].begin();
        data[0].draw(0,0,N,N);
        weight_index[0].end();
    
        res_back[0].begin();
        data[1].draw(0,0,N,N);
        res_back[0].end();
    
        ofSetWindowShape(1280,720);
        weight_index_shader.load("normalv.c","generate_weight_and_index_f.c");
        gpu_fft_shader.load("normalv.c","gpu_fft_f.c");
    
        t_s1 = time(0);
        //CPU变换
        RBFFTTools::iterative_fft(source,out,N*N);
        t_e1 = time(0);
    
        t1 = t_e1 - t_s1;
    }
    
    //--------------------------------------------------------------
    void ofApp::draw()
    {
        if(first)
        {
            t_s1 = time(0);
    
            for(int i = 1;i<N*N;i*=2)
            {
    
                res_back[1-cur%2].begin();
    
                //更新输出
                gpu_fft_shader.begin();
                gpu_fft_shader.setUniform1i("size",N);
                gpu_fft_shader.setUniform1i("n_step",i);
                gpu_fft_shader.setUniform3f("iResolution",N,N,0);
                gpu_fft_shader.setUniformTexture("tex_index_weight",weight_index[cur].getTextureReference(),1);
                gpu_fft_shader.setUniformTexture("tex_res_back",res_back[cur].getTextureReference(),2);
                gpu_fft_shader.setUniformTexture("test",imag_test.getTextureReference(),4);
                ofRect(0,0,N,N);
                gpu_fft_shader.end();
                res_back[1-cur%2].end();
            
                //更新蝴蝶网络数据
                weight_index[1-cur%2].begin();
                weight_index_shader.begin();
                weight_index_shader.setUniform1i("size",N);
                weight_index_shader.setUniform1i("n_step",i);
                weight_index_shader.setUniform3f("iResolution",N,N,0);
                weight_index_shader.setUniformTexture("tex_input",weight_index[cur].getTextureReference(),1);
                ofRect(0,0,N,N);
                weight_index_shader.end();
                weight_index[1-cur%2].end();
                
                cur = 1 - cur%2;
            }
            t_e1= time(0);
            t2 += (t_e1 - t_s1);
    
        cout<<"==================↓gpu================"<<endl;
        //输出GPU计算结果
        for(int i=0;i<N;i++)
        {
            for(int j=0;j<N;j++)
            {
                //cout<<s.getColor(j,i).r<<","<<s.getColor(j,i).g<<endl;
            }
        }
        cout<<"==================↓cpu================"<<endl;
        //输出CPU计算结果
        for(int i=0;i<N;i++)
        {
            for(int j=0;j<N;j++)
            {
                int index = i*N+j;
                //cout<<out[index].x<<","<<out[index].y<<endl;
            }
        }
        cout<<"===================================="<<endl;
        //对比CPU和GPU计算结果
        for(int i=0;i<N;i++)
        {
            for(int j=0;j<N;j++)
            {
                
                float rx = s.getColor(j,i).r;
                float ry = s.getColor(j,i).g;
                int index = i*N+j;
                float delta = 0.1;
                if(abs(rx-out[index].x)>delta||abs(ry-out[index].y)>delta)
                {
                    //数据在误差范围外,出错
                    goto wrong;
                    //cout<<s.getColor(j,i).r<<","<<s.getColor(j,i).g<<"|"<<out[index].x<<","<<out[index].y<<endl;
                }
                else
                {
                    //cout<<s.getColor(j,i).r<<","<<s.getColor(j,i).g<<","<<s.getColor(j,i).b<<","<<s.getColor(j,i).a<<endl;
                }
    
            }
        }
        cout<<"CPU:"<<t1<<endl<<"GPU:"<<t2<<endl;
                first = !first;
        }
    
        if(false)
        {
    wrong:
            printf("wrong!!!\n");
        }
        //绘制数据输入纹理和数据输出纹理
        res_back[cur].draw(N,0,N,N);
        data[1].draw(0,0,N,N);
    }
    

    更新数据Shader:

    #version 130
    uniform sampler2D tex_index_weight;
    uniform sampler2D tex_res_back;
    uniform sampler2D test;
    uniform int size;
    uniform int n_step;
    
    uniform vec3 iResolution;
    
    out vec4 outColor;
    
    void main()                                      
    {   
        vec2 tex_coord = gl_FragCoord.xy/iResolution.xy;
        
        float cur_x = gl_FragCoord.x - 0.5;
        float cur_y = gl_FragCoord.y - 0.5;
        
        vec2 outv;
            
        vec4 temp = texture(tex_index_weight,tex_coord);
         vec2 weight = vec2(cos(temp.r/temp.g*2*3.141592653),sin(temp.r/temp.g*2*3.141592653));
         vec2 _param2_index = temp.ba;
        
         vec2 temp_param2_index = _param2_index;
        temp_param2_index.x += 0.5;
        temp_param2_index.y += 0.5;
        vec2 param2_index = temp_param2_index/iResolution.xy;
        
         vec2 param1 = texture(tex_res_back,tex_coord).rg;
         vec2 param2 = texture(tex_res_back,param2_index).rg;
        
        int tex_coord_n1 = int((cur_y*size)+cur_x);
        int tex_coord_n2 = int((_param2_index.y*size)+_param2_index.x);
        
        if(tex_coord_n1<tex_coord_n2)
        {
            outv.r = param1.r + param2.r*weight.r-weight.g*param2.g;
            outv.g = param1.g +weight.r*param2.g + weight.g*param2.r;
        }
        else
        {
            outv.r = param2.r + param1.r*weight.r-weight.g*param1.g;
            outv.g = param2.g +weight.r*param1.g + weight.g*param1.r;
        }
        
        outColor = vec4(outv,0,1);
        
    }
    

    更新蝴蝶网络Shader:

    #version 130
    
    uniform sampler2D tex_input;
    uniform int size;
    uniform int n_total;
    uniform int n_step;
    
    in vec3 normal;
    uniform vec3 iResolution;
    out vec4 outColor;
    
    void main()                                      
    {      
        vec2 tex_coord = gl_FragCoord.xy/iResolution.xy;
            float cur_x = gl_FragCoord.x - 0.5;
        float cur_y = gl_FragCoord.y - 0.5;
        
        vec4 outv;
        int tex_coord_n = int((cur_y*size)+cur_x);
        
        //updata weight
        vec2 pre_w = texture(tex_input,tex_coord).rg;
        float i = pre_w.r;
        float n = pre_w.g;
        float new_i;
        float new_n;
        new_i = i;
        new_n = n*2;
        if(tex_coord_n%(n_step*4) > n_step*2-1)
        {
            new_i += n_step*2;
        }
        outv.r = new_i;
        outv.g = new_n;
        
        //updata index
        vec2 pre_index = texture(tex_input,tex_coord).ba;
        int x = int(pre_index.x);
        int y = int(pre_index.y);
        int ni = n_step*2;
        int new_tex_coord_n = tex_coord_n;
        if((tex_coord_n/ni)%2==0)
        {
            new_tex_coord_n += ni;
        }
        else
        {
            new_tex_coord_n -= ni;
        }
        outv.b = new_tex_coord_n%size;
        outv.a = new_tex_coord_n/size;
        
        outColor = outv;
    }
    

    用time大概对比了一下GPU和CPU的计算时间,结果如下:

    在这里插入图片描述
    N = 512*512的情况下,误差在0.1以内,在我的应用情况下是可以接受的,更高的数据意味着更多的迭代次数,会进一步降低数据精度,目前还没有解决误差问题。

    运行50次,大概对比如下,不是很准确,仅供参考:

    在这里插入图片描述

    展开全文
  • FFT快速卷积GPU加速----pycuda

    千次阅读 2017-09-15 10:40:41
    二维卷积定理的实现1....3.分别对f,h做傅里叶变换(FFT2) 4.G=F*H(注意是乘法不是点乘) 5.对G做IFFT2 6.对5中的结果进行剪切得到原大小的图片。CPU下numpy实现直接看代码,其中填充是手动填充的。#-*- codin

    二维卷积定理的实现

    1.读取一幅图f(x,y)大小A*B ,一个卷积核h(x,y)大小为C*D。
    2.对f,h,进行填充,填充成Q*P,P>=A+C-1,Q>=B+D-1。(一定要进行填充解决做TTF时由周期性引起的缠绕问题的数据错误及数据丢失)
    3.分别对f,h做傅里叶变换(FFT2)
    4.G=F*H(注意是乘法不是点乘)
    5.对G做IFFT2
    6.对5中的结果进行剪切得到原大小的图片。

    直接看代码吧!

    #-*- coding: utf-8 -*-
    import pycuda.autoinit
    import pycuda.driver as drv
    from pycuda.compiler import SourceModule
    from jinja2 import Template
    from reikna.fft import FFT
    import reikna.cluda as cluda
    import numpy as np
    import os
    import matplotlib.pyplot as plt
    import pylab
    from scipy import signal
    import time
    
    
    
    # ==========================================================
    def Paddedsize(arr,A,B,P,Q):
        pad = np.empty(P*Q).reshape(P,Q)
        pad = np.complex128(pad)
        i = 0
        j = 0
        for i in range(P):
            for j in range(Q):
                if (i<=A-1)and(j<=B-1):
                    pad[i][j] = arr[i][j]
                else:
                    pad[i][j] = 0
    
        return pad
    # ==========================================================
    def reikna_fft(arr, P, Q):
        api = cluda.cuda_api()
        thr = api.Thread.create()    
        x = thr.to_device(arr)
        X = thr.array((P,Q), dtype=np.complex128)
        fft = FFT(x)
        fftc = fft.compile(thr)
        fftc(X, x, 0)
        fft = X.get()
        thr.release()
        return fft
    # ==========================================================
    def test_convolution_cuda():
        path = os.getcwd()
        img = plt.imread(path + "/rose.tif")
        plt.imshow(img)
        pylab.show()
        mats_1 = np.complex128(img)
        mats_2 = np.array((
                             [[1,0,-1], 
                              [2,0,-2],
                              [1,0,-1] 
                          ]), dtype=np.complex128)
    
        R1 = mats_1.shape[0]
        R2 = mats_2.shape[0]
        L1 = mats_1.shape[1]
        L2 = mats_2.shape[1]
        P = R1 + R2 
        Q = L1 + L2
    
        start = time.time()
        mats_1 = Paddedsize(mats_1, R1, L1, P, Q)
        mats_2 = Paddedsize(mats_2, R2, L2, P, Q)
        end = time.time()
        print (end- start)
    
        start = time.time()
        xfft1 = reikna_fft(mats_1,P,Q)
        xfft2 = reikna_fft(mats_2,P,Q)
        end = time.time()
        print (end- start)
    
        start = time.time()
        result =  xfft1*xfft2
        end = time.time()
        print (end- start)
    
        aa = result.conjugate() 
        xifft = reikna_fft(aa,P,Q)
        xifft = xifft/(P*Q)
        out = xifft.real
        out = out[1:R1+1,1:L1+1]   
        plt.imshow(out)
        pylab.show()
    
    # ==========================================================
    if __name__ == '__main__':
        start = time.time()
        test_convolution_cuda()
        end = time.time()
        print (end- start)
    # ==========================================================

    这里的知识点我在前几个帖子里介绍过,有兴趣的可以看一看瞧一瞧。
    看看效果图:
    原图:
    这里写图片描述
    卷积之后的图:
    这里写图片描述

    总结

    从效果上来看,用reikna做的卷积还是不错的,但是从时间上来看是失败的,就上面的程序耗时间需要5S。具体就是reikna的TTF耗时比较久,看来还是需要自己写FFT程序,不能偷懒。
    有大神看到希望给出点建议,谢谢!

    展开全文
  • FFT-GPU-Accel Fast Fourier Transform Acceleration Algorithm. (Accelerated by CUDA) 简要介绍 基于FFT的蝶形公式,利用GPU的多核心优势,结合蝶形公式算法中同一层级的运算因子互不干扰的特点,对算法进行了...
  • NUFFT的CPU直接傅里叶变换(以建立NUFFT实现的正确性) 使用CUFFT FFT进行CPU网格划分时的CPU NUFFT(测试CUFFT代码) 针对GPU NAIVE的CPU NUFFT(请参阅以下部分。基准GPU算法) 针对GPU SHMEM的GPU NAIVE(仅在...
  • FFT的并行实现

    千次阅读 2018-12-21 11:06:23
    关键词:快速傅里叶变换 高维FFT 并行计算 快速傅里叶变换简介 离散傅里叶变换 离散傅里叶变换(DFT)一般定义为: Fn≡∑k=0N−1fke−2πink/N F_n \equiv \sum_{k=0}^{N-1}f_ke^{-2\pi ink/N}Fn​≡k=0∑N−1​fk...

    关键词:快速傅里叶变换 高维FFT 并行计算

    快速傅里叶变换简介

    离散傅里叶变换

    离散傅里叶变换(DFT)一般定义为: F n ≡ ∑ k = 0 N − 1 f k e − 2 π i n k / N F_n \equiv \sum_{k=0}^{N-1}f_ke^{-2\pi ink/N} Fnk=0N1fke2πink/N
    离散傅里叶逆变换可以定义为: f k = 1 N ∑ k = 0 N − 1 F n e − 2 π i n k / N f_k = \frac{1}{N} \sum_{k=0}^{N-1}F_ne^{-2\pi ink/N} fk=N1k=0N1Fne2πink/N
    这里 e e e头上的符号不管放在正变换那还是负变换那里都是没有关系的。正负变换前面的系数是什么也都不打紧,只要乘积为 1 N \frac{1}{N} N1即可。

    按矩阵呈向量的观点来看,简单地说,DFT就是把一个 N N N长的向量,通过乘以一个 M × N M \times N M×N的矩阵,变成 M M M长的向量(一般情况下 M = N M=N M=N)。重要的是,这个变换矩阵是什么呢?其实它是一个范德蒙德矩阵,所以DFT其实就可以写为如图所示。

    在这里插入图片描述

    那么这个 ω 0 , ω 1 . . . ω M − 1 \omega_0,\omega_1...\omega_{M-1} ω0,ω1...ωM1是个什么玩意儿呢?它其实就是复单位圆周上的M个等分点位置。如图所示。

    在这里插入图片描述

    一言以蔽之,要将一个N长的向量变成M长的向量,将复平面的上单元圆分成M分,取第1个等分点,乘方出来N个数,和原来的N长向量做內积,得到傅里叶变换出来的第1个数,依次类推,每个等分点都能出来一个数,那么M个等分点就能出来M个数。

    我想到这,我应该把离散傅里叶变换将清楚了。非要用公式来表达的话,同前所写,不过我这里用了不同的符号,如下:

    u ^ k = ∑ j = 0 N − 1 e i 2 π k M ⋅ j ⋅ u j , j = 1 , 2 , . . . , N \hat u_k = \sum\limits_{j=0}^{N-1}{e^{i\frac{2\pi k}{M}\cdot j}\cdot u_j},j=1,2,...,N u^k=j=0N1eiM2πkjuj,j=1,2,...,N

    上面提到的都是一维的傅里叶变换,二维傅里叶变换其实就是在两个维度上分别做傅里叶变换。

    快速傅里叶变换

    在实际计算中,人们不直接采用上述离散傅里叶变换的定义式进行计算,而是使用快速傅里叶变换(Fast Fourier Transform),我们可以简单地先把快速傅里叶变换认为是一种做离散傅里叶变换的快速算法。

    快速傅里叶变换被誉为二十世纪最伟大的算法之一。所谓的快速傅里叶变换(Fast Fourier Transform,简称FFT),不过是求解离散傅里叶变换的一个快速算法。它将原本计算量为 O ( N 2 ) O(N^2) O(N2)的DFT求和公式,降为了 O ( N log N ) O(N \text{log} N) O(NlogN)
    但是,为了获得较高的运算速度,待变换序列的元素数量必须是 2 n 2^n 2n形式个。

    由上述可知,所谓的DFT,不过是一个以输入向量为系数的多项式,在复单位圆周上取不同等分点得到的值。即寻求 ω i , i = 0 , 1 , 2 , . . . , n − 1 \omega_i,i=0,1,2,...,n-1 ωii=0,1,2,...,n1在如下多项式上的取值:
    p ( x ) = a 0 + a 1 ( x ) + ⋯ + a n − 1 x n − 1 p(x) = a_0 + a_1(x)+\dots+a_{n-1}x^{n-1} p(x)=a0+a1(x)++an1xn1

    那么FFT的精髓就可以写为如下三步:

    • 将多项式按奇偶分为两部分, p ( x ) = p 2 ( x 2 ) + x p 1 ( x 2 ) p(x) = p_2(x^2)+xp_1(x^2) p(x)=p2(x2)+xp1(x2),这里
      p 2 ( x ) = a 0 + a 2 ( x ) + a 4 ( x 2 ) + … a n − 2 x n / 2 − 1 p 1 ( x ) = a 1 + a 3 x + a 5 x 2 + ⋯ + a n − 1 x n / 2 − 1 \begin{aligned} p_2(x) &amp;=&amp; a_0 + a_2(x) + a_4(x^2)+\dots a_{n-2}x^{n/2-1} \\ p_1(x) &amp;=&amp; a_1 + a_3x + a_5x^2+\dots + a_{n-1}x^{n/2-1} \end{aligned} p2(x)p1(x)==a0+a2(x)+a4(x2)+an2xn/21a1+a3x+a5x2++an1xn/21

    • 从上所述,那么,问题就变为了求点 ( ω 0 ) 2 , ( ω 1 ) 2 … ( ω n − 1 ) 2 (\omega_0)^2,(\omega_1)^2\dots(\omega_{n-1})^2 (ω0)2,(ω1)2(ωn1)2 p 1 ( x ) p_1(x) p1(x)
      p 2 ( x ) p_2(x) p2(x)上的取值。容易看到求 ( ω 0 ) 2 , ( ω 1 ) 2 … ( ω n / 2 − 1 ) 2 (\omega_0)^2,(\omega_1)^2\dots(\omega_{n/2-1})^2 (ω0)2,(ω1)2(ωn/21)2
      (即 ( ω n / 2 + 1 ) 2 , ( ω n / 2 + 2 ) 2 … ( ω n − 1 ) 2 (\omega_{n/2+1})^2,(\omega_{n/2+2})^2\dots(\omega_{n-1})^2 (ωn/2+1)2,(ωn/2+2)2(ωn1)2)是两个规模减半的傅里叶变换。求得规模减半的福利变换,根据上述的 p ( x ) p(x) p(x) p 1 ( x ) p_1(x) p1(x) p 2 ( x ) p_2(x) p2(x)的关系,可求得原来的傅里叶变换。

    • p 1 ( x ) , p 2 ( x ) p_1(x),p_2(x) p1(x),p2(x)的傅里叶变换,又可以拆成规模更小的傅里叶变换,如此层层向下,直到单值的傅里叶变换为其本身。

    如上所述,FFT是可以递归实现的,写成伪代码,如图所示。另外,在计算 p ( x ) p(x) p(x)的值时,可以利用到 e i θ = − e i ( π + θ ) e^{i\theta }= - e^{i(\pi+\theta)} eiθ=ei(π+θ)的性质,进一步减少重复的计算。如图[FFTS]所示。

    在这里插入图片描述

    在这里插入图片描述

    写一个简单的cpp脚本如下:

    #include <stdio.h>
    #include <stdlib.h>
    #define _USE_MATH_DEFINES//pi使用M_PI表示
    #include <math.h>
    
    /*fft的一个串行实现,因为c中没有复数(在有的标准下有,但是使用方法不尽相同,还不如自己写一个),
    所以我们需要定义复数及其运算,使用结构体。*/
    
    
    typedef struct Complex
    {
    	double r=0.0;//初始值设置为0
    	double i=0.0;//初始值设置为0
    } complex;
    
    
    /**
    函数,复数加和减
    **/
    void complex_add(complex* result, const complex *c1, const complex *c2)
    {
    	result->i = c1->i + c2->i;
    	result->r = c1->r + c2->r;
    }
    
    void complex_sub(complex* result, const complex *c1, const complex *c2)
    {
    	result->i = c1->i - c2->i;
    	result->r = c1->r - c2->r;
    }
    
    /**
    函数,复数乘
    **/
    void complex_multiply(complex* result, const complex *c1, const complex *c2)
    {
    	(result->i) = (c1->r)*(c2->i) + (c1->i)*(c2->r);
    	(result->r) = (c1->r)*(c2->r) - (c1->i)*(c2->i);
    }
    
    /**
    函数,复数赋值
    **/
    void complex_copy(complex* result, const complex *c1)
    {
    	result->r = c1->r;
    	result->i = c1->i;
    }
    
    /**
    定义一个求数组长度的运算,包括结构体数组, 尽量少用指针,容易出错
    **/
    
    int length(complex a[])
    {
    	int length = sizeof(a) / sizeof(a[0]);
    	return length;
    }
    
    
    void fft(complex a[],complex y[],int n)//因为不能返回数组,所以只能这么干
    {
    	//int n = length(a);
    //	printf("%f", length(a));
    //	printf("%f", a[2].r);
    //	getchar();
    	if (n == 1)
    	{
    		//A = a;//不知道这样写行不行,先放着
    		complex_copy(y, a);
    		return;
    	}
    	complex omega_n;
    //	printf("%f", M_PI);
    	omega_n.r = cos(2 * M_PI / n);
    	omega_n.i = sin(2 * M_PI / n);
    	complex omega;
    	omega.r = 1;
    	int half_n = int(n*0.5);
    	complex*  a1 = (complex*)malloc(sizeof(complex)*half_n);
    	complex*  a2 = (complex*)malloc(sizeof(complex)*half_n);
    
    	//complex a1[half_n];
    
    	for (int i = 0; i < half_n; i++)
    	{
    		complex_copy(&a2[i], &a[i * 2]);
    		complex_copy(&a1[i], &a[i * 2+1]);//分成奇数和偶数
    	//	a2(i) = a(i * 2);
    	//  a1(i) = a(i * 2 + 1);
    	}
    	//complex y2[1];
    	//complex y1[];
    	complex*  y1 = (complex*)malloc(sizeof(complex)*half_n);
    	complex*  y2 = (complex*)malloc(sizeof(complex)*half_n);
    	fft(a2,y2,int(n*0.5));
    	fft(a1,y1,int(n*0.5));
    //	double complex y[n];
    //	complex* const y = (complex*)malloc(sizeof(complex)*n);
    //	complex omega_n;
    //	complex omega;
    	for (int k = 0; k <= half_n - 1; k++)
    	{
    		complex t;
    //		t->r = 0;
    	//	t->i = 0;
    		complex_multiply(&t, &omega, &y1[k]);
    	//	complex *t = complex_multiply(&omega, &y1(k));
    
    		complex_add(&y[k], &y2[k],&t);
    	//	y(k) = y2(k) + t;
    		complex_sub(&y[k + half_n], &y2[k], &t);
    	//	y(k + half_n) = y2(k) - t;
    	//	omega = omega*omega_n;
    		complex omega_temp = omega;
    		complex_multiply(&omega, &omega_temp, &omega_n);
    	}
    	//return y;
    
    }
    
    
    
    
    void main()
    {
    	complex a[4];
    	complex A[4];
    	a[0].r = 1;
    	a[1].r = 2;
    	a[2].r = 3;
    	a[3].r = 4;
    	fft(a,A,4);
    	printf("result is\n");
    	for (int i = 0; i < 4; i++)
    	{
    		printf("%f+%fi\n", A[i].r, A[i].i);
    	}
    	//for (int i = 0; i < length(a); i++)
    	//{
    	//	printf('test');
    	//}
    	// c(1,1);
    //	_C_double_complex a;
    //	a = 2I;
    //	double m = exp(0.1*I);
    //	printf("%f", m);
    	getchar();
    
    
    }
    
    

    如果写成matlab的脚本为:

    function y = myfft( a )
    %UNTITLED2 此处显示有关此函数的摘要
    %   此处显示详细说明
    n = length(a);
    if n==1
        y = a;
        return;
    end
    wn = exp(2*pi*i/n);
    w = 1;
    a2 = a([1:2:n]);
    a1 = a([2:2:n]);
    y2 = myfft(a2);
    y1 = myfft(a1);
    for k=1:n/2
        t = w*y1(k);
        y(k)=y2(k)+t;
        y(k+n/2) = y2(k)-t;
        w = w*wn;
    end
    end
    
    
    

    上述为傅里叶变换的递归实现,为了实现并行,我们其实可以将其写为一种迭代(循环)实现。什么叫迭代实现呢?考虑上述递归的FFT,以8点DFT为例,其递归划分过程如图所示。这里因不是本文重点,故不再细述。

    在这里插入图片描述

    要对序列[0 7]{}做一个DFT,根据FFT原理,只需对[0,2,4,6]{}以及[1,3,5,7]{}做DFT。继续划分,则需对[0,4]{}[2,6]{}[1,5]{}[3,7]{}做DFT。最后是对0,4,2,6,1,5,3,7分别作单点DFT。所以,只要我们知道最底层的数字的排序方式,完全可以逐层从下到上开始计算,知道第一层,不需要递归。事实上,我们可以发现,源序列04261537的二进制数分别是000,100, 010, 110, 001, 101, 011, 111,每个数倒置一下得到000, 001, 010, 011,100, 101, 110,111,正好就是01234567。对于其他2的幂也是成立的。那么,就可据此将递归步,通过几层循环来实现,有人将以此设计出来的非递归算法称为“蝶形”(butterfly)迭代算法。如图所示。

    在这里插入图片描述

    离散傅里叶变换的并行分解

    并行化策略

    我们可以将规模为 N N N的傅里叶变换分解成相互独立的规模更小的傅里叶变换以便于并行实现,分解也使得内存管理变得更加灵活。一个简单的想法就是将输入向量看成一个二维的向量( m × M m \times M m×M),这里, N , m , M N,m,M N,m,M都是2的指数次方量级的。那么, f f f的各个成分就可以表示为: f [ J m + j ] , 0 ≤ j &lt; m , 0 ≤ J &lt; M f[Jm+j],0\leq j&lt;m, 0\leq J&lt;M f[Jm+j],0j<m,0J<M
    可以看到,这里的 j j j指标变得更快,而 J J J指标变得更慢。用中括号括起来的是向量的下标,下标从 0 0 0开始。

    那么,原来的离散傅里叶变换就可以写为: ( l a b e l 2.6 ) F [ k M + K ] = ∑ j , J e 2 π ( k M + K ) i M m ( J m + j ) f ( J m + j ) , 0 ≤ k &lt; m , 0 ≤ K &lt; M (label{2.6}) F[kM+K] = \sum_{j,J} e^{\frac{2\pi(kM+K)i}{Mm}(Jm+j)}f(Jm+j),0\leq k&lt;m,0\leq K&lt;M label2.6F[kM+K]=j,JeMm2π(kM+K)i(Jm+j)f(Jm+j),0k<m,0K<M
    这里的 k k k K K K决定了结果的下标。 F F F K K K变化得更快一些。从方程([2.6])中,我们可以做一个恒等的变换,得到:

    ( l a b e l 2.7 ) F [ k M + K ] = ∑ j { e 2 π i j k / m [ e 2 π i j K / ( M m ) ( ∑ J e 2 π i J K / M f ( J m + j ) ) ] } (label{2.7}) F[kM+K]=\sum_{j}\{e^{2\pi ijk/m} [e^{2\pi ijK/(Mm)} (\sum_{J}e^{2\pi iJK/M}f(Jm+j)) ]\} (label2.7)F[kM+K]=j{e2πijk/m[e2πijK/(Mm)(Je2πiJK/Mf(Jm+j))]}

    从这里发现,解读这个分解,我们们可以得到如下的并行实现的步骤:

    • 固定每个 j j j,从每一个规模为 m m m的小块中,都可以取出一个值,形成一个向量。

    • 我们可以对上述的每一个向量做FFT。当然,做FFT之前的 J J J指标,就变成了做FFT
      之后的 K K K 指标。

    • 给每个组份乘上一个 e 2 π i j K / ( M m ) e^{2\pi ijK/(Mm)} e2πijK/(Mm)

    • 重排数据,使得它们是一组中是以 j ( 0 ≤ j &lt; m ) j(0\leq j&lt;m) j(0j<m)为指标变动的向量。

    • 对这些向量组在并行的条件下做FFT。那么这时候 j j j指标就变成了 k k k指标。

    • 此时就可得到结果以 F [ k M + K ] F[kM+K] F[kM+K]的形式呈现,再做一个反操作,就可以得到我们想要的正确顺序(就是 k k k
      变动最快那个顺序))的一个结果。

    通过上述,我们虽然把一个FFT操作拆成了两个FFT操作,但是对于每个FFT的计算量上的一个计算方式是不变的:第一步FFT的计算量为 N log M N \text{log} M NlogM,第二部FFT的计算量为 N log m N \text{log} m Nlogm,所以呢,总的计算量为 N log ( M n ) = N log N N \text{log}(Mn)=N \text{log} N Nlog(Mn)=NlogN是没变的。 上述的分解,被称为“zoom transforms”,而上述的FFT的并行方式,在论文中,被叫做是“six-step framework”。

    需要注意的话,我这里做的算法有一些假设条件,比如说要求数据长度除以处理机个数的值为2的正整数次方。这样就要求处理的问题规模必须是偶数,最好是2指数次方。

    写成的串行c代码如下:

    #include<stdio.h>
    #include<math.h>
    #include<time.h>
    #include<stdlib.h>
    
    #define N 8192
    
    typedef struct {
    	double real;
    	double imag;
    } Complex;
    
    
    Complex Input[N];
    Complex Result[N];
    Complex Euler[N/2];
    
    void computeEulers()
    {
    	int x = 0;
    	float theta;
    	float n = (4.0*M_PI)/N;
    
    	for(x = 0; x < (N>>1); x++){
    		theta = x*n;
    		Euler[x].real = cos(theta);
    		Euler[x].imag = -sin(theta);
    	}
    }
    
    Complex multiply(Complex* a, Complex* b)
    {
    	Complex c;
    	c.real = (a->real * b->real) - (a->imag * b->imag);
    	c.imag = (a->real * b->imag) + (b->real * a->imag);
    	return c;
    }
    
    Complex add(Complex* a, Complex* b)
    {
    	Complex c;
    	c.real = a->real+b->real;
    	c.imag = a->imag+b->imag;
    	return c;
    }
    
    int main(int argc, char** argv)
    {
    	
    
    //***************************************************************************
    //输入向量赋值,也可以从文件读入 
    //***************************************************************************
        int NN = N;	
    	NN = N;int i;
    	for(i=0;i<NN;i++)
    	{
    		Input[i].real = i+1; Input[i].imag = i+1;
    	}
    	
    
    	int n, k;
    	for(n = 8; n < N; n++){
    		Input[n].real = Input[n].imag = 0.0;
    	}
    
    	Complex even, odd;
    	struct timespec now, tmstart;
    	clock_gettime(CLOCK_REALTIME, &tmstart);
    
    	computeEulers();
    
    	Complex twiddle, temp, euler;
    
    	double theta, PI2_by_N = 2*M_PI/N;
    
    	int diff, idx;
    
    	for(k = 0; k < (N>>1); k++)
    	{
    		even.real = even.imag = odd.real = odd.imag = 0.0;
    
    		diff = (k - 1 +(N>>1)) % (N>>1);
    		idx = 0; 
    
    		for(n = 0; n < (N>>1); n++){
    			euler = Euler[idx];
    			temp = multiply(&Input[n<<1], &euler);         
    			even = add(&even, &temp);
    			temp = multiply(&Input[(n<<1) +1], &euler);  
    			odd = add(&odd, &temp);
    			idx = (idx + diff + 1) % (N>>1);
    		}
    
    		theta = k*PI2_by_N;
    		twiddle.real = cos(theta);
    		twiddle.imag = -sin(theta);
    		temp = multiply(&odd, &twiddle);
    		Result[k] = add(&even, &temp);
    		temp.real = -temp.real;
    		temp.imag = -temp.imag;
    		Result[k+(N>>1)] = add(&even, &temp);
    	}
    
    	clock_gettime(CLOCK_REALTIME, &now);
    	double seconds = (double)((now.tv_sec+now.tv_nsec*1e-9) - (double)(tmstart.tv_sec+tmstart.tv_nsec*1e-9));
    /*	printf("C time: %f seconds\n", seconds);
    
    	printf("TOTAL PROCESSED SAMPLES: %d\n", N);
    	printf("============================================\n");
    	for(k = 0; k <= 10; k++){
    		printf("XR[%d]: %.5f  \t XI[%d]: %.5f\n", k, Result[k].real, k, Result[k].imag);
    		printf("============================================\n");
    	}*/
    	
    	printf("PROCESS NUMBER : %d\n\n", N);
    	printf("\n\n\nTIME COST EVALUATED BY C: %e secs\n", seconds);
    //	printf("TIME COST EVALUATED BY MPI: %e secs\n\n\n", mpiend-mpist);
    		
    	freopen("outputS", "w", stdout);
    	
    	for(k = 0; k <= NN-1; k++){
    			//printf("XR[%d]: %.5f  \t XI[%d]: %.5f\n", k, ResultR[k], k, ResultI[k]);	
    		printf("X[%d]: %.5f + %.5f i\n", k, Result[k], Result[k]);		
    
    
    }
    }
    
    

    并行化脚本如下:

    //**************************************************************
    // 一维FFT的MPI并行程序,采用蝶形迭代算法 
    //***************************************************************
    // 输入的数据的长度(偶数)和所用进程数的比值一定要是2的正指数次方 
    // 如 W/p = 2,4,8... 
    // 编译: mpicc FFTParallel.c -lm -o FFTParallel
    // 运行:mpiexec mpirun qsub 
    // 陆嵩 
    // 最后一次修改时间:2018.12.20 15:08 
    //*****************************************************************
    
    #include<stdio.h>
    #include<math.h>
    #include<time.h>
    #include<mpi.h>
    #include<stdlib.h>
    #define N 8
    
    
    
    //***************************************************************************
    //定义一个复数的结构 
    //***************************************************************************
    typedef struct {
    	double real;
    	double imag;
    } Complex;
    
    //***************************************************************************
    //定义一个从文件读取数据的函数 ,没写 
    //***************************************************************************
    /*
    bool readFromFile()
    {
    	return true;
    }
    
    */
    
    //***************************************************************************
    //定义复数的乘法运算 
    //参输入数: a、b两个指向复数结构体的指针 
    //返回c,为复数结构体 
    //***************************************************************************
    Complex multiply(Complex* a, Complex* b)
    {
    	Complex c;
    	c.real = (a->real * b->real) - (a->imag * b->imag);
    	c.imag = (a->real * b->imag) + (b->real * a->imag);
    	return c;
    }
    
    //***************************************************************************
    //定义复数的加法 
    //参输入数: a、b两个指向复数结构体的指针 
    //返回c,为复数结构体 
    //***************************************************************************
    Complex add(Complex* a, Complex* b)
    {
    	Complex c;
    	c.real = a->real+b->real;
    	c.imag = a->imag+b->imag;
    	return c;
    }
    
    //***************************************************************************
    //主函数入口 
    //***************************************************************************
    int main(int argc, char** argv)
    {
    	int size, rank;int NN,i;int k;int n;
    	Complex Input[N];
    	MPI_Init(NULL, NULL);//初始化 
    	MPI_Comm_size(MPI_COMM_WORLD, &size);//获取进程数 
    	MPI_Comm_rank(MPI_COMM_WORLD, &rank);//获取进程编号 
    	int localN = (N/2)/size;//每个节点分配的数目 
    	double EulerR[N/2], EulerI[N/2];
    	double *ResultR = NULL, *ResultI = NULL;
    	double tempEulerI[localN], tempEulerR[localN];
    	double tempResultR[localN*2], tempResultI[localN*2];	
    	struct timespec now, tmstart;//定义时间结构体,struct timespec精确到微秒级别 
    	double mpist, mpiend;
    	
    //***************************************************************************
    //输入向量赋值,也可以从文件读入 
    //***************************************************************************	
    	NN = N;
    	for(i=0;i<NN;i++)
    	{
    		Input[i].real = i+1; Input[i].imag = i+1;
    	}
    	
    /*	for(k = 0; k <= NN-1; k++)
    	{
    			printf("%.5f + %.5f i\n", Input[k].real,Input[k].imag);
    	}*/
    
    	
    	/*for(n = 8; n < N; n++){
    		Input[n].real = Input[n].imag = 0.0;
    	}//剩下的值用0填充 */
    	
    	
    
    	if(rank==0)
    	{
    		ResultR = malloc(sizeof(double)*N);//进程0开辟一段地址来存结果 
    		ResultI = malloc(sizeof(double)*N);
    		clock_gettime(CLOCK_REALTIME, &tmstart);//取时间 ///计时起点 
    		mpist = MPI_Wtime();//MPI 计时函数 //
    	}
    
    //***************************************************************************
    //计算欧拉数,即复单位圆周上的等分点值,安装不同进程计算,然后再Allgahter一起 
    //***************************************************************************
    	double theta, ang = 4.0*M_PI/N;//计算一般点数的欧拉值 
    	int x;
    	for(x = 0; x < localN; x++){
    		theta = (localN*rank + x)*ang;//利用进程号计算欧拉值 
    		tempEulerR[x] = cos(theta);
    		tempEulerI[x] = -sin(theta);
    	}
    
    	//做一个全收集,使得每个进程上都有欧拉值 ,因为Allgather的机制,不会乱序 
    	MPI_Allgather(tempEulerR, localN, MPI_DOUBLE, EulerR, localN, MPI_DOUBLE, MPI_COMM_WORLD);
    	MPI_Allgather(tempEulerI, localN, MPI_DOUBLE, EulerI, localN, MPI_DOUBLE, MPI_COMM_WORLD);
    
    	Complex even, odd;
    
    	Complex twiddle, temp, euler, result;
    
    	double PI2_by_N = 2*M_PI/N;
    
    	int diff, idx;
    
    	for(x = 0; x < localN; x++)
    	{
    		k = rank*localN + x;//全局编号取出 
    		even.real = even.imag = odd.real = odd.imag = 0.0;//初始化为0 
    		diff = (k - 1 +(N>>1)) % (N>>1);
    		idx = 0; 
    
    		for(n = 0; n < (N>>1); n++){
    			euler.real = EulerR[idx];
    			euler.imag = EulerI[idx];
    			temp = multiply(&Input[n<<1], &euler);         
    			even = add(&even, &temp);
    			temp = multiply(&Input[(n<<1) +1], &euler);  
    			odd = add(&odd, &temp);
    			idx = (idx + diff + 1) % (N>>1);
    		}
    
    		theta = k*PI2_by_N;
    		twiddle.real = cos(theta);
    		twiddle.imag = -sin(theta);
    		temp = multiply(&odd, &twiddle);
    
    		result = add(&even, &temp);
    		tempResultR[x] = result.real;
    		tempResultI[x] = result.imag;
    
    		temp.real = -temp.real;
    		temp.imag = -temp.imag;
    		result = add(&even, &temp);
    		tempResultR[x+localN] = result.real;
    		tempResultI[x+localN] = result.imag;
    	}
    
    	
    	//通过收集,把结果都收集都进程0上面来 
    	MPI_Gather(tempResultR, localN, MPI_DOUBLE, ResultR, localN, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    	MPI_Gather(tempResultI, localN, MPI_DOUBLE, ResultI, localN, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    	
    	MPI_Gather(tempResultR+localN, localN, MPI_DOUBLE, ResultR+(N/2), localN, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    	MPI_Gather(tempResultI+localN, localN, MPI_DOUBLE, ResultI+(N/2), localN, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    
    	if(rank == 0)
    	{
    		mpiend = MPI_Wtime();/// 计时终点 
    		clock_gettime(CLOCK_REALTIME, &now);///
    		double seconds = (double)((now.tv_sec+now.tv_nsec*1e-9) - (double)(tmstart.tv_sec+tmstart.tv_nsec*1e-9));
    
    		printf("PROCESS NUMBER : %d\n\n", N);
    		printf("\n\n\nTIME COST EVALUATED BY C: %e secs\n", seconds);
    		printf("TIME COST EVALUATED BY MPI: %e secs\n\n\n", mpiend-mpist);
    		
    		freopen("output", "w", stdout);
    		for(k = 0; k <= NN-1; k++){
    			//printf("XR[%d]: %.5f  \t XI[%d]: %.5f\n", k, ResultR[k], k, ResultI[k]);	
    			printf("X[%d]: %.5f + %.5f i\n", k, ResultR[k], ResultI[k]);		
    		}
    		
    		
    
    	}
    	MPI_Finalize();
    }
    
    

    高维FFT并行分解

    对于二维的FFT变换,只不过是在两个不同的方向,比如书在在 x x x y y y方向逐一进行并行的一维FFT变换即可,更高维的情况也是一样。这么干是可以的,但是显得也太蠢了。比如说,在做某个方向的FFT时,把值取出来操作,完了放回去,第二次又将其取出来……这样做无疑增加了不少操作。

    所以,在高维情况下做FFT,除了在每个方向都做一次FFT这个蠢办法外,事实上是有一些更高明的方法的。比如基于DSP、FPGA、ARM等的FFT实现等,这里由于篇幅原因,不再赘述,有兴趣的读者可以查看相关文献。

    数值实验

    计算资源与环境

    我用的计算为科学与工程计算国家重点实验室的大规模科学计算四号集群(LSSC-IV)。

    它是首次使用纯IntelPurley平台并采用100Gb EDR Infiniband高速网络的千万亿次计算系统。LSSC-IV集群基于联想深腾8810系统构建,包含超算和大数据计算两部分。计算集群主体部分包含408台新一代ThinkSystem SD530模块化刀片(每个刀片包括2 颗主频为2.3GHz 的Intel Xeon Gold 6140 18核Purley处理器和192GB内存),总共拥有14688 个处理器核,理论峰值性能为1081TFlops,实测LINPACK性能703TFlops。系统还包括1台胖结点(Lenovo X3850X6服务器,2颗Intel Xeon E7-8890 V4处理器,4TB 内存,10TB本地存储),4个KNL结点(1颗Intel Xeon Phi KNL 7250处理器,192GB内存)以及管理结点、登陆结点等。

    集群系统采用Lenovo DS5760存储系统,磁盘阵列配置双控制器,8GB缓存,主机接口8个16Gbps FC接口,60 块6TB NL_SAS盘作为数据存储,裸容量共计360TB,系统持续读写带宽超过4GB/s,磁盘阵列通过2台I/O结点以GPFS并行文件系统管理,共享输出给计算结点。大数据计算部分包括7 台GPU服务器(分别配置NVIDIA Tesla P40、P100 和 V100计算卡)和由8台Lenovo X3650M5服务器组成的HDFS辅助存储系统。集群系统所有结点同时通过千兆以太网和100Gb EDR Infiniband网络连接。其中千兆以太网用于管理,EDR
    Infiniband网络采用星型互联,用于计算通讯。LSSC-IV的操作系统为:Red Hat Enterprise Linux Server 7.3。LSSC-IV 上的编译系统包括Intel C,Fortran编译器,GNU编译器,Intel VTune调试器等。并行环境有MVAPICH2、OpenMPI、Intel MPI,用户可以根据自己的情况选用。LSSC-IV上还安装如下常用数值计算的开源软件:PHG(Parallel Hierarchical Grid),PETSc,Hypre,SLEPc,Trilinos等。用户也可自行在个人目录下安装需要的软件。LSSC-IV采用LSF作业调度系统,两个登陆结点作为任务提交结点。

    LSSC-IV系统2017 年11月成功进入全球HPC TOP500排行榜,位列第382位。

    并行性能的主要考察指标

    加速比:串行执行时间与并行执行时间的比值。

    并行效率:加速比与进程数的比值,即 E P = T S P × T P \text {E}_{\text {P}}=\frac {\text {T}_{\text {S}}}{\text {P} \times \text {T}_{\text {P}}} EP=P×TPTS,其中, P \text {P} P为并行程序执行进程数, T P \text {T}_{\text {P}} TP为并行程序执行时间, T S \text {T}_{\text {S}} TS 为串行程序执行时间。

    可扩展性:并行效率与问题规模、处理机数量之间的关系。

    强可扩展性:保持总体计算规模不变,随着处理器个数的增加,观察并行效率的变化。

    弱可扩展性:保持单个节点的计算规模不变,随着处理器个数的增加,观察并行效率的变化。

    加速比测试

    我们固定核心数为4,改变问题规模,来看看加速比和并行的效率。因为在测量数值比较小时,测量不稳定,我们在程序中计时需要多次计算取平均值。

    在这里插入图片描述

    通过表格,可以观察到看得到,问题规模越大,加速的效果相对就越好。所以说,虽然说问题有其固有的可扩展性限制,可能用到用到没几十个核,加速比就恒定了,但是对于大规模问题来说,因为其加速效率较为可观,其加速效率能够随核心的成倍增长呈现1/2减小,还是很不错的。并行计算很有前景。

    强可扩展性测试

    固定复向量规模为32768(2e15),依次增加核心数为1、2、4、8、16、32、64、128、256,观察其加速比以及并行效率。因为核心数为1的并行和串行还是不尽相同,所以这里核心数从1开始。
    在这里插入图片描述

    在这里插入图片描述

    对比两表,可以看得出,问题规模越大,可扩展性越好,也就是说可增加的核心数随问题的规模变大而增加。问题规模越大,可并行性越强。可以观察到并行效率快速下降,可能由每个核的计算粒度的成倍递减造成的。随着核心数的增加,而问题规模不变,通讯开销慢慢占了主导地位。

    弱可扩展性测试

    在这里插入图片描述

    可以看得出,在问题规模为4096,核心数为8时,获得了最高的并行效率。

    总结

    本文介绍了一维快速傅里叶变换的并行化实现,并为高维的并行化实现提供了思路。
    在并行化方面,基于分解的思想,在并行性能方面,测试了程序的加速比,并行效率以及可扩展性。

    展开全文
  • GPU非常擅长FFT,而Nvidia提供的(闭源)cuFFT库完成了在CUDA设备上实现高效R2C FFT的艰巨任务。 但是我们可以做得更好吗?!? 几乎可以肯定。 无论如何,我将在此仓库中针对我遇到的一个非常特殊的用例进行尝试...
  • clFFT库是离散快速傅立叶变换的开源OpenCL库实现。亲测有效,可供学习。
  • rocFFT是用于计算用HIP编写的快速傅立叶变换(FFT)的软件库。 它是AMD基于的软件生态系统的一部分。 除AMD GPU设备外,该库还可以使用HIP工具与CUDA编译器一起编译,以在Nvidia GPU设备上运行。 安装预构建的...
  • GPU下做fft和Ifft----pycuda

    千次阅读 2017-09-10 12:55:32
    1.调用keikna库 keikna库中有fft,所以为了减小任务量我就调用keikan的fft的库来完成。...2.介绍FFT和IFFT的实现 对于二维的傅里叶变换的实现,冈萨雷斯《数字图像处理》有详细的介绍。 通过对这两个
  • 因为在网上找资料,当时想学习一下多个 1 维信号的 fft,这里我推荐这位博主的文章,但是我没有成功,我后来自己实现了。 1. 下载 想使用cuFFT库,必须下载,可以从CUDA官网下载软件包,也可以通过我提供的我的...
  • 基于 NXP iMX8X 测试 GPU FFT 运算

    千次阅读 2019-03-19 11:21:27
    接下来你将会了解如何使用 OpenCL 在 iMX8X 上的 GPU 实现 FFT 计算。     2).  搭建开发环境 在撰写本文时,Toradex 基于 imx-4.9.123 Linux BSP 提供 Colibri iMX8X 的支持。由于是...
  • 实现傅立叶变换FFT,c++编写,可以直接运行。
  • 分别使用FFT和矩阵乘法实现线性卷积,并在CPU和GPU两种情况下比较运行时间。
  • 通过两种算法实现GPU加速的非均匀快速傅立叶变换和场校正。 还支持通过MPI的pcSENSE进行分布式内存计算,以校正运动引起的相位误差以及低阶重建。 贫民窟 -模板化线性代数库。 在C ++中为我们提供了类似于MATLAB的...
  • 它使用了Andrew Holme提供的GPU FFT库(请参阅 )。 我比较了使用cython和ctypes的实现,并且ctypes解决方案比较慢(由于C调用的输入数据转换,因此开销很大)。 就我而言,它比使用纯python和Numpy快7倍。 依存...
  • 此函数在 GPU实现矢量化 FFT。 答案与 fft 和 ifft matlab 函数相同。 您需要安装 Naga K. Govindaraju http://gamma.cs.unc.edu/GPUFFTW/的 GPUFFTW2.0 库。 在 Naga 的网站上,您可以看到系统要求。 它适用于 ...
  • 基于2、3维类型1、2非均匀FFTGPU实现。 这是由Melody Shih在Flatiron Institute实习的作品,由CCM项目负责人Alex Barnett建议。 安装 获取此代码和依赖项git clone ...
  • GPU实现了Richardson-Lucy算法。 安装 该软件已在64位Linux和Windows 7上进行了测试。 二进制文件可从下载。 如果要自己编译软件: 从nVidia安装CUDA SDK(> = 5.5) 在Linux上:make -f Makefile.linux 在Windows...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,687
精华内容 674
关键字:

fft的gpu实现