[173] in 6.033-lab

home help back first fref pref prev next nref lref last post

solutions to timer assignment

daemon@ATHENA.MIT.EDU (Benjie Chen)
Tue Feb 22 17:38:03 2000

From: Benjie Chen <benjie@cag.lcs.mit.edu>
Message-Id: <200002222237.RAA18149@amsterdam.lcs.mit.edu>
To: 6.033-lab@MIT.EDU
Date: Tue, 22 Feb 2000 17:37:48 -0500 (EST)
Mime-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Transfer-Encoding: 7bit


Feel free to send in bug reports or use the code in your proxy assignment.

--------------------------- async.c ------------------------------

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdarg.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <sys/socket.h>

#include <sys/time.h>
#include <sys/types.h>

#include "async.h"

/* Callback to make when a file descriptor is ready.  cb_io_add() fills in
 * one of these per descriptor and direction; cb_check() invokes it. */
struct io_cb {
  void (*cb_fn) (void *);     /* Function to call */
  void *cb_arg;               /* Argument to pass function */
};
/* rcb[] holds the read callbacks and wcb[] the write callbacks, both indexed
 * by file descriptor number. */
static struct io_cb rcb[FD_MAX], wcb[FD_MAX];  /* Per fd callbacks */
/* A bit set in rfds/wfds means the matching rcb[]/wcb[] entry is active and
 * the descriptor is handed to select() by cb_check(). */
static fd_set rfds, wfds;                      /* Bitmap of cb's in use */

/* Timer callbacks.  Every pending timer is reachable two ways: through
 * tcbs[], indexed by the id that cb_timer_add() returns, and through the
 * doubly linked list headed by `timers', which add_timer_into_list() keeps
 * sorted by ascending `left'. */
struct timer_cb {
  void (*cb_fn) (void *);     /* Function to call once `left' reaches 0 */
  void *cb_arg;               /* Argument to pass function */
  unsigned int left;          /* Milliseconds remaining; 0 means expired */
  unsigned int id;            /* Index of this timer in tcbs[] */
  struct timer_cb *next;      /* Next timer in the list, or NULL at the tail */
  struct timer_cb *prev;      /* Previous timer, or NULL at the head */
};
static struct timer_cb *tcbs[FD_MAX];  /* Timers by id; NULL marks a free slot */
static struct timer_cb *timers = 0;    /* Head of the sorted timer list */

/* some timer helper functions */
/* Unlink timer t from the list headed by `timers' and clear its link
 * pointers.  The timer itself is not freed and its tcbs[] slot is left
 * untouched. */
static void
remove_timer_from_list(struct timer_cb* t)
{
  struct timer_cb *before = t->prev;
  struct timer_cb *after = t->next;

  /* t was the head: the list now starts at its successor. */
  if (t == timers)
    timers = after;

  /* Splice the neighbours together around t. */
  if (before != NULL)
    before->next = after;
  if (after != NULL)
    after->prev = before;

  t->prev = NULL;
  t->next = NULL;
}

/* Subtract `elapsed' milliseconds from the remaining time of every timer on
 * the list.  A timer with no more than `elapsed' left is clamped to 0, which
 * marks it as expired. */
static void
update_timer_list(unsigned long elapsed)
{
  struct timer_cb *cur;

  for (cur = timers; cur != NULL; cur = cur->next) {
    if (cur->left > elapsed)
      cur->left -= elapsed;
    else
      cur->left = 0;
  }
}

/* Insert timer t into the list headed by `timers', keeping the list sorted
 * by ascending `left'.  A timer whose `left' equals that of existing entries
 * is placed after them. */
static void
add_timer_into_list(struct timer_cb* t)
{
  struct timer_cb *before = NULL;   /* last node that stays ahead of t */
  struct timer_cb *after = timers;  /* first node that goes behind t */

  /* Skip every timer that is due no later than t. */
  while (after != NULL && after->left <= t->left) {
    before = after;
    after = after->next;
  }

  t->prev = before;
  t->next = after;
  if (after != NULL)
    after->prev = t;

  if (before != NULL)
    before->next = t;
  else
    timers = t;     /* t is the new head */
}

