Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# Compiler and flags
CC = gcc
CFLAGS = -Wall -g
LDFLAGS = -lpthread
SSLFLAGS = -lssl -lcrypto
LDFLAGS = -lpthread -lcrypto

# Source files
SERVER_SRC = server.c cache.c
SERVER_SRC = server.c cache.c conhash.c global_ring.c
CLIENT_SRC = client.c
LOAD_BALANCER_SRC = load_balancer.c conhash.c
DB_SERVER_SRC = db_server.c mockdb.c
Expand All @@ -16,12 +15,6 @@ CLIENT_BIN = client
LOAD_BALANCER_BIN = load_balancer
DB_SERVER_BIN = db_server

# Configuration file to store server ports
SERVER_CONFIG = servers.txt

# Header files (for dependency tracking)
HEADERS = cache.h mockdb.h

# Default target: Build all components
all: $(SERVER_BIN) $(CLIENT_BIN) $(LOAD_BALANCER_BIN) $(DB_SERVER_BIN)

Expand All @@ -31,11 +24,11 @@ $(SERVER_BIN): $(SERVER_SRC) $(HEADERS)

# Build the client
$(CLIENT_BIN): $(CLIENT_SRC)
$(CC) $(CFLAGS) $(CLIENT_SRC) -o $(CLIENT_BIN) $(SSLFLAGS)
$(CC) $(CFLAGS) $(CLIENT_SRC) -o $(CLIENT_BIN) $(LDFLAGS)

# Build the load balancer
$(LOAD_BALANCER_BIN): $(LOAD_BALANCER_SRC)
$(CC) $(CFLAGS) $(LOAD_BALANCER_SRC) -o $(LOAD_BALANCER_BIN) $(LDFLAGS) $(SSLFLAGS)
$(LOAD_BALANCER_BIN): $(LOAD_BALANCER_SRC) global_ring.c
$(CC) $(CFLAGS) $(LOAD_BALANCER_SRC) global_ring.c -o $(LOAD_BALANCER_BIN) $(LDFLAGS)

# Build the database server
$(DB_SERVER_BIN): $(DB_SERVER_SRC) $(HEADERS)
Expand All @@ -51,7 +44,7 @@ run-server:
echo "Usage: make run-server PORT=<port>"; \
exit 1; \
fi; \
echo "127.0.0.1:$(PORT)" >> $(SERVER_CONFIG); \
echo "127.0.0.1:$(PORT)" >> servers.txt; \
./$(SERVER_BIN) $(PORT)

# Run the load balancer
Expand All @@ -64,4 +57,4 @@ run-client:

# Clean up generated files
clean:
rm -f $(SERVER_BIN) $(CLIENT_BIN) $(LOAD_BALANCER_BIN) $(DB_SERVER_BIN) $(SERVER_CONFIG)
rm -f $(SERVER_BIN) $(CLIENT_BIN) $(LOAD_BALANCER_BIN) $(DB_SERVER_BIN) servers.txt
22 changes: 22 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
This project requires mutliple terminals to work and the system will be a representation of different processes that would normally represent a distributed cache system.

Commands:
To build the app:
make
To start the database server:
make run-db-server
To start the load balancer:
make run-load-balancer:

To start a server:
make run-server PORT=<port> (port can be any valid available port)

To start a client:
make run-client

Commands in the client:
get <keyvalue> (fetches the key from the database and saves it to the cache or if it already is in the cache, only fetches it from the cache)
delete <keyvalue> (deletes the key from all caches and the database)
set <keyvalue> <Value> (saves the value with the keyvalue to a cache and the database)
<function> && <function> (does the functions concurrently, can be more than two functions) (functions are get set delete)
batch <pathToBatchFile>.txt (Does the functions in the text file line by line)
15 changes: 3 additions & 12 deletions cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <time.h>
#include "cache.h"

#define MAX_CACHE_SIZE 3
#define MAX_CACHE_SIZE 5

