Problem:
========
The goal is to write a parser in Java that parses a web server access log file, loads the log into MySQL, and checks whether a given IP makes more than a certain number of requests within a given duration.
(1) Create a Java tool that can parse and load the given log file into MySQL. The delimiter of the log file is a pipe (|).
(2) The tool takes "startDate", "duration" and "threshold" as command line arguments. "startDate" is in "yyyy-MM-dd.HH:mm:ss" format, "duration" can take only "hourly" or "daily" as input, and "threshold" is an integer.
(3) This is how the tool works:
java "parser.jar" --startDate=2017-01-01.13:00:00 --duration=hourly --threshold=100
The tool will find any IPs that made more than 100 requests from 2017-01-01.13:00:00 to 2017-01-01.14:00:00 (one hour), print them to the console, AND also load them to another MySQL table with a comment on why each is blocked.
java "parser.jar" --startDate=2017-01-01.13:00:00 --duration=daily --threshold=250
The tool will find any IPs that made more than 250 requests from 2017-01-01.13:00:00 to 2017-01-02.13:00:00 (24 hours), print them to the console, AND also load them to another MySQL table with a comment on why each is blocked.
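Before any ETL work, the tool has to turn the "--key=value" arguments above into typed values. A minimal sketch of that step (the `ArgParser` class and `parse` method are hypothetical names, not part of the original code):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

public class ArgParser {
    // Format of the --startDate argument, per the problem statement.
    static final DateTimeFormatter ARG_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd.HH:mm:ss");

    // Split "--key=value" pairs into a map; anything else is ignored here.
    static Map<String, String> parse(String[] args) {
        Map<String, String> opts = new HashMap<>();
        for (String arg : args) {
            if (arg.startsWith("--") && arg.contains("=")) {
                int eq = arg.indexOf('=');
                opts.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return opts;
    }

    public static void main(String[] args) {
        Map<String, String> opts = parse(args);
        LocalDateTime start =
                LocalDateTime.parse(opts.get("startDate"), ARG_FORMAT);
        int threshold = Integer.parseInt(opts.get("threshold"));
        System.out.println(start + " " + opts.get("duration") + " " + threshold);
    }
}
```

Note that the argument format uses a dot between date and time, unlike the log file's own timestamp format.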
Example input file:
================
2017-01-01 00:00:11.763|192.168.234.82|"GET / HTTP/1.1"|200|"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0"
2017-01-01 00:00:21.164|192.168.234.82|"GET / HTTP/1.1"|200|"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0"
2017-01-01 00:00:23.003|192.168.169.194|"GET / HTTP/1.1"|200|"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36 Edge/14.14393"
2017-01-01 00:00:40.554|192.168.234.82|"GET / HTTP/1.1"|200|"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0"
..................................
...................................
...................................
...................................
...................................
Full input file:
https://drive.google.com/file/d/1pKtnO-2k9pmXw96akmcgYhAlhzuf8Iy2/view?usp=sharing
Input file description:
=================
Date, IP, Request, Status, User Agent (pipe delimited; open the example file in a text editor)
Date Format: "yyyy-MM-dd HH:mm:ss.SSS"
The log file assumes 200 as the hourly limit, meaning:
When you run your parser against this file with the following parameters
java "parser.jar" --startDate=2017-01-01.15:00:00 --duration=hourly --threshold=200
The output will include 192.168.11.231. If you open the log file, 192.168.11.231 has 200 or more requests between 2017-01-01.15:00:00 and 2017-01-01.15:59:59.
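Each record in the file can be decomposed by splitting on the pipe delimiter and parsing the first field with the documented timestamp pattern. A minimal sketch, assuming the five fields named above (`LogLine` is a hypothetical class, not part of the original code):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogLine {
    // Timestamp format used inside the log file; note it differs from the
    // "yyyy-MM-dd.HH:mm:ss" format of the --startDate argument.
    static final DateTimeFormatter LOG_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    final LocalDateTime dateTime;
    final String ip, request, status, userAgent;

    LogLine(LocalDateTime dateTime, String ip, String request,
            String status, String userAgent) {
        this.dateTime = dateTime;
        this.ip = ip;
        this.request = request;
        this.status = status;
        this.userAgent = userAgent;
    }

    // Split one pipe-delimited record into its five fields.
    static LogLine parse(String line) {
        String[] f = line.split("\\|", 5);
        return new LogLine(LocalDateTime.parse(f[0], LOG_FORMAT),
                f[1], f[2], f[3], f[4]);
    }
}
```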
Solution:
=======
The solution is a simple Spring Batch ETL (Extract, Transform, Load) pipeline in which we carefully implement the
READER,
PROCESSOR &
WRITER interfaces:
@Bean
public FlatFileItemReader<Consumer> reader() {
    FlatFileItemReader<Consumer> reader = new FlatFileItemReader<Consumer>();
    reader.setResource(new ClassPathResource(Parser.ACCESS_LOG));
    reader.setLineMapper(new DefaultLineMapper<Consumer>() {{
        setLineTokenizer(new DelimitedLineTokenizer("|") {{
            // Field names must match the Consumer bean properties;
            // the processor and the writer's SQL both use "dateTime".
            setNames(new String[] { "dateTime", "ip", "request", "status",
                    "userAgent" });
        }});
        setFieldSetMapper(new BeanWrapperFieldSetMapper<Consumer>() {{
            setTargetType(Consumer.class);
        }});
    }});
    return reader;
}
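The `BeanWrapperFieldSetMapper` above needs a `Consumer` bean whose property names match the tokenizer's `setNames(...)`. The original post does not show this class, so here is a plausible sketch; the field types are an assumption (the timestamp is kept as a `String` and parsed later by the processor):

```java
// Hypothetical Consumer bean; property names must line up with the reader's
// setNames(...) and the writer's :named SQL parameters.
public class Consumer {
    private String dateTime; // raw "yyyy-MM-dd HH:mm:ss.SSS" text from the log
    private String ip;
    private String request;
    private String status;
    private String userAgent;
    private String comment;

    public Consumer() { }

    // Copy constructor used by the processor.
    public Consumer(Consumer other) {
        this.dateTime = other.dateTime;
        this.ip = other.ip;
        this.request = other.request;
        this.status = other.status;
        this.userAgent = other.userAgent;
        this.comment = other.comment;
    }

    public String getDateTime() { return dateTime; }
    public void setDateTime(String dateTime) { this.dateTime = dateTime; }
    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }
    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public String getUserAgent() { return userAgent; }
    public void setUserAgent(String userAgent) { this.userAgent = userAgent; }
    public String getComment() { return comment; }
    public void setComment(String comment) { this.comment = comment; }
}
```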
@Bean
public ConsumerRecordProcessor processor() {
    return new ConsumerRecordProcessor();
}

public class ConsumerRecordProcessor implements ItemProcessor<Consumer, Consumer> {
    private static final Logger log =
            LoggerFactory.getLogger(ConsumerRecordProcessor.class);

    @Override
    public Consumer process(final Consumer consumer) {
        Consumer transformedConsumer = new Consumer(consumer);
        String ip = consumer.getIp();
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        LocalDateTime dateTime =
                LocalDateTime.parse(consumer.getDateTime().toString(), formatter);
        // Only count requests that fall inside the [START, END] window.
        if (!dateTime.isBefore(Parser.START_TIMESTAMP)
                && !dateTime.isAfter(Parser.END_TIMESTAMP)) {
            if (Parser.blackList.contains(ip)) {
                // Already blocked.
                transformedConsumer.setComment(
                        Parser.DURATION + " limit exceeded.");
            } else {
                Parser.requestCounter.merge(ip, 1, Integer::sum);
                if (Parser.THRESHOLD <= Parser.requestCounter.get(ip)) {
                    log.debug(ip + " reached " + Parser.DURATION + " limit!");
                    Parser.blackList.add(ip); // block
                    transformedConsumer.setComment(
                            Parser.DURATION + " limit exceeded.");
                }
            }
        }
        return transformedConsumer;
    }
}
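The processor compares each record against `Parser.START_TIMESTAMP` and `Parser.END_TIMESTAMP`. The end of the window can be derived from the start date and the duration flag; a minimal sketch (the `Window` class and `end` method are hypothetical names, not part of the original code):

```java
import java.time.LocalDateTime;

public class Window {
    // Derive the end of the analysis window:
    // "hourly" => start + 1 hour, "daily" => start + 24 hours.
    static LocalDateTime end(LocalDateTime start, String duration) {
        switch (duration) {
            case "hourly": return start.plusHours(1);
            case "daily":  return start.plusDays(1);
            default: throw new IllegalArgumentException(
                    "duration must be 'hourly' or 'daily': " + duration);
        }
    }
}
```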
@Bean
public JdbcBatchItemWriter<Consumer> writer() {
    JdbcBatchItemWriter<Consumer> writer = new JdbcBatchItemWriter<Consumer>();
    writer.setItemSqlParameterSourceProvider(
            new BeanPropertyItemSqlParameterSourceProvider<Consumer>());
    writer.setSql("INSERT INTO parser_data "
            + "(date_time, ip, request, status, user_agent, comment) "
            + "VALUES (:dateTime, :ip, :request, :status, :userAgent, :comment)");
    writer.setDataSource(dataSource);
    return writer;
}
Initial MySql Schema:
=================
DROP TABLE IF EXISTS parser_data;
create table parser_data(
date_time timestamp,
ip varchar(100),
request varchar(100),
status varchar(100),
user_agent varchar(1000),
comment varchar(5000));
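The problem statement also requires loading the blocked IPs into another MySQL table. The original post does not show its schema, so the table name and columns below are an assumption:

```sql
-- Hypothetical companion table for blocked IPs; the name and columns are
-- assumptions matching the "load them to another MySQL table" requirement.
DROP TABLE IF EXISTS blocked_ips;
create table blocked_ips(
ip varchar(100),
comment varchar(5000));
```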
Test:
====
@Test
public void testParseArgsPlusETL() throws ParseException {
    // java -jar parser.jar --startDate=2017-01-01.13:00:00
    //      --accesslog=access.log --duration=hourly --threshold=100
    Parser.main(new String[]{ "--startDate=2017-01-01.13:00:00",
            "--accesslog=access.log", "--duration=hourly", "--threshold=100" });
    Assert.assertEquals("hourly", Parser.DURATION);
    Assert.assertEquals(100, Parser.THRESHOLD);
    Assert.assertEquals(2, Parser.blackList.size());
}