int
cb_timer_add (unsigned int ms, void (*fn)(void*), void *arg)
{
  int i;
  for(i=0; i<FD_MAX; i++) {
    if (!tcbs[i]) break;
  }
  if (i == FD_MAX) return -1;

  tcbs[i] = (struct timer_cb*) xmalloc(sizeof(struct timer_cb));
  if (tcbs[i]) {
    tcbs[i]->cb_fn  = fn;
    tcbs[i]->cb_arg = arg;
    tcbs[i]->left   = ms;
    tcbs[i]->id = i;
    tcbs[i]->next = tcbs[i]->prev = 0;
    add_timer_into_list(tcbs[i]);
    return i;
  }
  else 
    return -1;
}

/* Cancel and release the timer with id `timer_id'.
 *
 * Returns 0 on success, or -1 when the id is out of range or does not name a
 * live timer. */
int
cb_timer_free(int timer_id)
{
  struct timer_cb *t;

  if (timer_id < 0 || timer_id >= FD_MAX)
    return -1;

  t = tcbs[timer_id];
  if (!t)
    return -1;

  remove_timer_from_list(t);
  xfree(t);
  tcbs[timer_id] = 0L;   /* slot may now be reused by cb_timer_add() */
  return 0;
}

/* Stop watching descriptor fd: for writability when `write' is non-zero,
 * otherwise for readability.  Only the bit in wfds/rfds is cleared; the
 * stored callback entry is left as it was. */
void
cb_io_free (int fd, int write)
{
  fd_set *set;

  assert (fd >= 0 && fd < FD_MAX);

  if (write)
    set = &wfds;
  else
    set = &rfds;
  FD_CLR (fd, set);
}

/* Arrange for fn(arg) to be called by cb_check() when descriptor fd is
 * ready: for writing when `write' is non-zero, otherwise for reading.  Any
 * callback previously stored for the same descriptor and direction is
 * overwritten. */
void
cb_io_add (int fd, int write, void (*fn)(void *), void *arg)
{
  assert (fd >= 0 && fd < FD_MAX);

  if (write) {
    wcb[fd].cb_fn = fn;
    wcb[fd].cb_arg = arg;
    FD_SET (fd, &wfds);
  }
  else {
    rcb[fd].cb_fn = fn;
    rcb[fd].cb_arg = arg;
    FD_SET (fd, &rfds);
  }
}

/* Return the time from t1 to t2 in whole milliseconds, truncated.
 *
 * The difference is computed in long long so that long intervals do not wrap
 * (the previous unsigned int microsecond total overflowed after roughly
 * 4294 seconds).  If t2 is not later than t1, e.g. because the system clock
 * was stepped backwards, 0 is returned instead of a huge bogus interval that
 * would expire every timer at once.  Results that do not fit in an unsigned
 * int saturate at the largest unsigned int value. */
static inline unsigned int
timeval_diff_ms(struct timeval t1, struct timeval t2)
{
  long long usec =
    ((long long) t2.tv_sec - (long long) t1.tv_sec) * 1000000LL
    + ((long long) t2.tv_usec - (long long) t1.tv_usec);
  long long ms;

  if (usec <= 0)
    return 0;

  ms = usec / 1000;
  if (ms > (long long) ~0u)
    return ~0u;
  return (unsigned int) ms;
}

/* Run one iteration of the event loop.
 *
 * Waits in select() until a registered descriptor is ready or the earliest
 * timer is due, then calls the I/O callbacks for all ready descriptors
 * followed by the callbacks of all expired timers.  With no timers pending
 * the select() call blocks indefinitely; if the earliest timer is already
 * due, select() is skipped altogether.
 *
 * An expired timer is not removed here: its callback is expected to release
 * it with cb_timer_free().  A timer that is left in place stays at
 * left == 0 and fires again on the next call.
 *
 * Calls fatal() if select() fails for any reason other than EINTR. */
