Skip to content

Commit 8dc07d4

Browse files
committed
Misc improvements
Add helpers to generate random values for workload. Use BTF to attach fentry/fexit to BPF programs (WIP). Add SIGTERM hook to finish workers processes.
1 parent 026c376 commit 8dc07d4

File tree

10 files changed

+496
-122
lines changed

10 files changed

+496
-122
lines changed

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ pest = "2.8.1"
2727
pest_derive = "2.8.1"
2828
llvm-sys = "201.0.1"
2929
docopt = "1.1.1"
30+
signal-hook = "0.3.18"

src/machine.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,16 @@ use crate::script::ast::MachineInstruction;
33
use log::{debug, trace};
44
use std::{
55
io::{prelude::*, BufReader},
6+
mem,
67
net::TcpListener,
78
thread,
89
};
910

11+
use libc::SYS_bpf;
12+
13+
use aya::{include_bytes_aligned, programs::FEntry, Btf, Ebpf, Endianness};
14+
use aya_obj::generated::{bpf_attr, bpf_btf_info, bpf_cmd};
15+
1016
#[derive(Debug)]
1117
pub enum MachineError {
1218
Internal,
@@ -62,10 +68,80 @@ fn start_server(addr: String, target_port: u16) -> Result<(), MachineError> {
6268
Ok(())
6369
}
6470

71+
fn start_bpf_profiling() -> Result<(), MachineError> {
72+
debug!("Starting eBPF profiling");
73+
let btf_fd;
74+
let mut buf = vec![0u8; 4096];
75+
76+
// Load the BPF program
77+
unsafe {
78+
let mut fd_attr = mem::zeroed::<bpf_attr>();
79+
fd_attr.__bindgen_anon_6.__bindgen_anon_1.btf_id = 293;
80+
81+
btf_fd = libc::syscall(
82+
SYS_bpf,
83+
bpf_cmd::BPF_BTF_GET_FD_BY_ID,
84+
&fd_attr,
85+
mem::size_of::<bpf_attr>(),
86+
);
87+
88+
let mut info_attr = mem::zeroed::<bpf_attr>();
89+
let mut info = mem::zeroed::<bpf_btf_info>();
90+
91+
info.btf = buf.as_mut_ptr() as _;
92+
info.btf_size = buf.len() as _;
93+
94+
info_attr.info.bpf_fd = btf_fd as u32;
95+
info_attr.info.info = &info as *const _ as u64;
96+
info_attr.info.info_len = mem::size_of_val(&info) as u32;
97+
98+
libc::syscall(
99+
SYS_bpf,
100+
bpf_cmd::BPF_OBJ_GET_INFO_BY_FD,
101+
&info_attr,
102+
mem::size_of::<bpf_attr>(),
103+
);
104+
}
105+
106+
let btf = Btf::parse(&buf, Endianness::default()).unwrap();
107+
108+
debug!("Got btf {:?}", btf);
109+
110+
debug!("Loading eBPF program");
111+
let mut bpf = match Ebpf::load(include_bytes_aligned!("../fentry.bpf.o")) {
112+
Ok(prog) => {
113+
debug!("Loaded prog");
114+
prog
115+
}
116+
Err(e) => {
117+
debug!("Cannot load eBPF program, {e}");
118+
panic!("AAA")
119+
}
120+
};
121+
122+
debug!("Loaded eBPF program");
123+
//let btf = Btf::from_sys_fs().unwrap();
124+
let btf = match Btf::parse_file("/tmp/btf", Endianness::default()) {
125+
Ok(data) => data,
126+
Err(e) => panic!("Cannot parse BTF {e}"),
127+
};
128+
let fentry: &mut FEntry =
129+
bpf.program_mut("fentry_XXX").unwrap().try_into().unwrap();
130+
131+
fentry.load("handle_tp", &btf).unwrap();
132+
debug!("Loaded fentry program");
133+
134+
fentry.attach().unwrap();
135+
debug!("Attached fentry");
136+
137+
Ok(())
138+
}
139+
65140
pub fn apply(instr: MachineInstruction) -> Result<(), MachineError> {
66141
match instr {
67142
MachineInstruction::Server { port } => {
68143
start_server("127.0.0.1".to_string(), port)
69144
}
145+
MachineInstruction::Profile { target: _ } => start_bpf_profiling(),
70146
}
71147
}

src/main.rs

Lines changed: 85 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ use serde::Deserialize;
3131
use std::time::SystemTime;
3232
use std::{thread, time};
3333

34+
use std::sync::atomic::{AtomicBool, Ordering};
35+
use std::sync::Arc;
36+
3437
use berserker::machine::apply;
3538
use berserker::script::{ast::Node, parser::parse_instructions};
3639
use berserker::{
@@ -53,8 +56,9 @@ struct Args {
5356
flag_f: Option<String>,
5457
}
5558

56-
fn run_script(script_path: String) -> Vec<Option<i32>> {
59+
fn run_script(script_path: String) -> Vec<(i32, u64)> {
5760
let mut handles = vec![];
61+
info!("Loading script: {:?}", script_path);
5862

5963
let ast: Vec<Node> =
6064
parse_instructions(&std::fs::read_to_string(script_path).unwrap())
@@ -72,8 +76,8 @@ fn run_script(script_path: String) -> Vec<Option<i32>> {
7276
};
7377

7478
for instr in m_instructions {
75-
debug!("INSTR {:?}", instr);
76-
thread::spawn(move || apply(instr.clone()));
79+
//thread::spawn(move || apply(instr.clone()));
80+
let _ = apply(instr.clone());
7781
}
7882
};
7983

@@ -96,14 +100,22 @@ fn run_script(script_path: String) -> Vec<Option<i32>> {
96100
.unwrap_or(String::from("0"))
97101
.parse()
98102
.unwrap();
103+
104+
let duration: u64 = args
105+
.get("duration")
106+
.cloned()
107+
.unwrap_or(String::from("0"))
108+
.parse()
109+
.unwrap();
110+
99111
let h: Vec<_> = (0..workers)
100112
.map(|_| {
101113
let worker = new_script_worker(node.clone());
102114

103115
match fork() {
104116
Ok(Fork::Parent(child)) => {
105117
info!("Child {}", child);
106-
Some(child)
118+
Some((child, duration))
107119
}
108120
Ok(Fork::Child) => {
109121
worker.run_payload().unwrap();
@@ -120,10 +132,10 @@ fn run_script(script_path: String) -> Vec<Option<i32>> {
120132
handles.extend(h);
121133
});
122134

123-
handles
135+
handles.iter().filter_map(|i| *i).collect()
124136
}
125137

126-
fn run_workload(config: WorkloadConfig) -> Vec<Option<i32>> {
138+
fn run_workload(config: WorkloadConfig) -> Vec<(i32, u64)> {
127139
let mut lower = 1024;
128140
let mut upper = 1024;
129141

@@ -142,7 +154,7 @@ fn run_workload(config: WorkloadConfig) -> Vec<Option<i32>> {
142154
match fork() {
143155
Ok(Fork::Parent(child)) => {
144156
info!("Child {}", child);
145-
Some(child)
157+
Some((child, config.duration))
146158
}
147159
Ok(Fork::Child) => {
148160
if config.per_core {
@@ -162,83 +174,101 @@ fn run_workload(config: WorkloadConfig) -> Vec<Option<i32>> {
162174
.collect();
163175

164176
info!("In total: {}", upper);
165-
handles
177+
handles.iter().filter_map(|i| *i).collect()
166178
}
167179

168180
fn main() {
169181
env_logger::init();
170182

183+
let terminating = Arc::new(AtomicBool::new(false));
184+
signal_hook::flag::register(
185+
signal_hook::consts::SIGTERM,
186+
Arc::clone(&terminating),
187+
)
188+
.unwrap();
189+
171190
let args: Args = Docopt::new(USAGE)
172191
.and_then(|d| d.deserialize())
173192
.unwrap_or_else(|e| e.exit());
174193

175194
debug!("ARGS {:?}", args);
176195

177-
let default_config = String::from("workload.toml");
178196
let duration_timer = SystemTime::now();
179197
let script_path = args.flag_f;
180-
let config_path = args.flag_c.unwrap_or(default_config);
181-
182-
let config = Config::builder()
183-
// Add in `./Settings.toml`
184-
.add_source(
185-
config::File::with_name("/etc/berserker/workload.toml")
186-
.required(false),
187-
)
188-
.add_source(
189-
config::File::with_name(config_path.as_str()).required(false),
190-
)
191-
// Add in settings from the environment (with a prefix of APP)
192-
// Eg.. `BERSERKER__WORKLOAD__ARRIVAL_RATE=1` would set the `arrival_rate` key
193-
.add_source(
194-
config::Environment::with_prefix("BERSERKER")
195-
.try_parsing(true)
196-
.separator("__"),
197-
)
198-
.build()
199-
.unwrap()
200-
.try_deserialize::<WorkloadConfig>()
201-
.unwrap();
202-
203-
info!("Config: {:?}", config);
204198

205199
let handles = match script_path {
206200
Some(path) => run_script(path),
207-
None => run_workload(config),
201+
None => {
202+
let default_config = String::from("workload.toml");
203+
let config_path = args.flag_c.unwrap_or(default_config);
204+
205+
let config = Config::builder()
206+
// Add in `./Settings.toml`
207+
.add_source(
208+
config::File::with_name("/etc/berserker/workload.toml")
209+
.required(false),
210+
)
211+
.add_source(
212+
config::File::with_name(config_path.as_str())
213+
.required(false),
214+
)
215+
// Add in settings from the environment (with a prefix of APP)
216+
// Eg.. `BERSERKER__WORKLOAD__ARRIVAL_RATE=1` would set the `arrival_rate` key
217+
.add_source(
218+
config::Environment::with_prefix("BERSERKER")
219+
.try_parsing(true)
220+
.separator("__"),
221+
)
222+
.build()
223+
.unwrap()
224+
.try_deserialize::<WorkloadConfig>()
225+
.unwrap();
226+
227+
info!("Config: {:?}", config);
228+
run_workload(config)
229+
}
208230
};
209231

210232
let processes = &handles.clone();
211233

212234
thread::scope(|s| {
213-
if config.duration != 0 {
214-
// Spin a watcher thread
215-
s.spawn(move || loop {
216-
thread::sleep(time::Duration::from_secs(1));
217-
let elapsed = duration_timer.elapsed().unwrap().as_secs();
218-
219-
if elapsed > config.duration {
220-
for handle in processes.iter().flatten() {
221-
info!("Terminating: {}", *handle);
222-
let _ = kill(Pid::from_raw(*handle), Signal::SIGTERM);
223-
}
235+
// Spin a watcher thread
236+
s.spawn(move || loop {
237+
thread::sleep(time::Duration::from_secs(1));
238+
let elapsed = duration_timer.elapsed().unwrap().as_secs();
239+
240+
// Find all processes with expired duration. If we've received
241+
// SIGTERM, get all processes.
242+
let expired = processes
243+
.iter()
244+
.filter(|(_, duration)| {
245+
(*duration > 0 && *duration < elapsed)
246+
|| terminating.load(Ordering::Relaxed)
247+
})
248+
.collect::<Vec<_>>();
249+
250+
for (handle, _) in &expired {
251+
info!("Terminating: {}", *handle);
252+
let _ = kill(Pid::from_raw(*handle), Signal::SIGKILL);
253+
}
224254

225-
break;
226-
}
227-
});
228-
}
255+
if expired.len() == processes.len() {
256+
break;
257+
}
258+
});
229259

230260
s.spawn(move || {
231-
for handle in processes.iter().flatten() {
232-
info!("waitpid: {}", *handle);
233-
match waitpid(Pid::from_raw(*handle), None) {
261+
for (handle, _) in handles {
262+
info!("waitpid: {}", handle);
263+
match waitpid(Pid::from_raw(handle), None) {
234264
Ok(_) => {
235-
info!("{:?} stopped", *handle)
265+
info!("{handle:?} stopped")
236266
}
237267
Err(Errno::ECHILD) => {
238-
info! {"no process {:?} found", *handle}
268+
info!("no process {handle:?} found")
239269
}
240270
Err(e) => {
241-
panic! {"cannot wait for {:?}: {:?} ", *handle, e}
271+
panic!("cannot wait for {handle:?}: {e:?}")
242272
}
243273
};
244274
}

src/script/ast.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,21 @@ use std::collections::HashMap;
33
#[derive(Debug, Clone)]
44
pub enum Arg {
55
Const { text: String },
6-
76
Var { name: String },
7+
Dynamic { name: String, args: Vec<Arg> },
88
}
99

1010
#[derive(Debug, Clone)]
1111
pub enum Instruction {
12-
Task { name: String, args: Vec<Arg> },
13-
Open { path: String },
14-
Debug { text: String },
12+
Task { name: Arg, args: Vec<Arg> },
13+
Open { path: Arg },
14+
Debug { text: Arg },
1515
}
1616

1717
#[derive(Debug, Clone)]
1818
pub enum MachineInstruction {
1919
Server { port: u16 },
20+
Profile { target: String },
2021
}
2122

2223
#[derive(Debug, Clone)]

0 commit comments

Comments
 (0)