Simple Java Interview Spring Batch ETL Problem and Solution

Problem:
========
The goal is to write a parser in Java that parses a web server access log file, loads the log into MySQL, and checks whether a given IP makes more than a certain number of requests within a given duration.

(1) Create a Java tool that can parse the given log file and load it into MySQL. The delimiter of the log file is the pipe character (|).

(2) The tool takes "startDate", "duration" and "threshold" as command line arguments. "startDate" is in "yyyy-MM-dd.HH:mm:ss" format, "duration" accepts only "hourly" or "daily", and "threshold" is an integer.

(3) This is how the tool works:

    java -cp "parser.jar" com.ef.Parser --startDate=2017-01-01.13:00:00 --duration=hourly --threshold=100

The tool will find any IPs that made more than 100 requests from 2017-01-01.13:00:00 to 2017-01-01.14:00:00 (one hour), print them to the console, AND also load them into another MySQL table with a comment explaining why each IP was blocked.

    java -cp "parser.jar" com.ef.Parser --startDate=2017-01-01.13:00:00 --duration=daily --threshold=250

The tool will find any IPs that made more than 250 requests from 2017-01-01.13:00:00 to 2017-01-02.13:00:00 (24 hours), print them to the console, AND also load them into another MySQL table with a comment explaining why each IP was blocked.
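
The argument handling described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the code from the actual solution: the class name WindowArgs and its fields are hypothetical, and it assumes every argument arrives in "--name=value" form. It parses "startDate" with the format from the problem statement and derives the end of the window from "duration".

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the original solution.
class WindowArgs {
    // Format of --startDate as given in the problem statement.
    static final DateTimeFormatter START_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd.HH:mm:ss");

    final LocalDateTime start;
    final LocalDateTime end;
    final int threshold;

    WindowArgs(LocalDateTime start, LocalDateTime end, int threshold) {
        this.start = start;
        this.end = end;
        this.threshold = threshold;
    }

    static WindowArgs parse(String[] args) {
        // Collect every --name=value argument into a map.
        Map<String, String> options = new HashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (!arg.startsWith("--") || eq < 3) {
                throw new IllegalArgumentException("Expected --name=value, got: " + arg);
            }
            options.put(arg.substring(2, eq), arg.substring(eq + 1));
        }

        LocalDateTime start =
                LocalDateTime.parse(required(options, "startDate"), START_FORMAT);

        // "hourly" means a one-hour window, "daily" a 24-hour window.
        String duration = required(options, "duration");
        LocalDateTime end;
        if (duration.equals("hourly")) {
            end = start.plusHours(1);
        } else if (duration.equals("daily")) {
            end = start.plusDays(1);
        } else {
            throw new IllegalArgumentException(
                    "duration must be hourly or daily, got: " + duration);
        }

        int threshold = Integer.parseInt(required(options, "threshold"));
        return new WindowArgs(start, end, threshold);
    }

    static String required(Map<String, String> options, String name) {
        String value = options.get(name);
        if (value == null) {
            throw new IllegalArgumentException("Missing argument --" + name);
        }
        return value;
    }

    public static void main(String[] args) {
        WindowArgs w = parse(new String[] {
            "--startDate=2017-01-01.13:00:00", "--duration=daily", "--threshold=250" });
        // prints "2017-01-01.13:00:00 to 2017-01-02.13:00:00, threshold 250"
        System.out.println(START_FORMAT.format(w.start) + " to "
                + START_FORMAT.format(w.end) + ", threshold " + w.threshold);
    }
}
```

Rejecting unknown durations up front keeps a typo such as "--duration=hour" from silently producing an empty result.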

Example input file:
================

2017-01-01 00:00:11.763|192.168.234.82|"GET / HTTP/1.1"|200|"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0"
2017-01-01 00:00:21.164|192.168.234.82|"GET / HTTP/1.1"|200|"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0"
2017-01-01 00:00:23.003|192.168.169.194|"GET / HTTP/1.1"|200|"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36 Edge/14.14393"
2017-01-01 00:00:40.554|192.168.234.82|"GET / HTTP/1.1"|200|"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0"
...................................
Full input file: https://drive.google.com/file/d/1pKtnO-2k9pmXw96akmcgYhAlhzuf8Iy2/view?usp=sharing


Input file description:
=================
Date, IP, Request, Status, User Agent (pipe-delimited; open the example file in a text editor)

Date Format: "yyyy-MM-dd HH:mm:ss.SSS"
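
To make the file layout concrete, here is a minimal sketch of parsing one line without Spring Batch. It is not how the solution below reads the file (that uses FlatFileItemReader); the class name LogLine and the unquote helper are hypothetical, and it assumes the first four fields never contain a pipe character.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical value class for illustration; the solution uses its own Consumer bean.
class LogLine {
    // Date format of the first column, as stated in the file description.
    static final DateTimeFormatter LOG_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    final LocalDateTime dateTime;
    final String ip;
    final String request;
    final int status;
    final String userAgent;

    LogLine(LocalDateTime dateTime, String ip, String request,
            int status, String userAgent) {
        this.dateTime = dateTime;
        this.ip = ip;
        this.request = request;
        this.status = status;
        this.userAgent = userAgent;
    }

    static LogLine parse(String line) {
        // The pipe is a regex metacharacter, so it must be escaped.
        // A limit of 5 keeps any extra pipe inside the last field (user agent).
        String[] parts = line.split("\\|", 5);
        if (parts.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields, got: " + line);
        }
        return new LogLine(
                LocalDateTime.parse(parts[0], LOG_FORMAT),
                parts[1],
                unquote(parts[2]),
                Integer.parseInt(parts[3]),
                unquote(parts[4]));
    }

    // Strips one pair of surrounding double quotes, if present.
    static String unquote(String s) {
        if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            return s.substring(1, s.length() - 1);
        }
        return s;
    }

    public static void main(String[] args) {
        LogLine l = parse("2017-01-01 00:00:11.763|192.168.234.82|\"GET / HTTP/1.1\""
                + "|200|\"swcd (unknown version) CFNetwork/808.2.16 Darwin/15.6.0\"");
        // prints "192.168.234.82 200 GET / HTTP/1.1"
        System.out.println(l.ip + " " + l.status + " " + l.request);
    }
}
```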

The log file assumes an hourly limit of 200. That is, when you run your parser against this file with the following parameters:

    java -cp "parser.jar" com.ef.Parser --startDate=2017-01-01.15:00:00 --duration=hourly --threshold=200

the output will include 192.168.11.231. If you open the log file, you will see that 192.168.11.231 made 200 or more requests between 2017-01-01.15:00:00 and 2017-01-01.15:59:59.
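
The check itself amounts to counting requests per IP inside the time window. The sketch below shows that idea in memory, independent of Spring Batch and MySQL. The names ThresholdCheck and Hit are hypothetical, and two choices are assumptions rather than facts from the problem: the window is treated as half-open (start included, end excluded), and an IP is reported when its count is at or above the threshold, matching the "200 or more" wording above.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory version of the check, for illustration only.
class ThresholdCheck {
    // One request: when it happened and which IP made it.
    static final class Hit {
        final LocalDateTime time;
        final String ip;

        Hit(LocalDateTime time, String ip) {
            this.time = time;
            this.ip = ip;
        }
    }

    // Returns the IPs with at least `threshold` requests in [start, end),
    // sorted as strings because a TreeMap is used for the counts.
    static List<String> ipsOverThreshold(List<Hit> hits, LocalDateTime start,
                                         LocalDateTime end, int threshold) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Hit hit : hits) {
            boolean inWindow = !hit.time.isBefore(start) && hit.time.isBefore(end);
            if (inWindow) {
                counts.merge(hit.ip, 1, Integer::sum);
            }
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() >= threshold) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2017, 1, 1, 15, 0, 0);
        List<Hit> hits = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            hits.add(new Hit(start.plusMinutes(i), "192.168.11.231"));
        }
        hits.add(new Hit(start.plusMinutes(5), "192.168.234.82"));
        // prints "[192.168.11.231]"
        System.out.println(ipsOverThreshold(hits, start, start.plusHours(1), 3));
    }
}
```

If the stricter "more than" reading from the problem statement is wanted, the comparison becomes `>` instead of `>=`.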

Solution:
=======

The solution is a simple Spring Batch ETL (Extract, Transform, Load) job in which we configure the three standard stages: a reader, a processor, and a writer.

    @Bean
    public FlatFileItemReader<Consumer> reader() {
        FlatFileItemReader<Consumer> reader = new FlatFileItemReader<Consumer>();
        reader.setResource(new ClassPathResource(Parser.ACCESS_LOG));
        reader.setLineMapper(new DefaultLineMapper<Consumer>() {{
            setLineTokenizer(new DelimitedLineTokenizer("|") {{
                setNames(new String[] { "date", "ip", "request", "status", "userAgent" });
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Consumer>() {{
                setTargetType(Consumer.class);
            }});
        }});
        return reader;
    }

    @Bean
    public ConsumerRecordProcessor processor() {
        return new ConsumerRecordProcessor();
    }



import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

public class ConsumerRecordProcessor implements ItemProcessor<Consumer, Consumer> {

    private static final Logger log =
            LoggerFactory.getLogger(ConsumerRecordProcessor.class);

    // Timestamp format used in the access log.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    @Override
    public Consumer process(final Consumer consumer) {
        Consumer transformedConsumer = new Consumer(consumer);
        String ip = consumer.getIp();

        LocalDateTime dateTime =
                LocalDateTime.parse(consumer.getDateTime().toString(), FORMATTER);

        // Compare as epoch milliseconds, treating every timestamp as UTC.
        long timestamp = dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
        long start = Parser.START_TIMESTAMP.toInstant(ZoneOffset.UTC).toEpochMilli();
        long end = Parser.END_TIMESTAMP.toInstant(ZoneOffset.UTC).toEpochMilli();

        // Only requests inside the window count toward the threshold.
        if (timestamp >= start && timestamp <= end) {
            if (Parser.blackList.contains(ip)) {
                // Already blocked.
                transformedConsumer.setComment(Parser.DURATION + " limit exceeded.");
            } else {
                Parser.requestCounter.merge(ip, 1, Integer::sum);
                if (Parser.THRESHOLD <= Parser.requestCounter.get(ip)) {
                    log.debug(ip + " reached " + Parser.DURATION + " limit!");
                    Parser.blackList.add(ip); // block
                    transformedConsumer.setComment(Parser.DURATION + " limit exceeded.");
                }
            }
        }
        return transformedConsumer;
    }
}

    @Bean
    public JdbcBatchItemWriter<Consumer> writer() {
        JdbcBatchItemWriter<Consumer> writer = new JdbcBatchItemWriter<Consumer>();
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<Consumer>());
        // Named parameters are resolved from the Consumer bean's properties.
        writer.setSql("INSERT INTO parser_data "
                + "(date_time, ip, request, status, user_agent, comment) "
                + "VALUES (:dateTime, :ip, :request, :status, :userAgent, :comment)");
        writer.setDataSource(dataSource);
        return writer;
    }



Initial MySQL Schema:
=================

DROP TABLE IF EXISTS parser_data;

CREATE TABLE parser_data (
    date_time  TIMESTAMP,
    ip         VARCHAR(100),
    request    VARCHAR(100),
    status     VARCHAR(100),
    user_agent VARCHAR(1000),
    comment    VARCHAR(5000)
);

Test:
====

 @Test
 public void testParseArgsPlusETL() throws ParseException {
     // java -cp parser.jar com.ef.Parser --startDate=2017-01-01.13:00:00
     //      --accesslog=access.log --duration=hourly --threshold=100
     Parser.main(new String[] { "--startDate=2017-01-01.13:00:00",
             "--accesslog=access.log", "--duration=hourly", "--threshold=100" });
     Assert.assertEquals("hourly", Parser.DURATION);
     Assert.assertEquals(100, Parser.THRESHOLD);
     Assert.assertEquals(2, Parser.blackList.size());
 }

Full working code Download .
