Prevent DOS attack with multithreaded code

Posted: 2021-02-20 06:12:00

Question:

Let me outline my problem here:

There are about 4000 servers, each with millions of URLs. My code needs to hit every URL and write the response code, together with the URL, to the HDFS file system.

I also added some details here: Check the number of requests sent to a webpage

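I am using a producer-consumer model with 400 threads. The code recently caused a DOS attack on a few web servers, and I am having a hard time figuring out where the problem is: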

Main class:

public void readURLS(final Path inputPath, final Path outputPath) {
    LOG.info("Looking for files to download, queue size: {}, DOWNLOAD_THREADS: {}", queueSize, producerThreads);
    final List<Path> files = HdfsUtils.listDirectory(inputPath, hadoopConf);
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>(queueSize);
    final UrlConsumerWriter consumerWriter =
            new UrlConsumerWriter(queue, outputPath, hadoopConf);

    LOG.info("Starting download of {} files from: '{}'", files.size(), inputPath);
    final ExecutorService writerPool = DownloadUtils.createWriterPool();
    CompletableFuture<Void> producer = downloadFilesToQueue(files, queue)
            .thenRun(consumerWriter::notifyProducersDone);
    CompletableFuture<Void> consumer =
            CompletableFuture.runAsync(consumerWriter, writerPool)
                    // Cancel download workers if write worker fails
                    .whenComplete((result, err) -> {
                        if (err != null) {
                            LOG.error("Consumer Write worker failed!", err);
                            producer.cancel(true);
                        }
                    });

    writerPool.shutdown();
    producer.join();
    consumer.join();
    LOG.info("Url Validation Job Complete!!!");
}


private CompletableFuture<Void> downloadFilesToQueue(
        final List<Path> files,
        final BlockingQueue<String> downloadQueue
) {
    final ExecutorService pool = DownloadUtils.createDownloadPool(producerThreads);

    final List<CompletableFuture<Void>> workers = files.stream()
            .map(file -> new UrlDownloadWorker(clock, file, hadoopConf, downloadQueue,
                    utils, (validatorImpl.emptyTable())))
            .map(worker -> CompletableFuture.runAsync(worker, pool))
            .collect(Collectors.toList());

    pool.shutdown();

    final CompletableFuture<Void> allDownloads = CompletableFuture.allOf(workers.toArray(new CompletableFuture[0]));

    // When one worker fails, cancel all the others immediately
    for (final CompletableFuture<Void> worker : workers) {
        worker.whenComplete((v, err) -> {
            if (err != null) {
                LOG.error("Download worker failed!", err);
                allDownloads.cancel(true);
            }
        });
    }

    return allDownloads;
}
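One likely contributor to the overload is in `downloadFilesToQueue`. It starts one `UrlDownloadWorker` per input file on a pool of `producerThreads` (400) threads, and nothing in the code limits how many of those threads send requests to the same host at once. If many files contain URLs for the same server, that server can receive up to 400 concurrent requests. Below is a minimal sketch of a per-host concurrency cap. It assumes a hypothetical `HostConcurrencyLimiter`, shared by all workers and wrapped around each `validateURL` call. The class name and the permit count are illustrative and are not part of the original code.

```java
import java.net.URI;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/**
 * Hypothetical helper: caps the number of in-flight requests per host,
 * independent of how many threads the download pool has.
 */
final class HostConcurrencyLimiter {
    private final int maxPerHost;
    // One semaphore per (lower-cased) host name, created on first use.
    private final ConcurrentHashMap<String, Semaphore> permits = new ConcurrentHashMap<>();

    HostConcurrencyLimiter(int maxPerHost) {
        if (maxPerHost < 1) {
            throw new IllegalArgumentException("maxPerHost must be >= 1");
        }
        this.maxPerHost = maxPerHost;
    }

    /** Runs the request while holding one of the host's permits; blocks while the host is at its cap. */
    <T> T call(String url, Callable<T> request) throws Exception {
        String host = URI.create(url).getHost();
        String key = host == null ? "" : host.toLowerCase(Locale.ROOT);
        // Fair semaphore, so blocked threads are served roughly in arrival order.
        Semaphore permit = permits.computeIfAbsent(key, h -> new Semaphore(maxPerHost, true));
        permit.acquire();
        try {
            return request.call();
        } finally {
            permit.release(); // always give the permit back, even if the request throws
        }
    }
}
```

In the producer loop this could look like `limiter.call(url, () -> utils.validateURL(url))`, where `url` is a final copy of `line`. The producer's `catch` clauses would then have to handle the broader `Exception` that `call` declares. Unlike shrinking the whole pool, this cap holds per host however many threads are running, and different hosts still proceed in parallel.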

Producer class:

@Override
public void run() {
    LOG.info("Starting download worker for file: '{}'", file);
    long numLines = 0;

    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
            file.getFileSystem(hadoopConf).open(file), CHARSET))) {
        String line;
        while ((line = reader.readLine()) != null) {
            // LOG.info("Thread {} Reading file: '{}'", Thread.currentThread().getName(), file);

            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            StringBuilder builder = new StringBuilder();

            //write into database
            final StatusCode statusCode = utils.validateURL(line);

            if (statusCode != null) {
                queue.put(builder.append(line)
                        .append(",")
                        .append(statusCode.name()).toString());

                builder.setLength(0);
            } else {
                throw new UrlValidationException(
                        "Failed to validate url :'" + line + "'");
            }
            numLines++;
        }

    } catch (IOException e) {
        throw new DownloadException(file, e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new DownloadException("Interrupted while downloading", file, e);
    }
    LOG.info("Download of {} lines complete for file: '{}'", numLines, file);
}

UrlValidationUtils class:

public final class UrlValidationUtils {
    private static final String WEBSITENOTCHECK = "uncheck.org";
    private final Map<String, StatusCode> blockedHosts = new ConcurrentHashMap<>();
    private static final int MAX_REDIRECT = 4;

    public StatusCode validateURL(String url) throws IOException {
        return validate(url, MAX_REDIRECT);
    }

    private StatusCode validate(String url, int maxRedirect) throws IOException {
        URL urlValue = new URL(url);
        HttpURLConnection con;

        if (url.contains(WEBSITENOTCHECK)) {
            blockedHosts.put(urlValue.getHost(), StatusCode.SUCCESS);
        }
        //first check if the host is already marked as invalid
//        if (blockedHosts.containsKey(urlValue.getHost())) {
//            return blockedHosts.get(urlValue.getHost());
//        }
        StatusCode statusCode;
        con = (HttpURLConnection) urlValue.openConnection();

        try {
            int resCode;
            con.setInstanceFollowRedirects(false);
            con.setConnectTimeout(3000); //set timeout to 3 seconds
            con.connect();
            resCode = con.getResponseCode();

            LOG.info("thread name {} connection id {} url {}", Thread.currentThread().getName(), con.toString(), url);
            if (resCode == HttpURLConnection.HTTP_OK) {
                statusCode = StatusCode.SUCCESS;
            } else if (resCode == HttpURLConnection.HTTP_SEE_OTHER || resCode == HttpURLConnection.HTTP_MOVED_PERM
                    || resCode == HttpURLConnection.HTTP_MOVED_TEMP) {
                String location = con.getHeaderField("Location");
                if (location.startsWith("/")) {
                    location = urlValue.getProtocol() + "://" + urlValue.getHost() + location;
                }
                statusCode = validateRedirect(location, maxRedirect - 1, con);

            } else {
                blockedHosts.put(urlValue.getHost(), StatusCode.FAIL);
                statusCode = StatusCode.FAIL;
            }
        } catch (UnknownHostException e) {
            blockedHosts.put(urlValue.getHost(), StatusCode.UNKOWNHOST);
            statusCode = StatusCode.UNKOWNHOST;
        } catch (ConnectException e) {
            blockedHosts.put(urlValue.getHost(), StatusCode.CONNECTION_ISSUE);
            statusCode = StatusCode.CONNECTION_ISSUE;
        } catch (IOException e) {
            //if an IOException is caught possible reason is SOCKETTIMEOUT
            blockedHosts.put(urlValue.getHost(), StatusCode.SOCKETTIMEOUT);
            statusCode = StatusCode.SOCKETTIMEOUT;
        }
        con.disconnect();
        LOG.info("thread name {} connection id {} url {}", Thread.currentThread().getName(), con.toString(), url);

        return statusCode;
    }


    private StatusCode validateRedirect(String location, int redirectCount, HttpURLConnection connection)
            throws IOException {
        if (redirectCount >= 0) {
            connection.disconnect();
            return validate(location, redirectCount);
        }
        return StatusCode.FAIL;
    }
}
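In `validate`, the `blockedHosts` lookup is commented out, so the map is written but never read. Every remaining URL on a host that has already timed out or refused connections is still requested, which keeps the load on exactly the servers that are struggling. Only `setConnectTimeout` is called. `HttpURLConnection`'s read timeout defaults to 0 (no timeout) unless it is set with `setReadTimeout`.

Simply re-enabling the commented-out check would also skip a whole host after a single non-2xx/3xx response such as a 404, because that branch records `FAIL`. The minimal sketch below, a hypothetical `HostFailureTracker`, only counts connection-level failures and skips a host after a configurable number of them in a row. The class, its methods, and the threshold are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper: tracks connection-level failures per host and tells
 * callers to stop sending requests to a host after too many in a row.
 * Under concurrency the "consecutive" count is approximate, which is fine for throttling.
 */
final class HostFailureTracker {
    private final int maxConsecutiveFailures;
    private final ConcurrentHashMap<String, AtomicInteger> failures = new ConcurrentHashMap<>();

    HostFailureTracker(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** True if the host has reached the failure threshold and should not be contacted again. */
    boolean shouldSkip(String host) {
        AtomicInteger count = failures.get(host);
        return count != null && count.get() >= maxConsecutiveFailures;
    }

    /** Call after a timeout, a refused connection, or an unknown host. */
    void recordFailure(String host) {
        failures.computeIfAbsent(host, h -> new AtomicInteger()).incrementAndGet();
    }

    /** Call after any HTTP response was read: the host is reachable, so reset its streak. */
    void recordSuccess(String host) {
        AtomicInteger count = failures.get(host);
        if (count != null) {
            count.set(0);
        }
    }
}
```

In `validate`, `shouldSkip` would go where the commented-out lookup is, returning the status last stored in `blockedHosts` for that host. `recordFailure` would go in the `UnknownHostException`, `ConnectException`, and timeout `catch` blocks, and `recordSuccess` right after `getResponseCode()` returns.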

    



Answer 1:

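To avoid overloading the servers, I suggest waiting a few milliseconds before hitting each batch of URLs. For example, after hitting N URLs you could wait 20 ms, then hit the next N, and so on. The batch size (N) depends on how many requests per second you know the servers can handle. Do you have a service-level agreement (SLA) with them on performance?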
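The batching idea above can be sketched as a single throttle shared by all producer threads, called once before each request. This is a minimal sketch, and `BatchThrottle` is a hypothetical name. The pause is deliberately global: while one thread sleeps at a batch boundary, every other thread calling `acquire()` waits too. With 400 independent threads, a per-thread pause would not cap the combined request rate.

```java
/**
 * Hypothetical global throttle: lets batchSize requests through, then makes
 * the next caller sleep pauseMillis before the following batch starts.
 * acquire() is synchronized and Thread.sleep keeps the monitor, so every
 * other thread calling acquire() is held up during the pause as well.
 */
final class BatchThrottle {
    private final int batchSize;
    private final long pauseMillis;
    private int sentInBatch = 0;
    private long pauses = 0;

    BatchThrottle(int batchSize, long pauseMillis) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.pauseMillis = pauseMillis;
    }

    /** Call once before each HTTP request. */
    synchronized void acquire() throws InterruptedException {
        if (sentInBatch == batchSize) {
            Thread.sleep(pauseMillis); // pause between batches
            sentInBatch = 0;
            pauses++;
        }
        sentInBatch++;
    }

    /** Number of pauses taken so far (useful for logging). */
    synchronized long pauses() {
        return pauses;
    }
}
```

The requests themselves run outside the lock, so the pause only spaces out the start of each batch. If threads call `acquire()` faster than the pause, the ceiling is about N requests per pause. For example, N = 100 with a 20 ms pause still allows up to roughly 5,000 requests per second across all hosts. A known per-second limit from an SLA is therefore a better guide for choosing N than the pause length.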

Comments:

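In that case, this job would take days or weeks to finish, wouldn't it?

It depends on how many requests per second your servers can handle. Do you have an SLA for them? If they can handle 1000 requests per second, use batches of 1000. Another idea: do you really have millions of unique URLs? Can you reduce the number of URLs you need to hit?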
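The concern about the job taking days or weeks can be addressed by applying the limit per host instead of globally, assuming the SLA is a per-server limit. URLs for different hosts then never slow each other down, and total throughput grows with the number of hosts processed at once, as long as the worker threads are spread across hosts. For example, at 1,000 requests per second per host, 1,000,000 URLs for one host take about 1,000 seconds (roughly 17 minutes).

The following is a minimal sketch of that idea, using a hypothetical `PerHostRateLimiter` that spaces requests to the same host evenly. This is a swapped-in technique, not something taken from the question or the answer.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical per-host limiter: requests to the same host are spaced
 * 1/requestsPerSecond apart; different hosts never wait for each other.
 */
final class PerHostRateLimiter {
    private final long intervalNanos;
    // Earliest System.nanoTime() at which the next request to each host may start.
    private final ConcurrentHashMap<String, AtomicLong> nextSlot = new ConcurrentHashMap<>();

    PerHostRateLimiter(double requestsPerSecond) {
        if (requestsPerSecond <= 0) {
            throw new IllegalArgumentException("requestsPerSecond must be > 0");
        }
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / requestsPerSecond);
    }

    /** Blocks until a request to host may be sent; returns the nanoseconds waited (0 if none). */
    long acquire(String host) throws InterruptedException {
        AtomicLong slot = nextSlot.computeIfAbsent(host, h -> new AtomicLong(System.nanoTime()));
        long now = System.nanoTime();
        // Atomically reserve this caller's slot (max of the stored slot and now)
        // and push the host's next free slot one interval further.
        long next = slot.accumulateAndGet(now, (prev, t) -> Math.max(prev, t) + intervalNanos);
        long wait = (next - intervalNanos) - now;
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
            return wait;
        }
        return 0;
    }
}
```

Each worker would call `acquire(host)` with the URL's host right before connecting. Because slots are reserved atomically, concurrent threads hitting the same host queue up one interval apart instead of bursting.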
