Programmazione Funzionale e Parallela

Corso di Laurea in Ingegneria Informatica e Automatica - A.A. 2017-2018

HomePage | Avvisi | Diario lezioni | Materiale didattico | Esami | Forum | Login

Giovedì 23 novembre 2017


Esempio 1

Somma di array di int di dimensione arbitraria usando vettorizzazione SSE:

vecsum.c
#include <immintrin.h>
#include <stdio.h>

void sum_sse(int A[4], int B[4], int C[4]);

void vecsum(int* A, int* B, int* C, int n) {
    int i;
    for (i=0; i+3<n; i+=4) {
        #if VERSION == 1
        sum_sse(A+i,B+i,C+i);   // versione SSE
        #else
        C[i]=A[i]+B[i];         // versione sequenziale, con loop unrolling
        C[i+1]=A[i+1]+B[i+1];
        C[i+2]=A[i+2]+B[i+2];
        C[i+3]=A[i+3]+B[i+3];
        #endif
    }
    for (;i<n;++i) C[i]=A[i]+B[i];
}

void  sum_sse(int A[4], int B[4], int C[4]) {
    __m128i a, b, c;                        // dich. variabili vettoriali
    a = _mm_loadu_si128((const __m128i*)A); // copia primo vettore in a
    b = _mm_loadu_si128((const __m128i*)B); // copia secondo vettore in b
    c = _mm_add_epi32(a,b);                 // calcola la somma vett. di a e b
    _mm_store_si128((__m128i*)C, c);        // copia risultato c in C
}

void init(int* v, int n) {
    int i;
    for (i=0; i<n; ++i) v[i] = i % 97;
}

#ifndef N
#define N 1000
#endif

#ifndef M
#define M 100000
#endif

// provare anche con M=1 e N=100000000:
// array A, B, C da 100 milioni di int => 1.2 GB per 3 array di int
// (collo di bottiglia memoria vanifica parallelismo)

int main() {
    int j;

    int* A = malloc(N*sizeof(int));
    int* B = malloc(N*sizeof(int));
    int* C = malloc(N*sizeof(int));

    // ripete per rendere misurabili i tempi
    for (j=0; j<M;j++) vecsum(A,B,C,N);

    printf("%d %d %d %d %d\n", C[0], C[1], C[2], C[3], C[4]);
    return 0;
}


Compilazione versioni con piccola memory footprint (array di 1000 elementi):
> gcc vecsum.c -D VERSION=1 -o vecsum-sse -O1
> gcc vecsum.c -D VERSION=0 -o vecsum -O1
> time ./vecsum
0 0 0 0 0

real	0m0.047s
user	0m0.046s
sys	0m0.000s
> time ./vecsum-sse
0 0 0 0 0

real	0m0.016s
user	0m0.013s
sys	0m0.000s


Compilazione versioni con alta memory footprint (array di 100000000 elementi):
> gcc vecsum.c -D VERSION=1 -D N=100000000 -D M=1 -o vecsum-large-sse -O1
> gcc vecsum.c -D VERSION=0 -D N=100000000 -D M=1 -o vecsum-large -O1
> time ./vecsum-large
0 0 0 0 0

real	0m0.095s
user	0m0.074s
sys	0m0.021s
> time ./vecsum-large-sse 
0 0 0 0 0

real	0m0.073s
user	0m0.031s
sys	0m0.040s


Si noti come in questo secondo caso lo speedup sia nettamente inferiore. La motivazione è che gli array sono grandi e il collo di bottiglia è il sistema di memoria e non il calcolo delle operazioni aritmetiche.

Esempio 2

Somma degli elementi di un array di int di dimensione arbitraria usando vettorizzazione SSE:

sumall.c
#include <immintrin.h>
#include <stdio.h>

void somma_sse(int A[4], int B[4], int C[4]);

int sumall(int* A, int n) {
    int i, s = 0, S[4] = { 0 };
    for (i=0; i+3<n; i+=4)
        somma_sse(A+i, S, S);
    for (; i<n; ++i) s += A[i];
    return S[0] + S[1] + S[2] + S[3] + s;
}

void somma_sse(int A[4], int B[4], int C[4]) {
    __m128i a, b, c;                        // dich. variabili vettoriali
    a = _mm_loadu_si128((const __m128i*)A); // copia primo vettore in a
    b = _mm_loadu_si128((const __m128i*)B); // copia secondo vettore in b
    c = _mm_add_epi32(a,b);                 // calcola la somma vett. di a e b
    _mm_store_si128((__m128i*)C, c);        // copia risultato c in C
}

int main() {
    int A[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
    int s = sumall(A,sizeof(A)/sizeof(int));
    printf("%d\n", s);
    return 0;
}


Esempio 3

Prodotto scalare di array di int di dimensione arbitraria usando vettorizzazione SSE:

scalprod.c
#include <immintrin.h>
#include <stdio.h>

void somma_sse(int A[4], int B[4], int C[4]);
void prod_sse(int A[4], int B[4], int C[4]);

int scalprod(int* A, int* B, int n) {
    int i, s = 0, S[4] = { 0 }, P[4];
    for (i=0; i+3<n; i+=4) {
        prod_sse(A+i,B+i,P);
        somma_sse(P, S, S);
    }
    for (; i<n; ++i) s += A[i]*B[i];
    return S[0] + S[1] + S[2] + S[3] + s;
}

void somma_sse(int A[4], int B[4], int C[4]) {
    __m128i a, b, c;                         // dich. variabili vettoriali
    a = _mm_loadu_si128((const __m128i*)A);  // copia primo vettore in a
    b = _mm_loadu_si128((const __m128i*)B);  // copia secondo vettore in b
    c = _mm_add_epi32(a,b);                  // calcola la somma vett. di a e b
    _mm_store_si128((__m128i*)C, c);         // copia risultato c in C
}

void prod_sse(int A[4], int B[4], int C[4]) {
    __m128i a, b, c;                         // dich. variabili vettoriali
    a = _mm_loadu_si128((const __m128i*)A);  // copia primo vettore in a
    b = _mm_loadu_si128((const __m128i*)B);  // copia secondo vettore in b
    c = _mm_mullo_epi32(a,b);                // calcola la somma vett. di a e b
    _mm_store_si128((__m128i*)C, c);         // copia risultato c in C
}

int main() {
    int A[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
    int B[] = { 2, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1 };
    int s = scalprod(A, B, sizeof(A)/sizeof(int));
    printf("%d\n", s);
    return 0;
}