Skip to content

Commit 093d5c0

Browse files
committed
Added PiiBolt and PresidioRedacter implementation
Signed-off-by: Laurent Klock <Laurent.Klock@arhs-cube.com>
1 parent 7a44293 commit 093d5c0

10 files changed

Lines changed: 1106 additions & 0 deletions

File tree

THIRD-PARTY.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ List of third-party dependencies grouped by their license type.
157157
* Jackson dataformat: CBOR (com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.2 - https://github.com/FasterXML/jackson-dataformats-binary)
158158
* Jackson dataformat: Smile (com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.2 - https://github.com/FasterXML/jackson-dataformats-binary)
159159
* Jackson-dataformat-YAML (com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.2 - https://github.com/FasterXML/jackson-dataformats-text)
160+
* Jackson datatype: jdk8 (com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.18.1 - https://github.com/FasterXML/jackson-modules-java8/jackson-datatype-jdk8)
160161
* java-libpst (com.pff:java-libpst:0.9.3 - https://github.com/rjohnsondev/java-libpst)
161162
* JCL 1.2 implemented over SLF4J (org.slf4j:jcl-over-slf4j:2.0.17 - http://www.slf4j.org)
162163
* JetBrains Java Annotations (org.jetbrains:annotations:26.0.2-1 - https://github.com/JetBrains/java-annotations)
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.stormcrawler.pii;
19+
20+
import org.apache.commons.lang3.StringUtils;
21+
import org.apache.storm.task.OutputCollector;
22+
import org.apache.storm.task.TopologyContext;
23+
import org.apache.storm.topology.OutputFieldsDeclarer;
24+
import org.apache.storm.topology.base.BaseRichBolt;
25+
import org.apache.storm.tuple.Fields;
26+
import org.apache.storm.tuple.Tuple;
27+
import org.apache.storm.tuple.Values;
28+
import org.apache.stormcrawler.Metadata;
29+
import org.apache.stormcrawler.util.ConfUtils;
30+
import org.apache.stormcrawler.util.InitialisationUtil;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.nio.charset.StandardCharsets;
34+
import java.util.Map;
35+
36+
/**
37+
* StormCrawler bolt that performs PII redaction on the content of web pages
38+
* before they are passed to the indexing or persistence bolt.<br>
39+
* If enabled, the HTML content will be overwritten with a dummy HTML page (containing just "REDACTED")<br><br>
40+
* <b>pii.redacter.class</b> is the name of the class implementing the PiiInterface interface (e.g. org.apache.stormcrawler.pii.PresidioRedacter)<br>
41+
* <b>pii.language.field</b>, if set, allows to set the name of a Metadata field that contains the language to be passed to the PII redacter instance
42+
*
43+
*/
44+
@SuppressWarnings("serial")
45+
public class PiiBolt extends BaseRichBolt {
46+
47+
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(PiiBolt.class);
48+
49+
/*
50+
* Name of config field defining the PII Redacter class
51+
* (This class must implement the PiiRedacter interface
52+
*/
53+
public static final String PII_REDACTER_CLASS_PARAM = "pii.redacter.class";
54+
55+
/*
56+
* Name of the field for configurating language detection
57+
*/
58+
public static final String PII_DETECT_LANGUAGE_PARAM = "pii.detect.language";
59+
60+
/*
61+
* Name of the field for defining Metadata field containing language
62+
*/
63+
public static final String PII_LANGUAGE_FIELD = "pii.language.field";
64+
65+
/*
66+
* Name of the field for disabling PII removal
67+
*/
68+
public static final String PII_ENABLE_FIELD = "pii.removal.enable";
69+
70+
private static final String FIELD_URL = "url";
71+
private static final String FIELD_CONTENT = "content";
72+
private static final String FIELD_METADATA = "metadata";
73+
private static final String FIELD_TEXT = "text";
74+
75+
76+
// Default value for language metadata field
77+
private String languageFieldName = "parse.lang";
78+
79+
OutputCollector _collector;
80+
81+
PiiRedacter piiRedacter;
82+
83+
private boolean piiEnabled = false;
84+
85+
public static final String REDACTED_HTML = "<!DOCTYPE html><html lang='en'><head><meta charset='UTF-8'><title>REDACTED</title></head><body>REDACTED</body></html>";
86+
87+
public static final byte[] REDACTED_BYTES = REDACTED_HTML.getBytes(StandardCharsets.UTF_8);
88+
89+
/**
90+
* Returns a Scheduler instance based on the configuration *
91+
*/
92+
public static PiiRedacter getInstance(Map<String, Object> stormConf) {
93+
PiiRedacter redacter;
94+
95+
String className = ConfUtils.getString(stormConf, PII_REDACTER_CLASS_PARAM);
96+
if (className == null || className.isEmpty()) {
97+
throw new RuntimeException("PiiRedacter class name must be defined in the configuration (pii.redacter.class)");
98+
}
99+
100+
LOG.info("Loading PII Redacter class, name={}", className);
101+
try {
102+
redacter = InitialisationUtil.initializeFromQualifiedName(className, PiiRedacter.class);
103+
} catch (Exception e) {
104+
throw new RuntimeException("Can't instantiate " + className, e);
105+
}
106+
107+
LOG.info("Initializing PII Redacter instance");
108+
try {
109+
redacter.init(stormConf);
110+
} catch (Exception e) {
111+
LOG.error("Error while initializing PII Redacter", e);
112+
}
113+
114+
return redacter;
115+
}
116+
117+
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
118+
// Uncomment if extending StatusEmitterBolt
119+
//super.prepare(topoConf, context, collector);
120+
121+
this._collector = collector;
122+
123+
this.piiRedacter = getInstance(topoConf);
124+
LOG.info("Initialized PiiRedacter instance");
125+
126+
// Get language metadata field name
127+
String confLanguageField = ConfUtils.getString(topoConf, "pii.language.field");
128+
if (confLanguageField != null && !confLanguageField.isEmpty()) {
129+
languageFieldName = confLanguageField;
130+
}
131+
LOG.info("PII language field: {}", languageFieldName);
132+
133+
piiEnabled = ConfUtils.getBoolean(topoConf, PII_ENABLE_FIELD, false);
134+
LOG.info("PII disabled: {}", piiEnabled);
135+
136+
}
137+
138+
@Override
139+
public void execute(Tuple input) {
140+
String url = input.getStringByField(FIELD_URL);
141+
Metadata metadata = (Metadata) input.getValueByField(FIELD_METADATA);
142+
String text = input.getStringByField(FIELD_TEXT);
143+
byte[] originalBytes = input.getBinaryByField(FIELD_CONTENT);
144+
145+
LOG.info("Processing URL for PII redaction: {}", url);
146+
147+
if (!piiEnabled) {
148+
emitTuple(input, url, originalBytes, metadata, text);
149+
this._collector.ack(input);
150+
return;
151+
}
152+
153+
if (StringUtils.isBlank(text)) {
154+
LOG.info("No text to process for URL: {}", url);
155+
metadata.addValue("pii.processed", "false");
156+
// Force the binary content to a dummy content
157+
emitTuple(input, url, REDACTED_BYTES, metadata, "");
158+
this._collector.ack(input);
159+
return;
160+
}
161+
162+
try {
163+
String language = metadata.getFirstValue(languageFieldName);
164+
String redacted = (language != null) ?
165+
piiRedacter.redact(text, language) :
166+
piiRedacter.redact(text);
167+
168+
if (redacted == null) {
169+
throw new Exception("PII Redacter returned null");
170+
}
171+
172+
metadata.addValue("pii.processed", "true");
173+
174+
// Force the binary content to a dummy content
175+
emitTuple(input, url, REDACTED_BYTES, metadata, redacted);
176+
} catch (Exception e) {
177+
LOG.error("Error during PII redaction for URL {}: {}", url, e.getMessage());
178+
metadata.addValue("pii.error", e.getMessage());
179+
180+
// How to handle the content in case of error ?
181+
emitTuple(input, url, originalBytes, metadata, text);
182+
}
183+
184+
this._collector.ack(input);
185+
}
186+
187+
private void emitTuple(Tuple input, String url, byte[] content, Metadata metadata, String text) {
188+
this._collector.emit(input, new Values(url, content, metadata, text));
189+
}
190+
191+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
192+
declarer.declare(new Fields(FIELD_URL, FIELD_CONTENT, FIELD_METADATA, FIELD_TEXT));
193+
}
194+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.stormcrawler.pii;
19+
20+
import java.util.Map;
21+
22+
/**
 * An interface for components implementing PII redaction.
 */
public interface PiiRedacter {

    /**
     * Initializes the redacter from the topology configuration.
     *
     * @param topologyConf the topology configuration
     * @throws Exception if the redacter cannot be initialized
     */
    void init(Map<String, Object> topologyConf) throws Exception;

    /**
     * Redacts PII from the input string using default language settings
     * (e.g. no language or a default language configured at initialization)
     *
     * @param input the input string possibly containing PII
     * @return the input string with PII redacted
     */
    String redact(String input);

    /**
     * Redacts PII from the input string using the specified language
     *
     * @param input the input string possibly containing PII
     * @param language the language to use for PII redaction
     * @return the input string with PII redacted
     */
    String redact(String input, String language);
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.stormcrawler.pii;
19+
20+
import java.util.Map;
21+
22+
/**
23+
* Mock PII Redacter implementation for testing purposes.
24+
* This class simulates redaction by replacing occurrences of the word"secret"
25+
* with "*****".
26+
*/
27+
28+
public class MockPiiRedacter implements PiiRedacter {
29+
30+
@Override public void init(Map<String, Object> conf) {}
31+
32+
@Override public String redact(String content) {
33+
return redact(content, null);
34+
}
35+
36+
@Override public String redact(String content, String language) {
37+
// simple redaction logic for the test
38+
return content.replaceAll("secret", "*****");
39+
}
40+
}

0 commit comments

Comments
 (0)