void
cb_check (void)
{
  /* Time at which the timer list was last brought up to date.  tv_sec == 0
   * means there is no reference point yet. */
  static struct timeval after_select={0,0};

  fd_set trfds, twfds;
  struct timeval expire, before_select;
  struct timeval *timeout=NULL;   /* NULL makes select() block indefinitely */
  unsigned int elapsed=0;
  int i, n=0, run_select=1;

  if (timers) { /* there are stuff on timers, so set an expire time */
    gettimeofday(&before_select, NULL);
    if (after_select.tv_sec != 0)
      elapsed = timeval_diff_ms(after_select, before_select);

    if (timers->left > elapsed) {
      /* `left' is kept in milliseconds, so it has to be split into seconds
       * and microseconds for select().  Storing the millisecond count
       * directly in tv_usec made select() return about 1000 times too early
       * and produced an out-of-range tv_usec for long delays. */
      unsigned int remaining = timers->left - elapsed;

      expire.tv_sec = remaining / 1000;
      expire.tv_usec = (remaining % 1000) * 1000;
      timeout = &expire;
    }
    else run_select = 0;   /* earliest timer is already due */
  }

  if (run_select) {
    /* call select. since the fd_sets are both input and output arguments, we
     * must copy rfds and wfds. */
    trfds = rfds;
    twfds = wfds;
    n = select (FD_MAX, &trfds, &twfds, NULL, timeout);
    if (n < 0) {
      if (errno != EINTR)
        fatal ("select: %s\n", strerror (errno));
      /* Interrupted by a signal: treat it as "nothing ready" so the
       * descriptor sets are not examined; the timers are still updated. */
      n = 0;
    }
  }

  /* update timer list */
  if (timers) {
    gettimeofday(&after_select, NULL);
    elapsed += timeval_diff_ms(before_select, after_select);
    update_timer_list(elapsed);
  }

  /* loop through and make callbacks for all ready file descriptors */
  for (i = 0; n && i < FD_MAX; i++) {
    if (FD_ISSET (i, &trfds)) {
      n--;
      /* Because any one of the callbacks we make might in turn call
       * cb_free on a higher numbered file descriptor, we want to make
       * sure each callback is wanted before we make it.  Hence check
       * rfds. */
      if (FD_ISSET (i, &rfds))
        rcb[i].cb_fn (rcb[i].cb_arg);
    }
    if (FD_ISSET (i, &twfds)) {
      n--;
      if (FD_ISSET (i, &wfds))
        wcb[i].cb_fn (wcb[i].cb_arg);
    }
  }

  /* check for expired timers - must use array instead of linked list because
   * callback functions can remove timers. */
  if (timers && timers->left == 0) {
    for (i = 0; i < FD_MAX; i++) {
      struct timer_cb *timer = tcbs[i];
      if (timer && timer->left == 0)
        timer->cb_fn (timer->cb_arg);
    }
  }

  if (!timers) {
    /* nothing on timers, clear after_select */
    after_select.tv_sec = 0;
    after_select.tv_usec = 0;
  }
}

/* Put descriptor s into non-blocking mode and enable TCP keepalives on it.
 *
 * When compiled with SMALL_LIMITS, stream sockets additionally get 4-byte
 * send and receive buffers as a stress test.  In that configuration the
 * function returns early, before keepalives are enabled, if s is not a
 * stream socket or SO_RCVBUF cannot be set.
 *
 * Calls fatal() if the descriptor flags cannot be changed or if setting
 * SO_SNDBUF or SO_KEEPALIVE fails. */
