Tuesday, August 4, 2015

OpenMP Single Producer Multiple Consumer

I am trying to achieve something contrived using OpenMP.

I have a multi-core system with N available processors. I want a vector of objects of length k*P to be populated in batches of P by a single thread (by reading a file), i.e. a single thread reads this file and writes vecObj[0 to P-1], then vecObj[P to 2P-1], and so on. To keep things simple, this vector is pre-resized (i.e. elements are assigned with the = operator; no push_backs, and the length is constant as far as we are concerned).

After a batch is written into the vector, I want the remaining N-1 threads to work on the available data. Since each object can take a different amount of time to be worked on, it would be good to have dynamic scheduling for the remaining threads. The snippet below works really well when all the threads are working on the data.

#pragma omp parallel for schedule(dynamic, per_thread)
    for(size_t i = 0; i < dataLength(); ++i) {
        threadWorkOnElement(vecObj, i);
    }

Now, as I see it, the main issue in coming up with a solution is this: how can I have N-1 threads dynamically scheduled over the range of available data, while another thread just keeps reading and populating the vector with data?

I am guessing that the problem of writing new data and signalling the remaining threads can be solved using std::atomic.

I think that what I am trying to achieve is along the lines of the following pseudo code:

std::atomic<size_t> freshDataEnd;
size_t dataWorkStart = 0;
size_t dataWorkEnd;
#pragma omp parallel
{
    #pragma omp task
    {
        //increment freshDataEnd atomically upon reading every P objects
        //return when end of file is reached
        readData(vecObj, freshDataEnd);
    }
    #pragma omp task
    {
        omp_set_num_threads(N-1);           
        while(freshDataEnd <= MAX_VEC_LEN) {
            if (dataWorkStart < freshDataEnd) {
                dataWorkEnd = freshDataEnd;
                #pragma omp parallel for schedule(dynamic, per_thread)
                for(size_t i = dataWorkStart; i < dataWorkEnd; ++i) {
                    threadWorkOnElement(vecObj, i);
                }
                dataWorkStart = dataWorkEnd;
            }
        }
    }
}

Is this the correct approach to achieve what I am trying to do? How can I handle this sort of nested parallelism? Less important: I would have preferred to stick with OpenMP directives and not use std::atomics. Is that possible, and how?



via Chebli Mohamed