Cache *create_cache() {
Cache *cache = (Cache *)malloc(sizeof(Cache));
Expand All @@ -18,12 +18,10 @@ void move_to_head(Cache *cache, CacheItem *item) {
return;
}

// Remove from current position
if (item->prev) item->prev->next = item->next;
if (item->next) item->next->prev = item->prev;
if (cache->tail == item) cache->tail = item->prev;

// Move to head
item->prev = NULL;
item->next = cache->head;
if (cache->head) cache->head->prev = item;
Expand All @@ -48,7 +46,6 @@ void evict_lru(Cache *cache) {
}

void cache_set(Cache *cache, const char *key, const char *value, int ttl) {
// Check if the key already exists
CacheItem *current = cache->head;
while (current) {
if (strcmp(current->key, key) == 0) {
Expand All @@ -60,7 +57,6 @@ void cache_set(Cache *cache, const char *key, const char *value, int ttl) {
current = current->next;
}

// Add a new item
CacheItem *new_item = (CacheItem *)malloc(sizeof(CacheItem));
strncpy(new_item->key, key, MAX_KEY_LENGTH);
strncpy(new_item->value, value, MAX_VALUE_LENGTH);
Expand All @@ -74,7 +70,6 @@ void cache_set(Cache *cache, const char *key, const char *value, int ttl) {

cache->size++;

// Evict the least recently used item if the cache is full
if (cache->size > MAX_CACHE_SIZE) {
evict_lru(cache);
}
Expand All @@ -86,28 +81,25 @@ char *cache_get(Cache *cache, const char *key) {

while (current) {
if (strcmp(current->key, key) == 0) {
// Check if the item has expired
if (current->expiry != 0 && current->expiry <= now) {
cache_delete(cache, key); // Remove expired item
cache_delete(cache, key);
return NULL;
}

// Move the accessed item to the head
move_to_head(cache, current);
return current->value;
}
current = current->next;
}

return NULL; // Key not found
return NULL;
}

void cache_delete(Cache *cache, const char *key) {
CacheItem *current = cache->head;

while (current) {
if (strcmp(current->key, key) == 0) {
// Unlink the item from the list
if (current->prev) {
current->prev->next = current->next;
} else {
Expand All @@ -120,7 +112,6 @@ void cache_delete(Cache *cache, const char *key) {
cache->tail = current->prev;
}

// Free the memory and update the size
free(current);
cache->size--;
return;
Expand Down
52 changes: 9 additions & 43 deletions cache.h
Original file line number Diff line number Diff line change
@@ -1,61 +1,27 @@
#ifndef CACHE_H
#define CACHE_H

#include <time.h>

#define MAX_KEY_LENGTH 256
#define MAX_VALUE_LENGTH 256

// Cache item structure
typedef struct CacheItem {
char key[MAX_KEY_LENGTH]; // Key of the cache item
char value[MAX_VALUE_LENGTH]; // Value of the cache item
time_t expiry; // Expiry time (0 if no expiry)
struct CacheItem *next; // Pointer to the next item (for LRU)
struct CacheItem *prev; // Pointer to the previous item (for LRU)
char key[MAX_KEY_LENGTH];
char value[MAX_VALUE_LENGTH];
time_t expiry;
struct CacheItem *next;
struct CacheItem *prev;
} CacheItem;

// Cache structure
typedef struct Cache {
CacheItem *head; // Most recently used item
CacheItem *tail; // Least recently used item
int size; // Current size of the cache
CacheItem *head;
CacheItem *tail;
int size;
} Cache;

/**
* Create a new cache.
* @return Pointer to the newly created cache.
*/
Cache *create_cache();

/**
* Set a key-value pair in the cache.
* @param cache Pointer to the cache.
* @param key The key to set.
* @param value The value to set.
* @param ttl Time-to-live in seconds (0 for no expiry).
*/
void cache_set(Cache *cache, const char *key, const char *value, int ttl);

/**
* Get the value associated with a key from the cache.
* @param cache Pointer to the cache.
* @param key The key to retrieve.
* @return Pointer to the value if found and not expired; NULL otherwise.
*/
char *cache_get(Cache *cache, const char *key);

/**
* Delete a key from the cache.
* @param cache Pointer to the cache.
* @param key The key to delete.
*/
void cache_delete(Cache *cache, const char *key);

/**
* Free all memory associated with the cache.
* @param cache Pointer to the cache.
*/
void free_cache(Cache *cache);

#endif // CACHE_H
#endif
Binary file removed client
Binary file not shown.
26 changes: 15 additions & 11 deletions client.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
#define LOAD_BALANCER_ADDRESS "127.0.0.1"
#define LOAD_BALANCER_PORT 9090

// Function to send a single command to the load balancer
pthread_mutex_t lock;

void send_to_load_balancer(const char *command) {
int sock;
struct sockaddr_in lb_addr;
Expand Down Expand Up @@ -39,22 +40,24 @@ void send_to_load_balancer(const char *command) {
close(sock);
}

// Thread function for executing a single command
void *execute_command(void *arg) {
char *command = (char *)arg;

pthread_mutex_lock(&lock);
send_to_load_balancer(command);
pthread_mutex_unlock(&lock);

free(command);
return NULL;
}

// Execute concurrent commands using threads
void execute_concurrent_commands(char *input) {
pthread_t threads[BUFFER_SIZE / 10];
int thread_count = 0;

char *token = strtok(input, "&&");
while (token) {
while (*token == ' ') token++; // Trim leading spaces
while (*token == ' ') token++;

char *command = malloc(strlen(token) + 1);
strcpy(command, token);
Expand Down Expand Up @@ -100,40 +103,40 @@ void process_batch_file(const char *filename) {

char line[BUFFER_SIZE];
while (fgets(line, sizeof(line), file)) {
line[strcspn(line, "\n")] = '\0'; // Remove newline character
if (strlen(line) == 0) continue; // Skip empty lines
line[strcspn(line, "\n")] = '\0';
if (strlen(line) == 0) continue;

if (strstr(line, "&&")) {
execute_concurrent_commands(line);
} else {
pthread_mutex_lock(&lock);
send_to_load_balancer(line);
pthread_mutex_unlock(&lock);
}
}

fclose(file);
}

int main() {
pthread_mutex_init(&lock, NULL);

char input[BUFFER_SIZE];

while (1) {
printf("Enter command (use '&&' for concurrency, 'exit' to quit): ");
fgets(input, BUFFER_SIZE, stdin);
input[strcspn(input, "\n")] = '\0'; // Remove newline character

input[strcspn(input, "\n")] = '\0';
if (strcmp(input, "exit") == 0) {
break;
}

// Check for concurrent commands
if (strstr(input, "&&") != NULL) {
execute_concurrent_commands(input);
} else if (strncmp(input, "batch ", 6) == 0) {
// Extract the filename
char *filename = input + 6;
printf("Processing batch file: %s\n", filename);

// Count lines and process the file
int linecount = count_lines_in_file(filename);
printf("Total lines in file: %d\n", linecount);

Expand All @@ -143,5 +146,6 @@ int main() {
}
}

pthread_mutex_destroy(&lock);
return 0;
}
Loading