void
make_async (int s)
{
  int n;

  /* Make file descriptor nonblocking. */
  if ((n = fcntl (s, F_GETFL)) < 0
      || fcntl (s, F_SETFL, n | O_NONBLOCK) < 0)
    fatal ("O_NONBLOCK: %s\n", strerror (errno));

  /* You can pretty much ignore the rest of this function... */

  /* Many asynchronous programming errors occur only when slow peers
   * trigger short writes.  To simulate this during testing, we set
   * the buffer size on the socket to 4 bytes.  This will ensure that
   * each read and write operation works on at most 4 bytes--a good
   * stress test. */
#if SMALL_LIMITS
#if defined (SO_RCVBUF) && defined (SO_SNDBUF)
  /* Make sure this really is a stream socket (like TCP).  Code using
   * datagram sockets will simply fail miserably if it can never
   * transmit a packet larger than 4 bytes. */
  {
    /* getsockopt() takes a socklen_t * for the option length, not an int *. */
    socklen_t sn = sizeof (n);
    if (getsockopt (s, SOL_SOCKET, SO_TYPE, (char *)&n, &sn) < 0
        || n != SOCK_STREAM)
      return;
  }

  n = 4;
  if (setsockopt (s, SOL_SOCKET, SO_RCVBUF, (void *)&n, sizeof (n)) < 0)
    return;
  if (setsockopt (s, SOL_SOCKET, SO_SNDBUF, (void *)&n, sizeof (n)) < 0)
    fatal ("SO_SNDBUF: %s\n", strerror (errno));
#else /* !SO_RCVBUF || !SO_SNDBUF */
#error "Need SO_RCVBUF/SO_SNDBUF for SMALL_LIMITS"
#endif /* SO_RCVBUF && SO_SNDBUF */
#endif /* SMALL_LIMITS */
  
  /* Enable keepalives to make sockets time out if servers go away. */
  n = 1;
  if (setsockopt (s, SOL_SOCKET, SO_KEEPALIVE, (void *) &n, sizeof (n)) < 0)
    fatal ("SO_KEEPALIVE: %s\n", strerror (errno));
}

/* realloc() wrapper that never returns NULL for a non-zero request: on
 * allocation failure it reports "out of memory" through fatal().  For a
 * size of 0 the realloc() result is passed back unchecked. */
void *
xrealloc (void *p, size_t size)
{
  void *resized = realloc (p, size);

  if (resized == NULL && size != 0)
    fatal ("out of memory\n");
  return resized;
}

/* Print "fatal: " followed by the printf-style message `msg' (formatted
 * with the remaining arguments) to stderr, then terminate the process with
 * exit status 1.  Does not return. */
void
fatal (const char *msg, ...)
{
  va_list args;

  fputs ("fatal: ", stderr);

  va_start (args, msg);
  vfprintf (stderr, msg, args);
  va_end (args);

  exit (1);
}

------------------------------ multifinger.c --------------------------

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <netdb.h>
#include <signal.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>

#include "../async/async.h"

/* TCP port the finger request is sent to. */
#define FINGER_PORT 79
// #define FINGER_PORT 3000
/* Largest response buffer, in bytes, that finger_getresp() will grow to. */
#define MAX_RESP_SIZE 16384
/* Inactivity timeout in milliseconds, as passed to cb_timer_add().  The
 * expansion is parenthesized so that it stays a single operand inside larger
 * expressions (e.g. x / TIMEOUT). */
#define TIMEOUT	    (10*1000)

struct fcon {
  int fd;
  char *host;          /* Host to which we are connecting */
  char *user;          /* User to finger on that host */
  int user_len;        /* Lenght of the user string */
  int user_pos;        /* Number bytes of user already written to network */
  void *resp;          /* Finger response read from network */
  int resp_len;        /* Number of allocated bytes resp points to */
  int resp_pos;        /* Number of resp bytes used so far */
  int timer_id;   	/* Timer associated with this connection */
};
int ncon;              /* Number of open TCP connections */

/* Release everything owned by fc, including fc itself.  If a socket is open
 * its pending timer (if any) and both I/O callbacks are cancelled, the
 * socket is closed and the open-connection count is decremented. */
static void
fcon_free (struct fcon *fc)
{
  const int have_socket = (fc->fd >= 0);

  if (have_socket) {
    if (fc->timer_id >= 0)
      cb_timer_free (fc->timer_id);

    /* Drop the write and the read callback before closing the socket. */
    cb_io_free (fc->fd, 1);
    cb_io_free (fc->fd, 0);
    close (fc->fd);
    ncon -= 1;
  }

  xfree (fc->host);
  xfree (fc->user);
  xfree (fc->resp);
  xfree (fc);
}

/* Print the completed response for fc to stdout, preceded by a "[host]"
 * header line, then release the connection. */
static void
finger_done (struct fcon *fc)
{
  const char *host = fc->host;
  size_t nbytes = fc->resp_pos;

  printf ("[%s]\n", host);
  fwrite (fc->resp, 1, nbytes, stdout);

  fcon_free (fc);
}

/* 6033 - connection timeout */
/* Timer callback: the connection passed in `arg' (a struct fcon *) saw no
 * activity before its timer ran out.  Report that and release it. */
