Another try with getting parallel processes to work. Please excuse the amount of code but every attempt to shorten it makes the error vanish.
What I tested so far:
- sending int from parent to child, from child to parent, and from parent to child and then back: works
- processing a list of int: send from parent to child, modify and back to parent: works
- more data: int + string, from parent to child, modify and back to parent: works
- a list of data the same way: works
But when I run the same function that works a second time it always fail.
This is the function that creates the child process:
//parent sends binary data from list to child which sends back modified data
bool processParallel6(std::vector<std::pair<int, std::string>> & data)
{
//define pipe
int parent2Child[2];
int child2Parent[2];
//create pipe
pipe(parent2Child);
pipe(child2Parent);
//fork
pid_t child = fork();
if(child == 0) //child process
{
//close not needed end of pipe
close(parent2Child[1]);
close(child2Parent[0]);
for(;;)
{
struct pollfd pfd;
pfd.fd = parent2Child[0];
pfd.events = POLLIN;
//wait until data is available at the pipe
cout << "c: poll ..." << endl;
if(poll(&pfd, 1, -1) < 0)
{
cout << "c: poll: " << strerror(errno) << endl;
exit(-1);
}
cout << "c: poll says there are data" << endl;
if((pfd.revents&POLLIN) == POLLIN)
{
int data;
std::string text;
if(!readData3(parent2Child[0], data, text))
exit(-2);
cout << "c: data received: " << data << " " << text << endl;
if(data == -1)
break;
if(!writeData3(child2Parent[1], data * 2, text + text))
exit(-3);
cout << "c: sent data to parent: " << 2 * data << " " << text + text << endl;
}
}
close(parent2Child[0]);
close(child2Parent[1]);
exit(0);
}
else //parent process
{
//close not needed end of pipe
close(parent2Child[0]);
close(child2Parent[1]);
//send data to child
if(!writeData3(parent2Child[1], data.back().first, data.back().second))
return false;
cout << "p: wrote data: " << data.back().first << " " << data.back().second << endl;
data.pop_back();
//read result from child
for(;;)
{
struct pollfd pfd;
pfd.fd = child2Parent[0];
pfd.events = POLLIN;
//wait until data is available at the pipe
cout << "p: poll ..." << endl;
if(poll(&pfd, 1, -1) < 0)
{
cout << "p poll: " << strerror(errno) << endl;
return false;
}
cout << "p: poll says there are data" << endl;
if((pfd.revents&POLLIN) == POLLIN)
{
int data;
std::string text;
if(!readData3(child2Parent[0], data, text))
return false;
cout << "p: data received: " << data << " " << text << endl;
}
if(data.empty())
break;
if(!writeData3(parent2Child[1], data.back().first, data.back().second))
return false;
cout << "p: wrote data: " << data.back().first << " " << data.back().second << endl;
data.pop_back();
}
//send stop data
if(!writeData3(parent2Child[1], -1, "notext"))
return false;
cout << "p: sent stop data " << endl;
//wait for child to end
wait(nullptr);
//close all pipes
close(parent2Child[1]);
close(child2Parent[0]);
}
return true;
}
For reading and writing data I use this two functions:
bool readData3(int fd, int & number, std::string & text)
{
char numberBuf[sizeof(int)];
int bytesRead = read(fd, numberBuf, sizeof(int));
if(bytesRead > 0)
{
number = *(int *)numberBuf;
}
else if(bytesRead < 0)
{
cout << "readData3: " << strerror(errno) << endl;
return false;
}
char sizeBuf[sizeof(int)];
int size = -1;
bytesRead = read(fd, sizeBuf, sizeof(int));
if(bytesRead > 0)
{
size = *(int *)sizeBuf;
}
else if(bytesRead < 0)
{
cout << "readData3: " << strerror(errno) << endl;
return false;
}
char textBuf[size];
bytesRead = read(fd, textBuf, size);
if(bytesRead > 0)
{
text = std::string(textBuf);
}
else if(bytesRead < 0)
{
cout << "readData3: " << strerror(errno) << endl;
return false;
}
return true;
}
bool writeData3(int fd, const int number, const std::string text)
{
int bytesWritten = write(fd, &number, sizeof(int));
if(bytesWritten < 0)
{
cout << "writeData3: " << strerror(errno) << endl;
return false;
}
int size = text.size() + 1;
bytesWritten = write(fd, &size, sizeof(int));
if(bytesWritten < 0)
{
cout << "writeData3: " << strerror(errno) << endl;
return false;
}
bytesWritten = write(fd, text.c_str(), size);
if(bytesWritten < 0)
{
cout << "writeData3: " << strerror(errno) << endl;
return false;
}
return true;
}
Finally I run it like this:
#include <iostream>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <bitset>
#include <memory>
#include <poll.h>
#include <cstring>
using namespace std;
int main(int /*argc*/, char* /*argv*/[])
{
std::vector<std::pair<int, std::string>> data;
data.push_back(std::make_pair(1, "one"));
data.push_back(std::make_pair(2, "two"));
cout << "6a ########################################################" << endl << flush;
processParallel6(data);
cout << "6b ########################################################" << endl << flush;
processParallel6(data);
return 0;
}
This is the output:
6a ###############################################
p: wrote data: 2 two
p: poll ...
c: poll ...
c: poll says there are data
c: data received: 2 two
p: poll says there are data
p: data received: 4 twotwo
p: wrote data: 1 one
p: poll ...
c: sent data to parent: 4 twotwo
c: poll ...
c: poll says there are data
c: data received: 1 one
p: poll says there are data
p: data received: 2 oneone
p: sent stop data
c: sent data to parent: 2 oneone
c: poll ...
c: poll says there are data
c: data received: -1 notext
6b ###################################################
c: poll ...
terminate called after throwing an instance of 'std::logic_error'
what(): basic_string::_M_construct null not valid
c: poll says there are data
c: poll ...
c: poll says there are data
c: poll ...
The last 4 lines are repeated a thousands of times. This output comes most of the times, but sometimes I have seen a std::bad_alloc error. When I try strace it crashes too, but when it runs I have seen directly after the second run of processParallel6() a line with mmap, ENOEM and 'Cannot allocate memory'
What happens here? Why is it working the first time, but not the second time?