https://users.rust-lang.org/t/execute-future-in-background-using-tokio-runtime/97780/2
↑を読んだらうまく書けそう(ebpf-afw-rust の話)。
以下、浩さんとのやりとりのメモ。リクルートに登録。Wantedly は技術によりすぎているかも。
全体的な新卒向けのサービスを使ってみたほうがいい。一般的な方面のところで、狭い部分に入り込む。
(就職)?四季報 を買った。
英語を活かしたい。
メーカー関連のインターンに行ってみる、など。インターンでなくても、会社の説明会でもいいかもしれない。
日本信号
use aya::maps::{MapData, RingBuf};
use ebpf\_afw\_common::UnknownPacket;
use std::os::fd::{AsRawFd, RawFd};
use std::time::Duration;
use anyhow::Error;
use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;
pub struct UnknownPacketsRingbuf {
inner: RingBuf<MapData>,
async\_fd: AsyncFd<RawFd>,
}
impl UnknownPacketsRingbuf {
pub fn new(inner: RingBuf<MapData>) -> Self {
let raw\_fd = inner.as\_raw\_fd();
Self {
inner,
async\_fd: AsyncFd::new(raw\_fd).unwrap(),
}
}
pub async fn send\_packets(&mut self,
mut up\_tx: Sender<UnknownPacket>,
) -> Result<(), Error> {
if let Ok(Ok(mut ready\_guard)) =
timeout(Duration::from\_millis(100), self.async\_fd.readable()).await {
ready\_guard.clear\_ready();
while let Some(data) = self.inner.next() {
let value = &\*data;
let up = UnknownPacket::try\_from(value).unwrap();
up\_tx.send(up).await?;
}
Ok(())
} else { Ok(()) }
}
/// 外から closure を受け取って、パケットが到達する度にその closure を呼び出すような Future を作成し、返します。
pub async fn register\_handler<T>(mut self, mut closure: T)
where
T: FnMut(UnknownPacket),
{
let async\_fd = AsyncFd::new(self.inner.as\_raw\_fd()).unwrap();
loop {
let mut guard = async\_fd.readable().await.unwrap();
while let Some(read) = (&mut self.inner).next() {
let value = &\*read;
let up = UnknownPacket::try\_from(value).unwrap();
closure(up);
}
guard.clear\_ready();
}
}
}use crate::ipv4\_rule\_map::{Ipv4RuleHashMap, Ipv4RuleMap};
use anyhow::{bail, Context, Error};
use aya::maps::{HashMap, MapError, RingBuf};
use aya::programs::{SchedClassifier, tc, TcAttachType};
use aya::{include\_bytes\_aligned, Bpf};
use aya\_log::BpfLogger;
use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::{JoinHandle, yield\_now};
use ebpf\_afw\_common::{Ipv4FiveTuple, PacketAction, UnknownPacket};
use crate::classifier::ClassifierThreadAction::Stop;
use crate::unknown\_packets\_ringbuf::UnknownPacketsRingbuf;
/// rule map に基づいたパケットフィルタリングを実施する主体
/// インスタンスが作成されると、特定の network interface にアタッチされて動作を開始する
pub struct Classifier {
bpf: Bpf,
}
impl Classifier {
/// 与えられた network interface に対するフィルタリングを開始し、Classifier を返します。
pub fn try\_new(interface: &str) -> Result<(Self, UnknownPacketsRingbuf, Ipv4RuleMap), Error> {
// Bump the memlock rlimit. This is needed for older kernels that don't use the
// new memcg based accounting, see https://lwn.net/Articles/837122/
let rlim = libc::rlimit {
rlim\_cur: libc::RLIM\_INFINITY,
rlim\_max: libc::RLIM\_INFINITY,
};
let ret = unsafe { libc::setrlimit(libc::RLIMIT\_MEMLOCK, &rlim) };
if ret != 0 {
bail!("remove limit on locked memory failed, ret is: {}", ret)
}
#\[cfg(debug\_assertions)\]
let mut bpf = Bpf::load(include\_bytes\_aligned!(
"../../target/bpfel-unknown-none/debug/ebpf-afw"
))?;
#\[cfg(not(debug\_assertions))\]
let mut bpf = Bpf::load(include\_bytes\_aligned!(
"../../target/bpfel-unknown-none/release/ebpf-afw"
))?;
BpfLogger::init(&mut bpf)?;
// error adding clsact to the interface if it is already added is harmless
// the full cleanup can be done with 'sudo tc qdisc del dev eth0 clsact'.
let \_ = tc::qdisc\_add\_clsact(interface);
let program: &mut SchedClassifier = bpf.program\_mut("ebpf\_afw").unwrap().try\_into()?;
program.load()?;
program.attach(interface, TcAttachType::Egress)?;
program.attach(interface, TcAttachType::Ingress)?;
// eBPF プログラムが読むルールにあてはまらなかったパケットのリングバッファ
let unknown\_packets = RingBuf::try\_from(
bpf.take\_map("UNKNOWN\_PACKETS")
.context("Failed to take ringbuf map")?,
)?;
// eBPF プログラムが読むルールの一覧
let v4\_rule\_map: Ipv4RuleHashMap = HashMap::try\_from(
bpf.take\_map("V4\_RULE\_MAP")
.context("Failed to take hashmap map")?,
)?;
Ok((Self { bpf }, UnknownPacketsRingbuf::new(unknown\_packets), Ipv4RuleMap::new(v4\_rule\_map)))
}
}
/// Commands accepted by [`classifier_thread`] over its control channel.
#[derive(PartialEq, Eq)]
pub enum ClassifierThreadAction {
    /// Ends the classifier loop.
    Stop,
    /// Inserts a rule that maps the five-tuple to the given action.
    AddRule(Ipv4FiveTuple, PacketAction),
    /// Removes the rule stored for the five-tuple.
    RemoveRule(Ipv4FiveTuple),
}
pub async fn classifier\_thread(
interface: String,
mut cta\_rx: Receiver<ClassifierThreadAction>,
mut up\_tx: Sender<UnknownPacket>,
) -> Result<(), Error> {
let (classifier , mut ringbuf, mut v4\_rule\_map) = Classifier::try\_new(interface.as\_str())?;
loop {
let result = cta\_rx.try\_recv();
if let Err(e) = result {
match e {
TryRecvError::Empty => println!("empty"),
TryRecvError::Disconnected => bail!("disconnected, bail"),
}
} else {
let action = result.unwrap();
if action == Stop {
break Ok(());
} else {
process\_classifier\_thread\_action(&mut v4\_rule\_map, action)?;
}
}
println!("classifier");
ringbuf.send\_packets(up\_tx.clone()).await?;
// yield\_now().await;
}
}
fn process\_classifier\_thread\_action(
v4\_rule\_map: &mut Ipv4RuleMap,
classifier\_thread\_action: ClassifierThreadAction,
) -> Result<(), MapError> {
match classifier\_thread\_action {
ClassifierThreadAction::Stop => Ok(()),
ClassifierThreadAction::AddRule(tuple, packet\_action) => {
v4\_rule\_map.insert(&tuple, packet\_action)
},
ClassifierThreadAction::RemoveRule(tuple) => {
v4\_rule\_map.remove(&tuple)
}
}
}#!\[feature(const\_try)\]
use log::{error, info, warn};
use network\_types::eth::EthHdr;
use network\_types::ip::{IpProto, Ipv4Hdr};
use ebpf\_afw\_core::ipv4addr::Ipv4Addr;
use ebpf\_afw\_common::ipv4\_five\_tuple::Ipv4FiveTuple;
use ebpf\_afw\_common::packet\_action::PacketAction;
use ebpf\_afw\_common::port::Port;
use ebpf\_afw\_common::unknown\_packet::UnknownPacket;
use ebpf\_afw\_core::classifier::{Classifier, classifier\_thread, ClassifierThreadAction};
use ebpf\_afw\_core::ipv4\_rule\_map::Ipv4RuleMap;
use std::ptr::read\_unaligned;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc};
use tokio::{join, signal, task};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::task::{LocalSet, spawn\_local, yield\_now};
#\[tokio::main\]
async fn main() -> Result<(), anyhow::Error> {
let s = LocalSet::new();
s.run\_until(inner\_main()).await
}
async fn inner\_main() -> Result<(), anyhow::Error> {
env\_logger::init();
let (\_, mut unknown\_packets\_ringbuf, mut v4\_rule\_map) = Classifier::try\_new("enp0s31f6")?;
if let Err(e) = v4\_rule\_map.load\_saved() {
warn!("Couldn't load v4 rule map from file: {}", e);
}
// 192.168.1.3 から 1.1.1.1 への ICMP Echo Request を送らないためのルールを試験的に追加
let example\_rule = &Ipv4FiveTuple::new(
Ipv4Addr::new(192, 168, 1, 3).into(),
Ipv4Addr::new(1, 1, 1, 1).into(),
Port::ANY,
IpProto::Icmp,
);
v4\_rule\_map.insert(example\_rule, PacketAction::Deny)?;
let local\_addr = Ipv4Addr::new(192, 168, 1, 3);
allow\_outgoing\_dns(
&mut v4\_rule\_map,
local\_addr,
Ipv4Addr::new(1, 1, 1, 1),
)?;
allow\_outgoing\_dns(
&mut v4\_rule\_map,
local\_addr,
Ipv4Addr::new(1, 0, 0, 1),
)?;
allow\_outgoing\_dns(
&mut v4\_rule\_map,
local\_addr,
Ipv4Addr::new(8, 8, 8, 8),
)?;
allow\_outgoing\_dns(
&mut v4\_rule\_map,
local\_addr,
Ipv4Addr::new(8, 8, 4, 4),
)?;
allow\_outgoing\_https(
&mut v4\_rule\_map,
local\_addr,
Ipv4Addr::new(1, 1, 1, 1),
)?;
let close = Arc::new(AtomicBool::new(false));
let (cta\_tx, cta\_rx) = mpsc::channel(100);
let (up\_tx, mut up\_rx) = mpsc::channel(100);
// setup signal handler
let close\_me = close.clone();
// let signal\_handler = spawn\_local(async move {
// info!("Waiting for Ctrl-C...");
// signal::ctrl\_c().await.unwrap();
// info!("Exiting...");
// close\_me.store(true, Ordering::Relaxed);
// cta\_tx.send(ClassifierThreadAction::Stop).await.expect("An error occured while trying to stop Classifier");
// });
let classifier\_handler = spawn\_local(
classifier\_thread("enp0s31f6".to\_string(), cta\_rx, up\_tx)
);
let packet\_handler = spawn\_local(async move {
loop {
if close.load(Ordering::Relaxed) {
let v4\_rule\_map\_filesize = v4\_rule\_map.save\_into\_file().unwrap();
info!("V4\_RULE\_MAP saved: {} bytes", &v4\_rule\_map\_filesize);
return;
}
let recv = up\_rx.try\_recv();
match recv {
Err(TryRecvError::Empty) => {},
Err(TryRecvError::Disconnected) => println!("disconnected"),
Ok(up) => process\_packet(up, &mut v4\_rule\_map),
}
yield\_now().await;
}
});
info!("Waiting for Ctrl-C...");
signal::ctrl\_c().await.unwrap();
info!("Exiting...");
close\_me.store(true, Ordering::Relaxed);
cta\_tx.send(ClassifierThreadAction::Stop).await.expect("An error occured while trying to stop Classifier");
Ok(())
}
/// BPF に格納されたルールに適合しなかったパケットに関する処理を実施します。
/// 今は試験的に、そのようなパケットを許可するルールを追加します。
fn process\_packet(up: UnknownPacket, v4\_rule\_map: &mut Ipv4RuleMap) {
if up.len < (Ipv4Hdr::LEN + EthHdr::LEN) {
return;
}
let (\_, rest) = up.data.split\_at(EthHdr::LEN);
let (ipv4hdr, \_) = rest.split\_at(Ipv4Hdr::LEN);
let buf = (ipv4hdr.as\_ptr()) as \*const Ipv4Hdr;
let ipv4hdr = unsafe { read\_unaligned(buf) };
info!(
"packet from {:#} to {:#}",
Ipv4Addr::from(ipv4hdr.src\_addr.to\_be()),
Ipv4Addr::from(ipv4hdr.dst\_addr.to\_be())
);
let new\_key: Ipv4FiveTuple = (&up).try\_into().unwrap();
let new\_value = PacketAction::Allow;
if let Err(error) = v4\_rule\_map.insert(&new\_key, new\_value) {
error!(
"An error happened while adding rule to BPF hashmap: {}",
error
);
} else {
info!("Added rule: {:#?}", new\_key);
}
}
fn allow\_outgoing\_dns(
v4\_rule\_map: &mut Ipv4RuleMap,
local\_addr: Ipv4Addr,
remote\_addr: Ipv4Addr,
) -> Result<(), anyhow::Error> {
let egress\_rule = &Ipv4FiveTuple::new(
local\_addr.into(),
remote\_addr.into(),
Port::ANY,
IpProto::Udp,
);
let ingress\_rule = &Ipv4FiveTuple::new(
remote\_addr.into(),
local\_addr.into(),
Port::ANY,
IpProto::Udp,
);
v4\_rule\_map.insert(egress\_rule, PacketAction::Allow)?;
v4\_rule\_map.insert(ingress\_rule, PacketAction::Allow)?;
Ok(())
}
fn allow\_outgoing\_https(
v4\_rule\_map: &mut Ipv4RuleMap,
local\_addr: Ipv4Addr,
remote\_addr: Ipv4Addr,
) -> Result<(), anyhow::Error> {
let egress\_rule = &Ipv4FiveTuple::new(
local\_addr.into(),
remote\_addr.into(),
Port {
dst\_port: 443,
..Port::ANY
},
IpProto::Tcp,
);
let ingress\_rule = &Ipv4FiveTuple::new(
remote\_addr.into(),
local\_addr.into(),
Port {
src\_port: 443,
..Port::ANY
},
IpProto::Tcp,
);
v4\_rule\_map.insert(egress\_rule, PacketAction::Allow)?;
v4\_rule\_map.insert(ingress\_rule, PacketAction::Allow)?;
Ok(())
}