static void
conn_timedout(void *arg)
{
  struct fcon *timed_out = arg;

  printf ("Connection timed out.\n");
  fcon_free (timed_out);
}

/* Read callback: the socket of connection _fc (a struct fcon *) is readable.
 *
 * Cancels the pending inactivity timer, grows the response buffer if it is
 * full (starting at 512 bytes and doubling, up to MAX_RESP_SIZE) and reads
 * as much as fits.  End of file completes the request via finger_done().  A
 * response that would exceed MAX_RESP_SIZE, or a read error, releases the
 * connection.  Otherwise a new TIMEOUT timer is armed and the function waits
 * for the next callback. */
static void
finger_getresp (void *_fc)
{
  struct fcon *fc = _fc;
  char *buf;
  int n;
  
  cb_timer_free(fc->timer_id);
  fc->timer_id = -1;

  if (fc->resp_pos == fc->resp_len) {
    fc->resp_len = fc->resp_len ? fc->resp_len << 1 : 512;
    if (fc->resp_len > MAX_RESP_SIZE) {
      fprintf (stderr, "%s: response too large\n", fc->host);
      fcon_free (fc);
      return;
    }
    fc->resp = xrealloc (fc->resp, fc->resp_len);
  }

  /* resp is a void *, and ISO C does not define arithmetic on void
   * pointers, so the offset is applied through a char pointer. */
  buf = fc->resp;
  n = read (fc->fd, buf + fc->resp_pos, fc->resp_len - fc->resp_pos);
  if (n == 0) {
    finger_done (fc);
    return;
  }
  else if (n < 0) {
    /* No data after all, or interrupted by a signal: not an error, just
     * wait for the next readability notification. */
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
      fc->timer_id = cb_timer_add(TIMEOUT, conn_timedout, fc);
      return;
    }
    else
      perror (fc->host);
    fcon_free (fc);
    return;
  }

  fc->resp_pos += n;
  fc->timer_id = cb_timer_add(TIMEOUT, conn_timedout, fc);
}

/* Write callback: the socket of connection _fc (a struct fcon *) is
 * writable.
 *
 * Cancels the pending inactivity timer.  As long as nothing has been sent on
 * this connection, the outcome of the non-blocking connect() is checked
 * first and a failed connection is reported and released.  Then as much of
 * the "user\r\n" request as the socket accepts is written.  Once the whole
 * request is out, the connection switches from the write callback to
 * finger_getresp() as read callback.  A new TIMEOUT timer is armed unless
 * the connection was released. */
static void
finger_senduser (void *_fc)
{
  struct fcon *fc = _fc;
  int n;

  cb_timer_free(fc->timer_id);
  fc->timer_id = -1;

  /* The connect result has to be checked per connection.  A function-scope
   * static flag would be shared by all connections and so would only ever
   * check the first one; "no byte sent yet" identifies the first callbacks
   * of each individual connection instead. */
  if (fc->user_pos == 0)
  {
    struct sockaddr name;
    socklen_t name_len = sizeof (name);
    int err = 0;
    socklen_t err_len = sizeof (err);

    /* Both length arguments are in/out parameters and must hold the buffer
     * size on entry. */
    if (getsockopt(fc->fd, SOL_SOCKET, SO_ERROR, &err, &err_len) < 0)
      err = errno;
    if (getpeername(fc->fd, &name, &name_len) < 0) {
      if (err) {
        fprintf(stderr,"%s: %s\n", fc->host, strerror(err));
        fcon_free(fc);
        return;
      }
    }
  }

  n = write (fc->fd, fc->user + fc->user_pos, fc->user_len - fc->user_pos);
  if (n <= 0) {
    if (n == 0)
      fprintf (stderr, "%s: EOF\n", fc->host);
    /* Socket not ready after all, or interrupted by a signal: try again on
     * the next writability notification. */
    else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
      fc->timer_id = cb_timer_add(TIMEOUT, conn_timedout, fc);
      return;
    }
    else
      perror (fc->host);
    fcon_free (fc);
    return;
  }

  fc->user_pos += n;
  if (fc->user_pos == fc->user_len) {
    cb_io_free (fc->fd, 1);
    cb_io_add (fc->fd, 0, finger_getresp, fc);
  }
  fc->timer_id = cb_timer_add(TIMEOUT, conn_timedout, fc);
}

