Skip to content
Snippets Groups Projects
Verified Commit 0c792781 authored by Tomas Krizek's avatar Tomas Krizek
Browse files

pytests/proxy: import code

Original author: Grigorii Demidov <grigorii.demidov@nic.cz>
parent 0fc0a2d3
No related branches found
No related tags found
1 merge request!732pytests: update proxy + test_random_close
CC=gcc
CFLAGS_TLS=-DDEBUG -ggdb3 -O0 -lgnutls -luv
CFLAGS_TCP=-DDEBUG -ggdb3 -O0 -luv
all: tcproxy tlsproxy
tlsproxy: tls-proxy.o tlsproxy.o
$(CC) tls-proxy.o tlsproxy.o -o tlsproxy $(CFLAGS_TLS)
tls-proxy.o: tls-proxy.c tls-proxy.h array.h
$(CC) -c -o $@ $< $(CFLAGS_TLS)
tlsproxy.o: tlsproxy.c tls-proxy.h
$(CC) -c -o $@ $< $(CFLAGS_TLS)
tcproxy: tcp-proxy.o tcproxy.o
$(CC) tcp-proxy.o tcproxy.o -o tcproxy $(CFLAGS_TCP)
tcp-proxy.o: tcp-proxy.c tcp-proxy.h array.h
$(CC) -c -o $@ $< $(CFLAGS_TCP)
tcproxy.o: tcproxy.c tcp-proxy.h
$(CC) -c -o $@ $< $(CFLAGS_TCP)
clean:
rm -f tcp-proxy.o tcproxy.o tcproxy tls-proxy.o tlsproxy.o tlsproxy
.PHONY: all clean
../../../lib/generic/array.h
\ No newline at end of file
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <uv.h>
#include "array.h"
struct buf {
char buf[16 * 1024];
size_t size;
};
enum peer_state {
STATE_NOT_CONNECTED,
STATE_LISTENING,
STATE_CONNECTED,
STATE_CONNECT_IN_PROGRESS,
STATE_CLOSING_IN_PROGRESS
};
struct proxy_ctx {
uv_loop_t *loop;
uv_tcp_t server;
uv_tcp_t client;
uv_tcp_t upstream;
struct sockaddr_storage server_addr;
struct sockaddr_storage upstream_addr;
int server_state;
int client_state;
int upstream_state;
array_t(struct buf *) buffer_pool;
array_t(struct buf *) upstream_pending;
};
static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf);
static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
static struct buf *borrow_io_buffer(struct proxy_ctx *proxy)
{
struct buf *buf = NULL;
if (proxy->buffer_pool.len > 0) {
buf = array_tail(proxy->buffer_pool);
array_pop(proxy->buffer_pool);
} else {
buf = calloc(1, sizeof (struct buf));
}
return buf;
}
static void release_io_buffer(struct proxy_ctx *proxy, struct buf *buf)
{
if (!buf) {
return;
}
if (proxy->buffer_pool.len < 1000) {
buf->size = 0;
array_push(proxy->buffer_pool, buf);
} else {
free(buf);
}
}
static void push_to_upstream_pending(struct proxy_ctx *proxy, const char *buf, size_t size)
{
while (size > 0) {
struct buf *b = borrow_io_buffer(proxy);
b->size = size <= sizeof(b->buf) ? size : sizeof(b->buf);
memcpy(b->buf, buf, b->size);
array_push(proxy->upstream_pending, b);
size -= b->size;
}
}
static struct buf *get_first_upstream_pending(struct proxy_ctx *proxy)
{
struct buf *buf = NULL;
if (proxy->upstream_pending.len > 0) {
buf = proxy->upstream_pending.at[0];
}
return buf;
}
static void remove_first_upstream_pending(struct proxy_ctx *proxy)
{
for (int i = 1; i < proxy->upstream_pending.len; ++i) {
proxy->upstream_pending.at[i - 1] = proxy->upstream_pending.at[i];
}
if (proxy->upstream_pending.len > 0) {
proxy->upstream_pending.len -= 1;
}
}
static void clear_upstream_pending(struct proxy_ctx *proxy)
{
for (int i = 1; i < proxy->upstream_pending.len; ++i) {
struct buf *b = proxy->upstream_pending.at[i];
release_io_buffer(proxy, b);
}
proxy->upstream_pending.len = 0;
}
static void clear_buffer_pool(struct proxy_ctx *proxy)
{
for (int i = 1; i < proxy->buffer_pool.len; ++i) {
struct buf *b = proxy->buffer_pool.at[i];
free(b);
}
proxy->buffer_pool.len = 0;
}
static void alloc_uv_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
buf->base = (char*)malloc(suggested_size);
buf->len = suggested_size;
}
static void on_client_close(uv_handle_t *handle)
{
struct proxy_ctx *proxy = (struct proxy_ctx *)handle->loop->data;
proxy->client_state = STATE_NOT_CONNECTED;
}
static void on_upstream_close(uv_handle_t *handle)
{
struct proxy_ctx *proxy = (struct proxy_ctx *)handle->loop->data;
proxy->upstream_state = STATE_NOT_CONNECTED;
}
static void write_to_client_cb(uv_write_t *req, int status)
{
struct proxy_ctx *proxy = (struct proxy_ctx *)req->handle->loop->data;
free(req);
if (status) {
fprintf(stderr, "error writing to client: %s\n", uv_strerror(status));
clear_upstream_pending(proxy);
proxy->client_state = STATE_CLOSING_IN_PROGRESS;
uv_close((uv_handle_t*)&proxy->client, on_client_close);
}
}
static void write_to_upstream_cb(uv_write_t *req, int status)
{
struct proxy_ctx *proxy = (struct proxy_ctx *)req->handle->loop->data;
free(req);
if (status) {
fprintf(stderr, "error writing to upstream: %s\n", uv_strerror(status));
clear_upstream_pending(proxy);
proxy->upstream_state = STATE_CLOSING_IN_PROGRESS;
uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close);
return;
}
if (proxy->upstream_pending.len > 0) {
struct buf *buf = get_first_upstream_pending(proxy);
remove_first_upstream_pending(proxy);
release_io_buffer(proxy, buf);
if (proxy->upstream_state == STATE_CONNECTED &&
proxy->upstream_pending.len > 0) {
buf = get_first_upstream_pending(proxy);
/* TODO avoid allocation */
uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size);
uv_write(req, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb);
}
}
}
static void on_client_connection(uv_stream_t *server, int status)
{
if (status < 0) {
fprintf(stderr, "incoming connection error: %s\n", uv_strerror(status));
return;
}
fprintf(stdout, "incoming connection\n");
struct proxy_ctx *proxy = (struct proxy_ctx *)server->loop->data;
if (proxy->client_state != STATE_NOT_CONNECTED) {
fprintf(stderr, "client already connected, ignoring\n");
return;
}
uv_tcp_init(proxy->loop, &proxy->client);
proxy->client_state = STATE_CONNECTED;
if (uv_accept(server, (uv_stream_t*)&proxy->client) == 0) {
uv_read_start((uv_stream_t*)&proxy->client, alloc_uv_buffer, read_from_client_cb);
} else {
proxy->client_state = STATE_CLOSING_IN_PROGRESS;
uv_close((uv_handle_t*)&proxy->client, on_client_close);
}
}
static void on_connect_to_upstream(uv_connect_t *req, int status)
{
struct proxy_ctx *proxy = (struct proxy_ctx *)req->handle->loop->data;
free(req);
if (status < 0) {
fprintf(stderr, "error connecting to upstream: %s\n", uv_strerror(status));
clear_upstream_pending(proxy);
proxy->upstream_state = STATE_CLOSING_IN_PROGRESS;
uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close);
return;
}
proxy->upstream_state = STATE_CONNECTED;
uv_read_start((uv_stream_t*)&proxy->upstream, alloc_uv_buffer, read_from_upstream_cb);
if (proxy->upstream_pending.len > 0) {
struct buf *buf = get_first_upstream_pending(proxy);
/* TODO avoid allocation */
uv_write_t *wreq = (uv_write_t *) malloc(sizeof(uv_write_t));
uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size);
uv_write(wreq, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb);
}
}
static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf)
{
if (nread == 0) {
return;
}
struct proxy_ctx *proxy = (struct proxy_ctx *)client->loop->data;
if (nread < 0) {
if (nread != UV_EOF) {
fprintf(stderr, "error reading from client: %s\n", uv_err_name(nread));
}
if (proxy->client_state == STATE_CONNECTED) {
proxy->client_state = STATE_CLOSING_IN_PROGRESS;
uv_close((uv_handle_t*) client, on_client_close);
}
return;
}
if (proxy->upstream_state == STATE_CONNECTED) {
if (proxy->upstream_pending.len > 0) {
push_to_upstream_pending(proxy, buf->base, nread);
} else {
/* TODO avoid allocation */
uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
uv_buf_t wrbuf = uv_buf_init(buf->base, nread);
uv_write(req, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb);
}
} else if (proxy->upstream_state == STATE_NOT_CONNECTED) {
/* TODO avoid allocation */
uv_tcp_init(proxy->loop, &proxy->upstream);
uv_connect_t *conn = (uv_connect_t *) malloc(sizeof(uv_connect_t));
proxy->upstream_state = STATE_CONNECT_IN_PROGRESS;
uv_tcp_connect(conn, &proxy->upstream, (struct sockaddr *)&proxy->upstream_addr,
on_connect_to_upstream);
push_to_upstream_pending(proxy, buf->base, nread);
} else if (proxy->upstream_state == STATE_CONNECT_IN_PROGRESS) {
push_to_upstream_pending(proxy, buf->base, nread);
}
}
static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf)
{
if (nread == 0) {
return;
}
struct proxy_ctx *proxy = (struct proxy_ctx *)upstream->loop->data;
if (nread < 0) {
if (nread != UV_EOF) {
fprintf(stderr, "error reading from upstream: %s\n", uv_err_name(nread));
}
clear_upstream_pending(proxy);
if (proxy->upstream_state == STATE_CONNECTED) {
proxy->upstream_state = STATE_CLOSING_IN_PROGRESS;
uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close);
}
return;
}
if (proxy->client_state == STATE_CONNECTED) {
/* TODO Avoid allocation */
uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
uv_buf_t wrbuf = uv_buf_init(buf->base, nread);
uv_write(req, (uv_stream_t *)&proxy->client, &wrbuf, 1, write_to_client_cb);
}
}
struct proxy_ctx *proxy_allocate()
{
return malloc(sizeof(struct proxy_ctx));
}
int proxy_init(struct proxy_ctx *proxy,
const char *server_addr, int server_port,
const char *upstream_addr, int upstream_port)
{
proxy->loop = uv_default_loop();
uv_tcp_init(proxy->loop, &proxy->server);
int res = uv_ip4_addr(server_addr, server_port, (struct sockaddr_in *)&proxy->server_addr);
if (res != 0) {
return res;
}
res = uv_ip4_addr(upstream_addr, upstream_port, (struct sockaddr_in *)&proxy->upstream_addr);
if (res != 0) {
return res;
}
array_init(proxy->buffer_pool);
array_init(proxy->upstream_pending);
proxy->server_state = STATE_NOT_CONNECTED;
proxy->client_state = STATE_NOT_CONNECTED;
proxy->upstream_state = STATE_NOT_CONNECTED;
proxy->loop->data = proxy;
return 0;
}
void proxy_free(struct proxy_ctx *proxy)
{
if (!proxy) {
return;
}
clear_upstream_pending(proxy);
clear_buffer_pool(proxy);
/* TODO correctly close all the uv_tcp_t */
free(proxy);
}
int proxy_start_listen(struct proxy_ctx *proxy)
{
uv_tcp_bind(&proxy->server, (const struct sockaddr*)&proxy->server_addr, 0);
int ret = uv_listen((uv_stream_t*)&proxy->server, 128, on_client_connection);
if (ret == 0) {
proxy->server_state = STATE_LISTENING;
}
return ret;
}
int proxy_run(struct proxy_ctx *proxy)
{
return uv_run(proxy->loop, UV_RUN_DEFAULT);
}
#pragma once
struct proxy_ctx;
struct proxy_ctx *proxy_allocate();
void proxy_free(struct proxy_ctx *proxy);
int proxy_init(struct proxy_ctx *proxy,
const char *server_addr, int server_port,
const char *upstream_addr, int upstream_port);
int proxy_start_listen(struct proxy_ctx *proxy);
int proxy_run(struct proxy_ctx *proxy);
#include <stdio.h>
#include "tcp-proxy.h"
int main()
{
struct proxy_ctx *proxy = proxy_allocate();
if (!proxy) {
fprintf(stderr, "can't allocate proxy structure\n");
return 1;
}
int res = proxy_init(proxy, "127.0.0.1", 54000, "127.0.0.1", 53001);
if (res) {
fprintf(stderr, "can't initialize proxy by given addresses\n");
return res;
}
res = proxy_start_listen(proxy);
if (res) {
fprintf(stderr, "error starting listen, error code: %i\n", res);
return res;
}
res = proxy_run(proxy);
proxy_free(proxy);
return res;
}
This diff is collapsed.
#pragma once
#include <stdint.h>
#include <stdbool.h>
#include <netinet/in.h>
struct args {
const char *local_addr;
uint16_t local_port;
const char *upstream;
uint16_t upstream_port;
bool rehandshake;
bool close_connection;
uint64_t close_timeout;
const char *cert_file;
const char *key_file;
};
struct tls_proxy_ctx;
struct tls_proxy_ctx *tls_proxy_allocate();
void tls_proxy_free(struct tls_proxy_ctx *proxy);
int tls_proxy_init(struct tls_proxy_ctx *proxy, const struct args *a);
int tls_proxy_start_listen(struct tls_proxy_ctx *proxy);
int tls_proxy_run(struct tls_proxy_ctx *proxy);
#include <stdio.h>
#include <getopt.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <gnutls/gnutls.h>
#include "tls-proxy.h"
static char default_local_addr[] = "127.0.0.1";
static char default_upstream_addr[] = "127.0.0.1";
static char default_cert_path[] = "../certs/tt.cert.pem";
static char default_key_path[] = "../certs/tt.key.pem";
void help(char *argv[], struct args *a)
{
printf("Usage: %s [parameters] [rundir]\n", argv[0]);
printf("\nParameters:\n"
" -l, --local=[addr] Server address to bind to (default: %s).\n"
" -p, --lport=[port] Server port to bind to (default: %u).\n"
" -u, --upstream=[addr] Upstream address (default: %s).\n"
" -d, --uport=[port] Upstream port (default: %u).\n"
" -t, --cert=[path] Path to certificate file (default: %s).\n"
" -k, --key=[path] Path to key file (default: %s).\n"
" -c, --close=[N] Close connection to client after every N ms (default: no).\n"
" -r, --rehandshake Do TLS rehandshake after every 8 bytes sent to client (default: no).\n",
a->local_addr, a->local_port,
a->upstream, a->upstream_port,
a->cert_file, a->key_file
);
}
void init_args(struct args *a)
{
a->local_addr = default_local_addr;
a->local_port = 54000;
a->upstream = default_upstream_addr;
a->upstream_port = 53000;
a->cert_file = default_cert_path;
a->key_file = default_key_path;
a->rehandshake = false;
a->close_connection = false;
}
int main(int argc, char **argv)
{
long int li_value = 0;
int c = 0, li = 0;
struct option opts[] = {
{"local", required_argument, 0, 'l'},
{"lport", required_argument, 0, 'p'},
{"upstream", required_argument, 0, 'u'},
{"uport", required_argument, 0, 'd'},
{"cert", required_argument, 0, 't'},
{"key", required_argument, 0, 'k'},
{"close", required_argument, 0, 'c'},
{"rehandshake", no_argument, 0, 'r'},
{0, 0, 0, 0}
};
struct args args;
init_args(&args);
while ((c = getopt_long(argc, argv, "l:p:u:d:t:k:c:r", opts, &li)) != -1) {
switch (c)
{
case 'l':
args.local_addr = optarg;
break;
case 'u':
args.upstream = optarg;
break;
case 't':
args.cert_file = optarg;
break;
case 'k':
args.key_file = optarg;
break;
case 'p':
li_value = strtol(optarg, NULL, 10);
if (li_value <= 0 || li_value > UINT16_MAX) {
printf("error: '-p' requires a positive"
" number less or equal to 65535, not '%s'\n", optarg);
return -1;
}
args.local_port = (uint16_t)li_value;
break;
case 'd':
li_value = strtol(optarg, NULL, 10);
if (li_value <= 0 || li_value > UINT16_MAX) {
printf("error: '-d' requires a positive"
" number less or equal to 65535, not '%s'\n", optarg);
return -1;
}
args.upstream_port = (uint16_t)li_value;
break;
case 'c':
li_value = strtol(optarg, NULL, 10);
if (li_value <= 0) {
printf("[system] error '-c' requires a positive"
" number, not '%s'\n", optarg);
return -1;
}
args.close_connection = true;
args.close_timeout = li_value;
break;
case 'r':
args.rehandshake = true;
break;
default:
init_args(&args);
help(argv, &args);
return -1;
}
}
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
fprintf(stderr, "failed to set up SIGPIPE handler to ignore(%s)\n",
strerror(errno));
}
struct tls_proxy_ctx *proxy = tls_proxy_allocate();
if (!proxy) {
fprintf(stderr, "can't allocate tls_proxy structure\n");
return 1;
}
int res = tls_proxy_init(proxy, &args);
if (res) {
fprintf(stderr, "can't initialize tls_proxy structure\n");
return res;
}
res = tls_proxy_start_listen(proxy);
if (res) {
fprintf(stderr, "error starting listen, error code: %i\n", res);
return res;
}
fprintf(stdout, "Listen on %s#%u\n"
"Upstream is expected on %s#%u\n"
"Rehandshake %s\n"
"Close %s\n",
args.local_addr, args.local_port,
args.upstream, args.upstream_port,
args.rehandshake ? "yes" : "no",
args.close_connection ? "yes" : "no");
res = tls_proxy_run(proxy);
tls_proxy_free(proxy);
return res;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment