========
The goal is to write a parser in Java that parses web server access log file, loads the log to MySQL and checks if a given IP makes more than a certain number of requests for the given duration.
(1) Create a java tool that can parse and load the given log file to MySQL. The delimiter of the log file is pipe (|)
(2) The tool takes "startDate", "duration" and "threshold" as command line arguments. "startDate" is of "yyyy-MM-dd.HH:mm:ss" format, "duration" can take only "hourly", "daily" as inputs and "threshold" can be an integer.
(3) This is how the tool works:
java "parser.jar" --startDate=2017-01-01.13:00:00 --duration=hourly --threshold=100
The tool will find any IPs that made more than 100 requests starting from 2017-01-01.13:00:00 to 2017-01-01.14:00:00 (one hour) and print them to console AND also load them to another MySQL table with comments on why it's blocked.
java "parser.jar" --startDate=2017-01-01.13:00:00 --duration=daily --threshold=250
The tool will find any IPs that made more than 250 requests starting from 2017-01-01.13:00:00 to 2017-01-02.13:00:00 (24 hours) and print them to console AND also load them to another MySQL table with comments on why it's blocked.
Example input file:
================
2017-01-01 00:00:11.763|192.168.234.82|"GET / HTTP/1.1"|200|"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0"
2017-01-01 00:00:21.164|192.168.234.82|"GET / HTTP/1.1"|200|"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0"
2017-01-01 00:00:23.003|192.168.169.194|"GET / HTTP/1.1"|200|"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36 Edge/14.14393"
2017-01-01 00:00:40.554|192.168.234.82|"GET / HTTP/1.1"|200|"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0"
..................................
...................................
...................................
...................................
...................................
Full input file: https://drive.google.com/file/d/1pKtnO-2k9pmXw96akmcgYhAlhzuf8Iy2/view?usp=sharing
Input file description:
=================
Date, IP, Request, Status, User Agent (pipe delimited, open the example file in text editor)
Date Format: "yyyy-MM-dd HH:mm:ss.SSS"
The log file assumes 200 as hourly limit, meaning:
When you run your parser against this file with the following parameters
java "parser.jar" --startDate=2017-01-01.15:00:00 --duration=hourly --threshold=200
The output will have 192.168.11.231. If you open the log file, 192.168.11.231 has 200 or more requests between 2017-01-01.15:00:00 and 2017-01-01.15:59:59
Solution:
=======
The solution is simple Spring Batch ETL (Extract, Transform, Load) where we carefully going to implement READER, PROCESSOR & WRITER interfaces:
@Bean public FlatFileItemReader<Consumer> reader() { FlatFileItemReader<Consumer> reader =
new FlatFileItemReader<Consumer>(); reader.setResource(new ClassPathResource(Parser.ACCESS_LOG)); reader.setLineMapper(new DefaultLineMapper<Consumer>() {{ setLineTokenizer(new DelimitedLineTokenizer("|") {{ setNames(new String[] { "date", "ip","request","status",
"userAgent" }); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper<Consumer>()
{{ setTargetType(Consumer.class); }}); }}); return reader; } @Bean public ConsumerRecordProcessor processor() { return new ConsumerRecordProcessor(); }
public class ConsumerRecordProcessor implements ItemProcessor<Consumer, Consumer> { private static final Logger log = LoggerFactory. getLogger(ConsumerRecordProcessor.class); @Override public Consumer process(final Consumer consumer) { Consumer transformedConsumer = new Consumer(consumer); String ip = consumer.getIp(); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); LocalDateTime dateTime = LocalDateTime.parse(consumer.getDateTime().toString(), formatter); if(dateTime.toInstant( ZoneOffset.ofTotalSeconds(0)).toEpochMilli() >= Parser.START_TIMESTAMP.toInstant(ZoneOffset.ofTotalSeconds(0)).toEpochMilli()&& dateTime.toInstant(ZoneOffset.ofTotalSeconds(0)).toEpochMilli() <= Parser.END_TIMESTAMP.toInstant(ZoneOffset.ofTotalSeconds(0)).toEpochMilli()) { if (Parser.blackList.contains(ip)) { //already blocked. transformedConsumer.setComment( Parser.DURATION + " limit exceeded."); } else { Parser.requestCounter.merge(ip, 1, Integer::sum); if (Parser.THRESHOLD <= Parser.requestCounter.get(ip)) { log.debug(ip + " reached " + Parser.DURATION + " limit!"); Parser.blackList.add(ip);//block transformedConsumer.setComment(Parser.DURATION + " limit exceeded."); } } } return transformedConsumer; } }@Bean public JdbcBatchItemWriter<Consumer> writer() { JdbcBatchItemWriter<Consumer> writer =
new JdbcBatchItemWriter<Consumer>(); writer.setItemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<Consumer>()); writer.setSql("INSERT INTO parser_data (
date_time, ip, request, status, user_agent, comment)
VALUES (:dateTime, :ip, :request, :status,
:userAgent, :comment)");
writer.setDataSource(dataSource); return writer; }
Initial MySql Schema:
=================
DROP TABLE parser_data IF EXISTS; create table parser_data( date_time timestamp, ip varchar(100), request varchar(100), status varchar(100), user_agent varchar(1000), comment varchar(5000));
Test:
====
@Test public void testParseArgsPlusETL() throws ParseException { //java -jar parser.jar com.ef.Parser --startDate=2017-01-01.13:00:00
--accesslog=access.log --duration=hourly --threshold=100 Parser.main(new String[]{ --startDate=2017-01-01.13:00:00,
--accesslog=access.log,--duration=hourly,--threshold=100 });
Assert.assertEquals(hourly,Parser.DURATION); Assert.assertEquals(100,Parser.THRESHOLD); Assert.assertEquals(Parser.blackList.size(),2); }
Full working code Download .
No comments:
Post a Comment