/* t-poll.c - Check the poll function
* Copyright (C) 2015 g10 Code GmbH
*
* This file is part of libgpg-error.
*
* libgpg-error is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* libgpg-error is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, see .
*/
/* FIXME: We need much better tests that this very basic one. */
#if HAVE_CONFIG_H
# include
#endif
#include
#include
#include
#include
#include
#include
#include
#ifdef _WIN32
# include
# include
#else
# ifdef USE_POSIX_THREADS
# include
# endif
#endif
#define PGM "t-poll"
#include "t-common.h"
#ifdef _WIN32
# define THREAD_RET_TYPE DWORD WINAPI
# define THREAD_RET_VALUE 0
#else
# define THREAD_RET_TYPE void *
# define THREAD_RET_VALUE NULL
#endif
/* Object to convey data to a thread. */
struct thread_arg
{
const char *name;
estream_t stream;
volatile int stop_me;
#ifdef USE_POSIX_THREADS
pthread_t thread;
#elif _WIN32
HANDLE thread;
#endif
};
static struct thread_arg peer_stdin; /* Thread to feed the stdin. */
static struct thread_arg peer_stdout; /* Thread to feed the stdout. */
static struct thread_arg peer_stderr; /* Thread to feed the stderr. */
static estream_t test_stdin;
static estream_t test_stdout;
static estream_t test_stderr;
#if defined(_WIN32) || defined(USE_POSIX_THREADS)
/* This thread feeds data to the given stream. */
static THREAD_RET_TYPE
producer_thread (void *argaddr)
{
struct thread_arg *arg = argaddr;
int i = 0;
(void)arg;
while (!arg->stop_me && i++ < 3)
{
show ("thread '%s' about to write\n", arg->name);
es_fprintf (arg->stream, "This is '%s' count=%d\n", arg->name, i);
es_fflush (arg->stream);
}
es_fclose (arg->stream);
return THREAD_RET_VALUE;
}
/* This thread eats data from the given stream. */
static THREAD_RET_TYPE
consumer_thread (void *argaddr)
{
struct thread_arg *arg = argaddr;
char buf[15];
(void)arg;
while (!arg->stop_me)
{
show ("thread '%s' ready to read\n", arg->name);
if (!es_fgets (buf, sizeof buf, arg->stream))
{
show ("Thread '%s' received EOF or error\n", arg->name);
break;
}
show ("Thread '%s' got: '%s'\n", arg->name, buf);
}
es_fclose (arg->stream);
return THREAD_RET_VALUE;
}
#endif /*_WIN32 || USE_POSIX_THREADS */
static void
launch_thread (THREAD_RET_TYPE (*fnc)(void *), struct thread_arg *th)
{
int fd;
th->stop_me = 0;
fd = es_fileno (th->stream);
#ifdef _WIN32
th->thread = CreateThread (NULL, 0, fnc, th, 0, NULL);
if (!th->thread)
die ("creating thread '%s' failed: rc=%d", th->name, (int)GetLastError ());
show ("thread '%s' launched (fd=%d)\n", th->name, fd);
#elif USE_POSIX_THREADS
if (pthread_create (&th->thread, NULL, fnc, th))
die ("creating thread '%s' failed: %s\n", th->name, strerror (errno));
show ("thread '%s' launched (fd=%d)\n", th->name, fd);
# else /* no thread support */
verbose++;
show ("no thread support - skipping test\n", PGM);
verbose--;
#endif /* no thread support */
}
static void
join_thread (struct thread_arg *th)
{
#ifdef _WIN32
int rc;
rc = WaitForSingleObject (th->thread, INFINITE);
if (rc == WAIT_OBJECT_0)
show ("thread '%s' has terminated\n", th->name);
else
fail ("waiting for thread '%s' failed: %d", th->name, (int)GetLastError ());
CloseHandle (th->thread);
#elif USE_POSIX_THREADS
pthread_join (th->thread, NULL);
show ("thread '%s' has terminated\n", th->name);
#endif
}
static void
create_pipe (estream_t *r_in, estream_t *r_out)
{
gpg_error_t err;
int filedes[2];
#ifdef _WIN32
if (_pipe (filedes, 512, 0) == -1)
#else
if (pipe (filedes) == -1)
#endif
{
err = gpg_error_from_syserror ();
die ("error creating a pipe: %s\n", gpg_strerror (err));
}
show ("created pipe [%d, %d]\n", filedes[0], filedes[1]);
*r_in = es_fdopen (filedes[0], "r,pollable");
if (!*r_in)
{
err = gpg_error_from_syserror ();
die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
}
*r_out = es_fdopen (filedes[1], "w,pollable");
if (!*r_out)
{
err = gpg_error_from_syserror ();
die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
}
}
static void
test_poll (void)
{
int ret;
gpgrt_poll_t fds[3];
char buffer[16];
size_t used, nwritten;
int c;
memset (fds, 0, sizeof fds);
fds[0].stream = test_stdin;
fds[0].want_read = 1;
fds[1].stream = test_stdout;
fds[1].want_write = 1;
/* FIXME: We don't use the next stream at all. */
fds[2].stream = test_stderr;
fds[2].want_write = 1;
fds[2].ignore = 1;
used = 0;
while (used || !fds[0].ignore)
{
ret = gpgrt_poll (fds, DIM(fds), -1);
if (ret == -1)
{
fail ("gpgrt_poll failed: %s\n", strerror (errno));
continue;
}
if (!ret)
{
fail ("gpgrt_poll unexpectedly timed out\n");
continue;
}
show ("gpgrt_poll detected %d events\n", ret);
if (debug)
show ("gpgrt_poll: r=%d"
" 0:%c%c%c%c%c%c%c%c%c%c%c%c"
" 1:%c%c%c%c%c%c%c%c%c%c%c%c"
" 2:%c%c%c%c%c%c%c%c%c%c%c%c"
"\n",
ret,
fds[0].want_read? 'r':'-',
fds[0].want_write? 'w':'-',
fds[0].want_oob? 'o':'-',
fds[0].want_rdhup? 'h':'-',
fds[0].ignore? '!':'=',
fds[0].got_read? 'r':'-',
fds[0].got_write? 'w':'-',
fds[0].got_oob? 'o':'-',
fds[0].got_rdhup? 'h':'-',
fds[0].got_hup? 'H':' ',
fds[0].got_err? 'e':' ',
fds[0].got_nval? 'n':' ',
fds[1].want_read? 'r':'-',
fds[1].want_write? 'w':'-',
fds[1].want_oob? 'o':'-',
fds[1].want_rdhup? 'h':'-',
fds[1].ignore? '!':'=',
fds[1].got_read? 'r':'-',
fds[1].got_write? 'w':'-',
fds[1].got_oob? 'o':'-',
fds[1].got_rdhup? 'h':'-',
fds[1].got_hup? 'H':' ',
fds[1].got_err? 'e':' ',
fds[1].got_nval? 'n':' ',
fds[2].want_read? 'r':'-',
fds[2].want_write? 'w':'-',
fds[2].want_oob? 'o':'-',
fds[2].want_rdhup? 'h':'-',
fds[2].ignore? '!':'=',
fds[2].got_read? 'r':'-',
fds[2].got_write? 'w':'-',
fds[2].got_oob? 'o':'-',
fds[2].got_rdhup? 'h':'-',
fds[2].got_hup? 'H':' ',
fds[2].got_err? 'e':' ',
fds[2].got_nval? 'n':' '
);
else
show ("gpgrt_poll detected %d events\n", ret);
if (fds[0].got_read)
{
/* Read from the producer. */
for (;;)
{
c = es_fgetc (fds[0].stream);
if (c == EOF)
{
if (es_feof (fds[0].stream))
{
show ("reading '%s': EOF\n", peer_stdin.name);
fds[0].ignore = 1; /* Not anymore needed. */
peer_stdin.stop_me = 1; /* Tell the thread to stop. */
}
else if (es_ferror (fds[0].stream))
{
fail ("error reading '%s': %s\n",
peer_stdin.name, strerror (errno));
fds[0].ignore = 1; /* Disable. */
peer_stdin.stop_me = 1; /* Tell the thread to stop. */
}
else
show ("reading '%s': EAGAIN\n", peer_stdin.name);
break;
}
else
{
if (used <= sizeof buffer -1)
buffer[used++] = c;
if (used == sizeof buffer)
{
show ("throttling reading from '%s'\n", peer_stdin.name);
fds[0].ignore = 1;
break;
}
}
}
show ("read from '%s': %zu bytes\n", peer_stdin.name, used);
if (used)
fds[1].ignore = 0; /* Data to send. */
}
if (fds[1].got_write)
{
if (used)
{
ret = es_write (fds[1].stream, buffer, used, &nwritten);
show ("result for writing to '%s': ret=%d, n=%zu, nwritten=%zu\n",
peer_stdout.name, ret, used, nwritten);
if (!ret)
{
assert (nwritten <= used);
/* Move the remaining data to the front of buffer. */
memmove (buffer, buffer + nwritten,
sizeof buffer - nwritten);
used -= nwritten;
}
ret = es_fflush (fds[1].stream);
if (ret)
fail ("Flushing for '%s' failed: %s\n",
peer_stdout.name, strerror (errno));
}
if (!used)
fds[1].ignore = 1; /* No need to send data. */
}
if (used < sizeof buffer / 2 && !peer_stdin.stop_me && fds[0].ignore)
{
show ("accelerate reading from '%s'\n", peer_stdin.name);
fds[0].ignore = 0;
}
}
}
int
main (int argc, char **argv)
{
int last_argc = -1;
if (argc)
{
argc--; argv++;
}
while (argc && last_argc != argc )
{
last_argc = argc;
if (!strcmp (*argv, "--help"))
{
puts (
"usage: ./t-poll [options]\n"
"\n"
"Options:\n"
" --verbose Show what is going on\n"
" --debug Flyswatter\n"
);
exit (0);
}
if (!strcmp (*argv, "--verbose"))
{
verbose = 1;
argc--; argv++;
}
else if (!strcmp (*argv, "--debug"))
{
verbose = debug = 1;
argc--; argv++;
}
}
if (!gpg_error_check_version (GPG_ERROR_VERSION))
{
die ("gpg_error_check_version returned an error");
errorcount++;
}
peer_stdin.name = "stdin producer";
create_pipe (&test_stdin, &peer_stdin.stream);
peer_stdout.name = "stdout consumer";
create_pipe (&peer_stdout.stream, &test_stdout);
peer_stderr.name = "stderr consumer";
create_pipe (&peer_stderr.stream, &test_stderr);
if (es_set_nonblock (test_stdin, 1))
fail ("error setting test_stdin to nonblock: %s\n", strerror (errno));
if (es_set_nonblock (test_stdout, 1))
fail ("error setting test_stdout to nonblock: %s\n", strerror (errno));
if (es_set_nonblock (test_stderr, 1))
fail ("error setting test_stderr to nonblock: %s\n", strerror (errno));
launch_thread (producer_thread, &peer_stdin );
launch_thread (consumer_thread, &peer_stdout);
launch_thread (consumer_thread, &peer_stderr);
test_poll ();
show ("Waiting for threads to terminate...\n");
es_fclose (test_stdin);
es_fclose (test_stdout);
es_fclose (test_stderr);
peer_stdin.stop_me = 1;
peer_stdout.stop_me = 1;
peer_stderr.stop_me = 1;
join_thread (&peer_stdin);
join_thread (&peer_stdout);
join_thread (&peer_stderr);
return errorcount ? 1 : 0;
}