diff --git a/Makefile b/Makefile index e9c536e..653f425 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,10 @@ # Compiler and flags CC = gcc CFLAGS = -Wall -g -LDFLAGS = -lpthread -SSLFLAGS = -lssl -lcrypto +LDFLAGS = -lpthread -lcrypto # Source files -SERVER_SRC = server.c cache.c +SERVER_SRC = server.c cache.c conhash.c global_ring.c CLIENT_SRC = client.c LOAD_BALANCER_SRC = load_balancer.c conhash.c DB_SERVER_SRC = db_server.c mockdb.c @@ -16,12 +15,6 @@ CLIENT_BIN = client LOAD_BALANCER_BIN = load_balancer DB_SERVER_BIN = db_server -# Configuration file to store server ports -SERVER_CONFIG = servers.txt - -# Header files (for dependency tracking) -HEADERS = cache.h mockdb.h - # Default target: Build all components all: $(SERVER_BIN) $(CLIENT_BIN) $(LOAD_BALANCER_BIN) $(DB_SERVER_BIN) @@ -31,11 +24,11 @@ $(SERVER_BIN): $(SERVER_SRC) $(HEADERS) # Build the client $(CLIENT_BIN): $(CLIENT_SRC) - $(CC) $(CFLAGS) $(CLIENT_SRC) -o $(CLIENT_BIN) $(SSLFLAGS) + $(CC) $(CFLAGS) $(CLIENT_SRC) -o $(CLIENT_BIN) $(LDFLAGS) # Build the load balancer -$(LOAD_BALANCER_BIN): $(LOAD_BALANCER_SRC) - $(CC) $(CFLAGS) $(LOAD_BALANCER_SRC) -o $(LOAD_BALANCER_BIN) $(LDFLAGS) $(SSLFLAGS) +$(LOAD_BALANCER_BIN): $(LOAD_BALANCER_SRC) global_ring.c + $(CC) $(CFLAGS) $(LOAD_BALANCER_SRC) global_ring.c -o $(LOAD_BALANCER_BIN) $(LDFLAGS) # Build the database server $(DB_SERVER_BIN): $(DB_SERVER_SRC) $(HEADERS) @@ -51,7 +44,7 @@ run-server: echo "Usage: make run-server PORT="; \ exit 1; \ fi; \ - echo "127.0.0.1:$(PORT)" >> $(SERVER_CONFIG); \ + echo "127.0.0.1:$(PORT)" >> servers.txt; \ ./$(SERVER_BIN) $(PORT) # Run the load balancer @@ -64,4 +57,4 @@ run-client: # Clean up generated files clean: - rm -f $(SERVER_BIN) $(CLIENT_BIN) $(LOAD_BALANCER_BIN) $(DB_SERVER_BIN) $(SERVER_CONFIG) + rm -f $(SERVER_BIN) $(CLIENT_BIN) $(LOAD_BALANCER_BIN) $(DB_SERVER_BIN) servers.txt diff --git a/README.MD b/README.MD new file mode 100644 index 0000000..3d064eb --- /dev/null +++ b/README.MD @@ -0,0 +1,22 @@ +This project requires mutliple terminals to work and the system will be a representation of different processes that would normally represent a distributed cache system. + +Commands: +To build the app: +make +To start the database server: +make run-db-server +To start the load balancer: +make run-load-balancer: + +To start a server: +make run-server PORT= (port can be any valid available port) + +To start a client: +make run-client + +Commands in the client: +get (fetches the key from the database and saves it to the cache or if it already is in the cache, only fetches it from the cache) +delete (deletes the key from all caches and the database) +set (saves the value with the keyvalue to a cache and the database) + && (does the functions concurrently, can be more than two functions) (functions are get set delete) +batch .txt (Does the functions in the text file line by line) diff --git a/cache.c b/cache.c index 4aad636..084afe9 100644 --- a/cache.c +++ b/cache.c @@ -4,7 +4,7 @@ #include #include "cache.h" -#define MAX_CACHE_SIZE 3 +#define MAX_CACHE_SIZE 5 Cache *create_cache() { Cache *cache = (Cache *)malloc(sizeof(Cache)); @@ -18,12 +18,10 @@ void move_to_head(Cache *cache, CacheItem *item) { return; } - // Remove from current position if (item->prev) item->prev->next = item->next; if (item->next) item->next->prev = item->prev; if (cache->tail == item) cache->tail = item->prev; - // Move to head item->prev = NULL; item->next = cache->head; if (cache->head) cache->head->prev = item; @@ -48,7 +46,6 @@ void evict_lru(Cache *cache) { } void cache_set(Cache *cache, const char *key, const char *value, int ttl) { - // Check if the key already exists CacheItem *current = cache->head; while (current) { if (strcmp(current->key, key) == 0) { @@ -60,7 +57,6 @@ void cache_set(Cache *cache, const char *key, const char *value, int ttl) { current = current->next; } - // Add a new item CacheItem *new_item = (CacheItem *)malloc(sizeof(CacheItem)); strncpy(new_item->key, key, MAX_KEY_LENGTH); strncpy(new_item->value, value, MAX_VALUE_LENGTH); @@ -74,7 +70,6 @@ void cache_set(Cache *cache, const char *key, const char *value, int ttl) { cache->size++; - // Evict the least recently used item if the cache is full if (cache->size > MAX_CACHE_SIZE) { evict_lru(cache); } @@ -86,20 +81,18 @@ char *cache_get(Cache *cache, const char *key) { while (current) { if (strcmp(current->key, key) == 0) { - // Check if the item has expired if (current->expiry != 0 && current->expiry <= now) { - cache_delete(cache, key); // Remove expired item + cache_delete(cache, key); return NULL; } - // Move the accessed item to the head move_to_head(cache, current); return current->value; } current = current->next; } - return NULL; // Key not found + return NULL; } void cache_delete(Cache *cache, const char *key) { @@ -107,7 +100,6 @@ void cache_delete(Cache *cache, const char *key) { while (current) { if (strcmp(current->key, key) == 0) { - // Unlink the item from the list if (current->prev) { current->prev->next = current->next; } else { @@ -120,7 +112,6 @@ void cache_delete(Cache *cache, const char *key) { cache->tail = current->prev; } - // Free the memory and update the size free(current); cache->size--; return; diff --git a/cache.h b/cache.h index 55d9a34..a4b1d3a 100644 --- a/cache.h +++ b/cache.h @@ -1,61 +1,27 @@ #ifndef CACHE_H #define CACHE_H -#include #define MAX_KEY_LENGTH 256 #define MAX_VALUE_LENGTH 256 -// Cache item structure typedef struct CacheItem { - char key[MAX_KEY_LENGTH]; // Key of the cache item - char value[MAX_VALUE_LENGTH]; // Value of the cache item - time_t expiry; // Expiry time (0 if no expiry) - struct CacheItem *next; // Pointer to the next item (for LRU) - struct CacheItem *prev; // Pointer to the previous item (for LRU) + char key[MAX_KEY_LENGTH]; + char value[MAX_VALUE_LENGTH]; + time_t expiry; + struct CacheItem *next; + struct CacheItem *prev; } CacheItem; -// Cache structure typedef struct Cache { - CacheItem *head; // Most recently used item - CacheItem *tail; // Least recently used item - int size; // Current size of the cache + CacheItem *head; + CacheItem *tail; + int size; } Cache; - -/** - * Create a new cache. - * @return Pointer to the newly created cache. - */ Cache *create_cache(); - -/** - * Set a key-value pair in the cache. - * @param cache Pointer to the cache. - * @param key The key to set. - * @param value The value to set. - * @param ttl Time-to-live in seconds (0 for no expiry). - */ void cache_set(Cache *cache, const char *key, const char *value, int ttl); - -/** - * Get the value associated with a key from the cache. - * @param cache Pointer to the cache. - * @param key The key to retrieve. - * @return Pointer to the value if found and not expired; NULL otherwise. - */ char *cache_get(Cache *cache, const char *key); - -/** - * Delete a key from the cache. - * @param cache Pointer to the cache. - * @param key The key to delete. - */ void cache_delete(Cache *cache, const char *key); - -/** - * Free all memory associated with the cache. - * @param cache Pointer to the cache. - */ void free_cache(Cache *cache); -#endif // CACHE_H +#endif diff --git a/client b/client deleted file mode 100644 index 6cb37ad..0000000 Binary files a/client and /dev/null differ diff --git a/client.c b/client.c index 73ede2d..10b8cd7 100644 --- a/client.c +++ b/client.c @@ -9,7 +9,8 @@ #define LOAD_BALANCER_ADDRESS "127.0.0.1" #define LOAD_BALANCER_PORT 9090 -// Function to send a single command to the load balancer +pthread_mutex_t lock; + void send_to_load_balancer(const char *command) { int sock; struct sockaddr_in lb_addr; @@ -39,22 +40,24 @@ void send_to_load_balancer(const char *command) { close(sock); } -// Thread function for executing a single command void *execute_command(void *arg) { char *command = (char *)arg; + + pthread_mutex_lock(&lock); send_to_load_balancer(command); + pthread_mutex_unlock(&lock); + free(command); return NULL; } -// Execute concurrent commands using threads void execute_concurrent_commands(char *input) { pthread_t threads[BUFFER_SIZE / 10]; int thread_count = 0; char *token = strtok(input, "&&"); while (token) { - while (*token == ' ') token++; // Trim leading spaces + while (*token == ' ') token++; char *command = malloc(strlen(token) + 1); strcpy(command, token); @@ -100,13 +103,15 @@ void process_batch_file(const char *filename) { char line[BUFFER_SIZE]; while (fgets(line, sizeof(line), file)) { - line[strcspn(line, "\n")] = '\0'; // Remove newline character - if (strlen(line) == 0) continue; // Skip empty lines + line[strcspn(line, "\n")] = '\0'; + if (strlen(line) == 0) continue; if (strstr(line, "&&")) { execute_concurrent_commands(line); } else { + pthread_mutex_lock(&lock); send_to_load_balancer(line); + pthread_mutex_unlock(&lock); } } @@ -114,26 +119,24 @@ void process_batch_file(const char *filename) { } int main() { + pthread_mutex_init(&lock, NULL); + char input[BUFFER_SIZE]; while (1) { printf("Enter command (use '&&' for concurrency, 'exit' to quit): "); fgets(input, BUFFER_SIZE, stdin); - input[strcspn(input, "\n")] = '\0'; // Remove newline character - + input[strcspn(input, "\n")] = '\0'; if (strcmp(input, "exit") == 0) { break; } - // Check for concurrent commands if (strstr(input, "&&") != NULL) { execute_concurrent_commands(input); } else if (strncmp(input, "batch ", 6) == 0) { - // Extract the filename char *filename = input + 6; printf("Processing batch file: %s\n", filename); - // Count lines and process the file int linecount = count_lines_in_file(filename); printf("Total lines in file: %d\n", linecount); @@ -143,5 +146,6 @@ int main() { } } + pthread_mutex_destroy(&lock); return 0; } diff --git a/conhash.c b/conhash.c index 68ac235..2bd0622 100644 --- a/conhash.c +++ b/conhash.c @@ -1,10 +1,9 @@ #include #include #include -#include // For EVP interface +#include #include "conhash.h" -// Hash a string using EVP for MD5 uint32_t hash(const char *key) { unsigned char digest[EVP_MAX_MD_SIZE]; unsigned int digest_len; @@ -40,7 +39,6 @@ uint32_t hash(const char *key) { return hash_value; } -// Add a node to the hash ring void add_node(HashRing *ring, const char *address) { if (ring->node_count >= MAX_NODES) { fprintf(stderr, "Max nodes reached\n"); @@ -53,7 +51,6 @@ void add_node(HashRing *ring, const char *address) { ring->nodes[ring->node_count++] = node; - // Sort nodes by hash for consistent hashing for (int i = 0; i < ring->node_count - 1; i++) { for (int j = 0; j < ring->node_count - i - 1; j++) { if (ring->nodes[j].hash > ring->nodes[j + 1].hash) { @@ -65,7 +62,6 @@ void add_node(HashRing *ring, const char *address) { } } -// Find the node for a given key const char *get_node(HashRing *ring, const char *key) { if (ring->node_count == 0) { fprintf(stderr, "No nodes in the ring\n"); @@ -74,27 +70,22 @@ const char *get_node(HashRing *ring, const char *key) { uint32_t key_hash = hash(key); - // Find the first node with a hash greater than the key's hash for (int i = 0; i < ring->node_count; i++) { if (key_hash <= ring->nodes[i].hash) { return ring->nodes[i].address; } } - // Wrap around to the first node if no match is found return ring->nodes[0].address; } -// Remove a node from the hash ring void remove_node(HashRing *ring, const char *address) { int found = 0; - // Find the node by address for (int i = 0; i < ring->node_count; i++) { if (strcmp(ring->nodes[i].address, address) == 0) { found = 1; - // Shift all subsequent nodes left for (int j = i; j < ring->node_count - 1; j++) { ring->nodes[j] = ring->nodes[j + 1]; } @@ -109,3 +100,20 @@ void remove_node(HashRing *ring, const char *address) { fprintf(stderr, "Node '%s' not found in the hash ring\n", address); } } + +const char *get_secondary_node(HashRing *ring, const char *key) { + if (ring->node_count < 2) { + return NULL; + } + + uint32_t key_hash = hash(key); + + for (int i = 0; i < ring->node_count; i++) { + if (key_hash <= ring->nodes[i].hash) { + return ring->nodes[(i + 1) % ring->node_count].address; + } + } + + return ring->nodes[1].address; +} + diff --git a/conhash.h b/conhash.h index b2d8f5f..7611afc 100644 --- a/conhash.h +++ b/conhash.h @@ -1,46 +1,35 @@ #ifndef HASH_RING_H #define HASH_RING_H -#include // For uint32_t -#include // For EVP interface +#include -// Maximum number of nodes in the hash ring #define MAX_NODES 10 -// Node structure representing a server in the hash ring typedef struct Node { - char address[256]; // Server address (e.g., "127.0.0.1:8080") - uint32_t hash; // Hash of the server's address + char address[256]; + uint32_t hash; } Node; -// HashRing structure representing the consistent hash ring typedef struct HashRing { - Node nodes[MAX_NODES]; // Array of nodes - int node_count; // Number of nodes currently in the ring + Node nodes[MAX_NODES]; + int node_count; } HashRing; -/** - * Hash a string using EVP for MD5. - * @param key The key to be hashed. - * @return A 32-bit hash value of the key. - */ +extern HashRing ring; + + uint32_t hash(const char *key); -/** - * Add a node to the hash ring. - * @param ring Pointer to the HashRing. - * @param address The address of the node to add (e.g., "127.0.0.1:8080"). - */ + void add_node(HashRing *ring, const char *address); -/** - * Find the appropriate node for a given key. - * @param ring Pointer to the HashRing. - * @param key The key to map to a node. - * @return The address of the node responsible for the key. - */ + const char *get_node(HashRing *ring, const char *key); void remove_node(HashRing *ring, const char *address); -#endif // HASH_RING_H + +const char *get_secondary_node(HashRing *ring, const char *key); + + +#endif \ No newline at end of file diff --git a/db_server b/db_server deleted file mode 100644 index 860f85c..0000000 Binary files a/db_server and /dev/null differ diff --git a/db_server.c b/db_server.c index 8f3bd79..f12cdc9 100644 --- a/db_server.c +++ b/db_server.c @@ -50,7 +50,6 @@ int main() { MockDB *db = create_mockdb(); - // Create and bind socket server_socket = socket(AF_INET, SOCK_STREAM, 0); if (server_socket < 0) { perror("Failed to create socket"); diff --git a/global_ring.c b/global_ring.c new file mode 100644 index 0000000..f8206ed --- /dev/null +++ b/global_ring.c @@ -0,0 +1,3 @@ +#include "conhash.h" + +HashRing ring = {0}; diff --git a/load_balancer b/load_balancer deleted file mode 100644 index 6614fcd..0000000 Binary files a/load_balancer and /dev/null differ diff --git a/load_balancer.c b/load_balancer.c index 14d9534..f58a1ef 100644 --- a/load_balancer.c +++ b/load_balancer.c @@ -7,14 +7,12 @@ #include "conhash.h" #define BUFFER_SIZE 1024 -#define LB_PORT 9090 // Port for the load balancer -#define ANNOUNCE_PORT 9091 // Port for servers to announce themselves -#define HEALTH_CHECK_INTERVAL 5 // Health check interval in seconds +#define LB_PORT 9090 +#define ANNOUNCE_PORT 9091 +#define HEALTH_CHECK_INTERVAL 5 -HashRing ring = {0}; // Global consistent hash ring pthread_mutex_t lock; -// Function to check server health int is_server_alive(const char *server_address) { char ip[256]; int port; @@ -23,31 +21,26 @@ int is_server_alive(const char *server_address) { int sock; struct sockaddr_in server_addr; - // Create socket sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { - return 0; // Server not reachable + return 0; } server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); inet_pton(AF_INET, ip, &server_addr.sin_addr); - // Try to connect to the server int result = connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)); close(sock); - return result == 0; // Server is alive if connection succeeds + return result == 0; } -// Health check thread function void *health_check(void *arg) { while (1) { - sleep(HEALTH_CHECK_INTERVAL); // Wait for the next health check - + sleep(HEALTH_CHECK_INTERVAL); pthread_mutex_lock(&lock); - // Iterate through the hash ring and check server health for (int i = 0; i < ring.node_count; i++) { const char *server_address = ring.nodes[i].address; if (!is_server_alive(server_address)) { @@ -61,7 +54,29 @@ void *health_check(void *arg) { return NULL; } -// Forward request to the appropriate server +const char *get_next_node(const char *current_server_address) { + pthread_mutex_lock(&lock); + int current_index = -1; + for (int i = 0; i < ring.node_count; i++) { + if (strcmp(ring.nodes[i].address, current_server_address) == 0) { + current_index = i; + break; + } + } + + if (current_index == -1) { + pthread_mutex_unlock(&lock); + return NULL; + } + + + int next_index = (current_index + 1) % ring.node_count; + const char *next_server_address = ring.nodes[next_index].address; + + pthread_mutex_unlock(&lock); + return next_server_address; +} + void forward_to_server(const char *server_address, const char *client_request, int client_socket) { char ip[256]; int port; @@ -71,7 +86,6 @@ void forward_to_server(const char *server_address, const char *client_request, i struct sockaddr_in server_addr; char buffer[BUFFER_SIZE] = {0}; - // Create socket for connecting to server server_socket = socket(AF_INET, SOCK_STREAM, 0); if (server_socket < 0) { perror("Failed to create server socket"); @@ -83,32 +97,75 @@ void forward_to_server(const char *server_address, const char *client_request, i server_addr.sin_port = htons(port); inet_pton(AF_INET, ip, &server_addr.sin_addr); - // Connect to the server if (connect(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - perror("Failed to connect to server"); + perror("Failed to connect to primary server"); send(client_socket, "Error: Server connection failed\n", 34, 0); close(server_socket); return; } - // Forward client request to server send(server_socket, client_request, strlen(client_request), 0); - // Receive server response int bytes_received = recv(server_socket, buffer, BUFFER_SIZE - 1, 0); if (bytes_received > 0) { buffer[bytes_received] = '\0'; - // Append server address to response char enhanced_response[BUFFER_SIZE]; - snprintf(enhanced_response, BUFFER_SIZE, "Server: %s | Response: %s", server_address, buffer); + snprintf(enhanced_response, BUFFER_SIZE, "Primary Server: %s | Response: %s", server_address, buffer); send(client_socket, enhanced_response, strlen(enhanced_response), 0); } close(server_socket); + + const char *replica_server_address = get_next_node(server_address); + if (replica_server_address && strcmp(replica_server_address, server_address) != 0 ) { + char replica_ip[256]; + int replica_port; + sscanf(replica_server_address, "%[^:]:%d", replica_ip, &replica_port); + + int replica_socket; + struct sockaddr_in replica_addr; + + replica_socket = socket(AF_INET, SOCK_STREAM, 0); + if (replica_socket < 0) { + perror("Failed to create replica socket"); + return; + } + + replica_addr.sin_family = AF_INET; + replica_addr.sin_port = htons(replica_port); + inet_pton(AF_INET, replica_ip, &replica_addr.sin_addr); + + if (connect(replica_socket, (struct sockaddr *)&replica_addr, sizeof(replica_addr)) < 0) { + perror("Failed to connect to replica server"); + close(replica_socket); + return; + } + + if (strncmp(client_request, "set", 3) == 0) { + send(replica_socket, client_request, strlen(client_request), 0); + } else if (strncmp(client_request, "get", 3) == 0) { + char replica_buffer[BUFFER_SIZE] = {0}; + send(replica_socket, client_request, strlen(client_request), 0); + int replica_bytes_received = recv(replica_socket, replica_buffer, BUFFER_SIZE - 1, 0); + + if (replica_bytes_received > 0) { + replica_buffer[replica_bytes_received] = '\0'; + + char replica_response[BUFFER_SIZE]; + snprintf(replica_response, BUFFER_SIZE, "Replica Server: %s | Response: %s", replica_server_address, replica_buffer); + send(client_socket, replica_response, strlen(replica_response), 0); + } + } + else if (strncmp(client_request, "delete", 6) == 0) { + send(replica_socket, client_request, strlen(client_request), 0); + } + + close(replica_socket); + } } -// Handle client connections + void *handle_client(void *client_socket_ptr) { int client_socket = *(int *)client_socket_ptr; free(client_socket_ptr); @@ -116,11 +173,9 @@ void *handle_client(void *client_socket_ptr) { char buffer[BUFFER_SIZE] = {0}; recv(client_socket, buffer, BUFFER_SIZE, 0); - // Parse the key from the client request char command[10], key[256]; sscanf(buffer, "%s %s", command, key); - // Get the appropriate server for the key from the ring pthread_mutex_lock(&lock); const char *server_address = get_node(&ring, key); pthread_mutex_unlock(&lock); @@ -136,7 +191,6 @@ void *handle_client(void *client_socket_ptr) { return NULL; } -// Handle server announcements void *handle_server_announcement(void *arg) { int announce_socket; struct sockaddr_in announce_addr, server_addr; @@ -167,7 +221,7 @@ void *handle_server_announcement(void *arg) { if (bytes_received > 0) { buffer[bytes_received] = '\0'; pthread_mutex_lock(&lock); - add_node(&ring, buffer); // Add the server to the hash ring + add_node(&ring, buffer); pthread_mutex_unlock(&lock); printf("Server '%s' added to the ring\n", buffer); } @@ -180,7 +234,6 @@ void *handle_server_announcement(void *arg) { int main() { pthread_mutex_init(&lock, NULL); - // Create threads for server announcements and health checks pthread_t announce_thread, health_check_thread; pthread_create(&announce_thread, NULL, handle_server_announcement, NULL); pthread_create(&health_check_thread, NULL, health_check, NULL); @@ -189,7 +242,6 @@ int main() { struct sockaddr_in lb_addr, client_addr; socklen_t addr_len = sizeof(client_addr); - // Create socket for load balancer lb_socket = socket(AF_INET, SOCK_STREAM, 0); if (lb_socket < 0) { perror("Failed to create load balancer socket"); diff --git a/mockdb.c b/mockdb.c index a5b6793..7c74168 100644 --- a/mockdb.c +++ b/mockdb.c @@ -4,12 +4,10 @@ #include "cache.h" #include "mockdb.h" -// Create a mock database and populate it with dummy data MockDB *create_mockdb() { MockDB *db = (MockDB *)malloc(sizeof(MockDB)); db->count = 0; - // Add some dummy data for (int i = 0; i < 10; i++) { snprintf(db->keys[i], MAX_KEY_LENGTH, "key%d", i); snprintf(db->values[i], MAX_VALUE_LENGTH, "value%d", i); @@ -18,7 +16,6 @@ MockDB *create_mockdb() { return db; } -// Set a key-value pair in the mock database void db_set(MockDB *db, const char *key, const char *value) { for (int i = 0; i < db->count; i++) { if (strcmp(db->keys[i], key) == 0) { @@ -31,21 +28,18 @@ void db_set(MockDB *db, const char *key, const char *value) { db->count++; } -// Get a value from the mock database char *db_get(MockDB *db, const char *key) { for (int i = 0; i < db->count; i++) { if (strcmp(db->keys[i], key) == 0) { return db->values[i]; } } - return NULL; // Not found + return NULL; } -// Delete a key from the mock database void db_delete(MockDB *db, const char *key) { for (int i = 0; i < db->count; i++) { if (strcmp(db->keys[i], key) == 0) { - // Shift remaining entries left for (int j = i; j < db->count - 1; j++) { strncpy(db->keys[j], db->keys[j + 1], MAX_KEY_LENGTH); strncpy(db->values[j], db->values[j + 1], MAX_VALUE_LENGTH); @@ -56,7 +50,6 @@ void db_delete(MockDB *db, const char *key) { } } -// Free mock database memory void free_mockdb(MockDB *db) { free(db); } diff --git a/server b/server deleted file mode 100644 index 3017c25..0000000 Binary files a/server and /dev/null differ diff --git a/server.c b/server.c index 354dd69..3995d34 100644 --- a/server.c +++ b/server.c @@ -5,6 +5,7 @@ #include #include "cache.h" #include "mockdb.h" +#include "conhash.h" #define BUFFER_SIZE 1024 #define DB_SERVER_ADDRESS "127.0.0.1" @@ -61,6 +62,35 @@ char *db_request(const char *command, const char *key, const char *value) { return response; } +// +void replicate_to_secondary(const char *secondary_address, const char *command, const char *key, const char *value) { + int sock; + struct sockaddr_in sec_addr; + char buffer[BUFFER_SIZE]; + + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) return; + + sec_addr.sin_family = AF_INET; + sec_addr.sin_port = htons(atoi(strchr(secondary_address, ':') + 1)); // Extract port + inet_pton(AF_INET, strtok(strdup(secondary_address), ":"), &sec_addr.sin_addr); + + if (connect(sock, (struct sockaddr *)&sec_addr, sizeof(sec_addr)) < 0) { + close(sock); + return; + } + + if (strcmp(command, "delete") == 0) { + snprintf(buffer, BUFFER_SIZE, "%s %s", command, key); + } else { + snprintf(buffer, BUFFER_SIZE, "%s %s %s", command, key, value ? value : ""); + } + + send(sock, buffer, strlen(buffer), 0); + close(sock); +} + + void handle_client(int client_socket, Cache *cache) { char buffer[BUFFER_SIZE]; @@ -75,8 +105,12 @@ void handle_client(int client_socket, Cache *cache) { char response[BUFFER_SIZE] = {0}; if (strcmp(command, "set") == 0) { - cache_set(cache, key, value, 60); + cache_set(cache, key, value, 1000); db_request("set", key, value); + const char *secondary_address = get_secondary_node(&ring, key); + if (secondary_address) { + replicate_to_secondary(secondary_address, "set", key, value); // Replicate to secondary + } snprintf(response, BUFFER_SIZE, "OK"); } else if (strcmp(command, "get") == 0) { char *result = cache_get(cache, key); @@ -87,7 +121,7 @@ void handle_client(int client_socket, Cache *cache) { printf("Cache Miss: %s\n", key); result = db_request("get", key, NULL); if (strcmp(result, "null") != 0) { - cache_set(cache, key, result, 60); + cache_set(cache, key, result, 1000); snprintf(response, BUFFER_SIZE, "%s", result); } else { snprintf(response, BUFFER_SIZE, "null"); @@ -96,6 +130,10 @@ void handle_client(int client_socket, Cache *cache) { } else if (strcmp(command, "delete") == 0) { cache_delete(cache, key); db_request("delete", key, NULL); + const char *secondary_address = get_secondary_node(&ring, key); + if (secondary_address) { + replicate_to_secondary(secondary_address, "delete", key, NULL); // Replicate deletion + } snprintf(response, BUFFER_SIZE, "OK"); } else { snprintf(response, BUFFER_SIZE, "Invalid command");