/* Start a finger request for `arg', which must have the form "user@host"
 * (the last '@' separates the two parts).
 *
 * Resolves the host, opens a non-blocking TCP socket to FINGER_PORT and
 * registers finger_senduser() as write callback together with a TIMEOUT
 * inactivity timer.  The rest of the exchange is driven by cb_check().
 * Malformed arguments, lookup failures and immediate connect errors are
 * reported on stderr and the request is dropped; failure to create the
 * socket is fatal. */
static void
finger (char *arg)
{
  struct fcon *fc;
  char *p;
  struct hostent *h;
  struct sockaddr_in sin;

  p = strrchr (arg, '@');
  if (!p) {
    fprintf (stderr, "%s: ignored -- not of form 'user@host'\n", arg);
    return;
  }

  fc = xmalloc (sizeof (*fc));
  /* memset() from <string.h> replaces bzero(), which needs <strings.h> (not
   * included here) and is no longer part of POSIX. */
  memset (fc, 0, sizeof (*fc));

  fc->fd = -1;
  /* p points at the '@', so strlen (p) covers the host name plus its
   * terminating NUL. */
  fc->host = xmalloc (strlen (p));
  strcpy (fc->host, p + 1);
  /* The request sent on the wire is the user name followed by "\r\n". */
  fc->user_len = p - arg + 2;
  fc->user = xmalloc (fc->user_len + 1);
  memcpy (fc->user, arg, fc->user_len - 2);
  memcpy (fc->user + fc->user_len - 2, "\r\n", 3);
  fc->timer_id = -1;

  h = gethostbyname (fc->host);
  if (!h) {
    fprintf (stderr, "%s: hostname lookup failed\n", fc->host);
    fcon_free (fc);
    return;
  }
  /* Only an IPv4 address of the expected size may be copied into sin. */
  if (h->h_addrtype != AF_INET
      || (size_t) h->h_length != sizeof (sin.sin_addr)) {
    fprintf (stderr, "%s: no IPv4 address\n", fc->host);
    fcon_free (fc);
    return;
  }

  fc->fd = socket (AF_INET, SOCK_STREAM, 0);
  if (fc->fd < 0)
    fatal ("socket: %s\n", strerror (errno));
  ncon++;
  make_async (fc->fd);

  memset (&sin, 0, sizeof (sin));
  sin.sin_family = AF_INET;
  sin.sin_port = htons (FINGER_PORT);
  /* Copy the address bytes rather than dereferencing the resolver's char
   * buffer through a struct in_addr pointer. */
  memcpy (&sin.sin_addr, h->h_addr_list[0], sizeof (sin.sin_addr));
  /* The socket is non-blocking, so EINPROGRESS is the normal outcome; the
   * connection result is picked up in finger_senduser(). */
  if (connect (fc->fd, (struct sockaddr *) &sin, sizeof (sin)) < 0
      && errno != EINPROGRESS) {
    perror (fc->host);
    fcon_free (fc);
    return;
  }

  cb_io_add (fc->fd, 1, finger_senduser, fc);
  fc->timer_id = cb_timer_add(TIMEOUT, conn_timedout, fc);
}

/* Finger every "user@host" given on the command line, keeping at most
 * NCON_MAX connections open at any time, and exit with status 0 once all
 * requests have finished. */
int
main (int argc, char **argv)
{
  int argno = 1;

  /* A write to a socket whose peer has gone away raises SIGPIPE, which
   * would kill the process by default.  Ignore the signal so that the
   * failing write() reports an error instead. */
  signal (SIGPIPE, SIG_IGN);

  /* Start one request per argument.  Whenever the connection limit is
   * reached, run the event loop until a slot frees up. */
  while (argno < argc) {
    while (ncon >= NCON_MAX)
      cb_check ();
    finger (argv[argno]);
    argno++;
  }

  /* Drain the connections that are still in flight. */
  while (ncon > 0)
    cb_check ();
  exit (0);
}



home help back first fref pref prev next nref lref last post