Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class QueueProcessor extends EventEmitter {

if (Array.isArray(this.destConfig.replicationEndpoint.servers)) {
this.destHosts =
new RoundRobin(this.destConfig.replicationEndpoint.servers, { defaultPort: 80 });
new RoundRobin(this.destConfig.replicationEndpoint.servers);
if (this.destConfig.replicationEndpoint.echo) {
this._setupEcho();
}
Expand Down
21 changes: 13 additions & 8 deletions lib/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,19 @@ class Config extends EventEmitter {
throw new Error('dataMoverTopic is required when lifecycle transitions is supported');
}

if (parsedConfig.extensions && parsedConfig.extensions.replication
&& parsedConfig.extensions.replication.destination
&& parsedConfig.extensions.replication.destination.bootstrapList) {
this.bootstrapList = parsedConfig.extensions.replication
.destination.bootstrapList;
} else {
this.bootstrapList = [];
}
const destination = parsedConfig.extensions?.replication?.destination;
this.bootstrapList = destination?.bootstrapList?.map(endpoint => {
if (!endpoint.servers) {
return endpoint;
}

const transport = destination.sites?.[endpoint.site]?.transport || destination.transport || 'http';
const defaultPort = transport === 'https' ? 443 : 80;
const servers = endpoint.servers.map(server => server.includes(':') ? server : `${server}:${defaultPort}`);

return { ...endpoint, servers };
}) ?? [];

const shouldRestrictToSite = process.env.BOOTSTRAP_SITE_NAME;
if (shouldRestrictToSite) {
this.bootstrapList = this.bootstrapList.filter(item => item.site === shouldRestrictToSite);
Expand Down
120 changes: 78 additions & 42 deletions tests/unit/conf/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,63 +91,99 @@ describe('Site name', () => {
});
});


describe('Config', () => {
describe('getReplicationSiteDestConfig', () => {
let ogConfigFileEnv;

before(() => {
ogConfigFileEnv = process.env.BACKBEAT_CONFIG_FILE;
});
afterEach(() => {
sinon.restore();
});

afterEach(() => sinon.restore());

after(() => {
if (ogConfigFileEnv) {
process.env.BACKBEAT_CONFIG_FILE = ogConfigFileEnv;
}
});
it('should return replication site destination config', () => {
process.env.BACKBEAT_CONFIG_FILE = `${__dirname}/configs/replicationMultiDestConfig.json`;
const conf = new Config();
const destConfig = conf.getReplicationSiteDestConfig('aws3');
assert.deepStrictEqual(destConfig, {
transport: 'https',
auth: {
type: 'service',
account: 'service-replication-3',
},
replicationEndpoint: {
site: 'aws3',
type: 'aws_s3',
},

describe('bootstrapList server normalization', () => {
let conf;

before(() => {
process.env.BACKBEAT_CONFIG_FILE = `${__dirname}/configs/replicationServersConfig.json`;
conf = new Config();
});

it('should normalize server entries with default port 443 for https transport', () => {
const entry = conf.bootstrapList.find(e => e.site === 'https-site');
assert.deepStrictEqual(entry.servers, ['s3.example.com:443']);
});

it('should normalize server entries with default port 80 for http transport', () => {
const entry = conf.bootstrapList.find(e => e.site === 'http-site');
assert.deepStrictEqual(entry.servers, ['s3.example.com:80']);
});

it('should preserve explicit port in server entries', () => {
const entry = conf.bootstrapList.find(e => e.site === 'explicit-port-site');
assert.deepStrictEqual(entry.servers, ['s3.example.com:8443']);
});

it('should not modify endpoint without servers array', () => {
const entry = conf.bootstrapList.find(e => e.site === 'aws-site');
assert.strictEqual(entry.servers, undefined);
assert.strictEqual(entry.type, 'aws_s3');
});
});

it('should return default replication destination config when site one is not available', () => {
process.env.BACKBEAT_CONFIG_FILE = `${__dirname}/configs/replicationMultiDestConfig.json`;
const conf = new Config();
sinon.stub(conf.extensions.replication, 'destination').value({
transport: 'https',
auth: {
type: 'service',
account: 'service-replication',
},
bootstrapList: [
{ site: 'aws1', type: 'aws_s3' },
{ site: 'aws2', type: 'aws_s3' },
{ site: 'aws3', type: 'aws_s3' }
]

describe('getReplicationSiteDestConfig', () => {
it('should return replication site destination config', () => {
process.env.BACKBEAT_CONFIG_FILE = `${__dirname}/configs/replicationMultiDestConfig.json`;
const conf = new Config();
const destConfig = conf.getReplicationSiteDestConfig('aws3');
assert.deepStrictEqual(destConfig, {
transport: 'https',
auth: {
type: 'service',
account: 'service-replication-3',
},
replicationEndpoint: {
site: 'aws3',
type: 'aws_s3',
},
});
});
const destConfig = conf.getReplicationSiteDestConfig('aws3');
assert.deepStrictEqual(destConfig, {
transport: 'https',
auth: {
type: 'service',
account: 'service-replication',
},
replicationEndpoint: {
site: 'aws3',
type: 'aws_s3',
},

it('should return default replication destination config when site one is not available', () => {
process.env.BACKBEAT_CONFIG_FILE = `${__dirname}/configs/replicationMultiDestConfig.json`;
const conf = new Config();
sinon.stub(conf.extensions.replication, 'destination').value({
transport: 'https',
auth: {
type: 'service',
account: 'service-replication',
},
bootstrapList: [
{ site: 'aws1', type: 'aws_s3' },
{ site: 'aws2', type: 'aws_s3' },
{ site: 'aws3', type: 'aws_s3' }
]
});
const destConfig = conf.getReplicationSiteDestConfig('aws3');
assert.deepStrictEqual(destConfig, {
transport: 'https',
auth: {
type: 'service',
account: 'service-replication',
},
replicationEndpoint: {
site: 'aws3',
type: 'aws_s3',
},
});
});
});
});
Expand Down
204 changes: 204 additions & 0 deletions tests/unit/conf/configs/replicationServersConfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
{
"zookeeper": {
"connectionString": "127.0.0.1:2181/backbeat",
"autoCreateNamespace": false
},
"kafka": {
"hosts": "127.0.0.1:9092",
"backlogMetrics": {
"zkPath": "/backbeat/run/kafka-backlog-metrics",
"intervalS": 60
},
"maxRequestSize": 5000020
},
"s3": {
"host": "127.0.0.1",
"port": 8000
},
"vaultAdmin": {
"host": "127.0.0.1",
"port": 8500
},
"replicationGroupId": "RG001 ",
"queuePopulator": {
"cronRule": "*/5 * * * * *",
"batchMaxRead": 10000,
"batchTimeoutMs": 9000,
"zookeeperPath": "/queue-populator",
"logSource": "mongo",
"bucketd": {
"host": "127.0.0.1",
"port": 9000
},
"dmd": {
"host": "127.0.0.1",
"port": 9990
},
"mongo": {
"replicaSetHosts":
"localhost:27017,localhost:27018,localhost:27019",
"writeConcern": "majority",
"replicaSet": "rs0",
"readPreference": "primary",
"database": "metadata"
},
"kafka": {
"topic": "backbeat-oplog",
"consumerGroupId": "backbeat-qp-oplog-group"
},
"probeServer": {
"bindAddress": "0.0.0.0",
"port": 4042
}
},
"extensions": {
"replication": {
"source": {
"transport": "http",
"s3": {
"host": "127.0.0.1",
"port": 8000
},
"auth": {
"type": "service",
"account": "service-replication",
"vault": {
"host": "127.0.0.1",
"port": 8500,
"adminPort": 8600
}
}
},
"destination": {
"transport": "http",
"sites": {
"https-site": {
"transport": "https"
},
"http-site": {
"transport": "http"
}
},
"bootstrapList": [
{ "site": "https-site", "servers": ["s3.example.com"] },
{ "site": "http-site", "servers": ["s3.example.com"] },
{ "site": "explicit-port-site", "servers": ["s3.example.com:8443"] },
{ "site": "aws-site", "type": "aws_s3" }
],
"auth": {
"type": "service",
"account": "service-replication"
}
},
"topic": "backbeat-replication",
"dataMoverTopic": "backbeat-data-mover",
"replicationStatusTopic": "backbeat-replication-status",
"replicationFailedTopic": "backbeat-replication-failed",
"monitorReplicationFailures": true,
"monitorReplicationFailureExpiryTimeS": 86400,
"replayTopics": [
{
"topicName": "backbeat-replication-replay-0",
"retries": 5
}
],
"queueProcessor": {
"groupId": "backbeat-replication-group",
"retry": {
"aws_s3": {
"maxRetries": 5,
"timeoutS": 900,
"backoff": {
"min": 60000,
"max": 900000,
"jitter": 0.1,
"factor": 1.5
}
},
"azure": {
"maxRetries": 5,
"timeoutS": 900,
"backoff": {
"min": 60000,
"max": 900000,
"jitter": 0.1,
"factor": 1.5
}
},
"gcp": {
"maxRetries": 5,
"timeoutS": 900,
"backoff": {
"min": 60000,
"max": 900000,
"jitter": 0.1,
"factor": 1.5
}
},
"scality": {
"maxRetries": 5,
"timeoutS": 300,
"backoff": {
"min": 1000,
"max": 300000,
"jitter": 0.1,
"factor": 1.5
}
}
},
"concurrency": 10,
"mpuPartsConcurrency": 10,
"probeServer": {
"bindAddress": "localhost",
"port": 4043
}
},
"replicationStatusProcessor": {
"groupId": "backbeat-replication-group",
"retry": {
"maxRetries": 5,
"timeoutS": 300,
"backoff": {
"min": 1000,
"max": 300000,
"jitter": 0.1,
"factor": 1.5
}
},
"concurrency": 10,
"probeServer": {
"bindAddress": "localhost",
"port": 4045
}
},
"objectSizeMetrics": [
66560,
8388608,
68157440
]
}
},
"log": {
"logLevel": "info",
"dumpLevel": "error"
},
"metrics": {
"topic": "backbeat-metrics"
},
"server": {
"healthChecks": {
"allowFrom": ["127.0.0.1/8", "::1"]
},
"host": "127.0.0.1",
"port": 8900
},
"redis": {
"host": "localhost",
"port": 6379
},
"certFilePaths": {
"key": "",
"cert": "",
"ca": ""
}
}
Loading