Java Weibo Crawler: One Million Records per Day
Posted by post-90sdachenchen
I had never written a crawler before and was thrown in at the deep end. The company had an old codebase lying around, so I used it as a reference; after three weeks of reading and rewriting I had replaced most of the code and reached a crawl volume of one million records per day from a single program.
The stack is Spring Boot + JDK 1.8 + MySQL + Redis.
There are three main parts: keyword crawling, the Redis queue, and the multi-threaded crawler program.
Part 1: Crawling the keywords
I crawl data by typing keywords into Weibo's search box, so the popularity and freshness of the keywords matter a great deal.
I scrape the real-time trending terms from the hot-search lists of Baidu, Sogou and Weibo every 40 seconds.
Step 1: find sites with high-quality trending terms.
# Baidu hot-search URLs
baidu.hotnews = http://top.baidu.com/buzz?b=1&fr=topnews
baidu.topcategory = http://top.baidu.com/buzz?b=2&c=12&fr=topcategory_c12
baidu.oneday.hotbuzz = http://top.baidu.com/buzz?b=341&fr=topbuzz_b1
baidu.oneday.lifehot = http://top.baidu.com/buzz?b=342&c=513&fr=topbuzz_b344_c513
# Weibo hot-search URLs
weibo.realtimehot = https://s.weibo.com/top/summary?cate=realtimehot
weibo.realtime = https://weibo.com/a/hot/realtime
# Sogou hot-search URLs
sogou.hotTop1 = http://top.sogou.com/hot/shishi_1.html
sogou.hotTop2 = http://top.sogou.com/hot/shishi_2.html
sogou.hotTop3 = http://top.sogou.com/hot/shishi_3.html
# 360 hot-search URLs
360.hotlist.star = https://trends.so.com/top/list?cate1=%E4%BA%BA%E7%89%A9&cate2=%E6%98%8E%E6%98%9F&page=1&size=100
360.hotlist.netstar = https://trends.so.com/top/list?cate1=%E4%BA%BA%E7%89%A9&cate2=%E7%BD%91%E7%BA%A2&page=1&size=100
360.hotlist.famous = https://trends.so.com/top/list?cate1=%E4%BA%BA%E7%89%A9&cate2=%E5%90%8D%E5%AE%B6&page=1&size=100
360.hotlist.website = https://trends.so.com/top/list?cate1=%E7%BD%91%E7%AB%99&cate2=&page=1&size=100
360.hotlist.ip = https://trends.so.com/top/list?cate1=IP&cate2=&page=1&size=100
360.hotlist.ai = https://trends.so.com/top/list?cate1=%E6%99%BA%E8%83%BD%E7%BB%88%E7%AB%AF&cate2=%E6%89%8B%E6%9C%BA&page=10&size=100
360.hotlist.car = https://trends.so.com/top/list?cate1=%E6%B1%BD%E8%BD%A6&cate2=&page=11&size=100
360.hotlist.live = https://trends.so.com/top/list?cate1=%E7%9B%B4%E6%92%AD&cate2=%E4%B8%BB%E6%92%AD&page=8&size=80
360.hotlist.livesite = https://trends.so.com/top/list?cate1=%E7%9B%B4%E6%92%AD&cate2=%E7%9B%B4%E6%92%AD%E5%B9%B3%E5%8F%B0&page=6&size=60
360.hotlist.drink = https://trends.so.com/top/list?cate1=%E9%85%92%E7%B1%BB&cate2=&page=1&size=40
360.hotlist.carton = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E5%8A%A8%E6%BC%AB&page=1&size=100
360.hotlist.sports = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E4%BD%93%E8%82%B2&page=1&size=100
360.hotlist.music = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E9%9F%B3%E4%B9%90&page=1&size=100
360.hotlist.movie = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E7%94%B5%E5%BD%B1&page=8&size=100
360.hotlist.tv = https://trends.so.com/top/list?cate1=%E9%85%92%E7%B1%BB&cate2=&page=6&size=100
360.hotlist.fun = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E7%94%B5%E8%A7%86%E5%89%A7&page=6&size=100
360.hotlist.novel = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E5%B0%8F%E8%AF%B4&page=1&size=100
360.hotlist.game = https://trends.so.com/top/list?cate1=%E5%A8%B1%E4%B9%90&cate2=%E6%B8%B8%E6%88%8F&page=6&size=100
360.hotlist.cosmetics = https://trends.so.com/top/list?cate1=%E5%8C%96%E5%A6%86%E5%93%81&cate2=&page=4&size=40
360.hotlist.luxury = https://trends.so.com/top/list?cate1=%E5%A5%A2%E4%BE%88%E5%93%81&cate2=&page=3&size=30
(These are the pages I crawl the keywords from; the quality of these trending terms is very high.)
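The URLs above live in an ordinary properties file. In a Spring Boot project they can be injected with @Value, the same way the download switches are injected in DownloadKeywordThread further below. A minimal sketch, assuming the property keys shown above (the class name here is just an example, not from the original code):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

// Example holder for two of the hot-list URLs defined in the properties file above.
@Component
public class HotListProperties {

    @Value("${weibo.realtimehot}")
    private String weiboRealtimeHot;   // Weibo real-time hot-search list

    @Value("${baidu.hotnews}")
    private String baiduHotNews;       // Baidu hot news list

    public String getWeiboRealtimeHot() { return weiboRealtimeHot; }
    public String getBaiduHotNews() { return baiduHotNews; }
}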
Step 2: crawl the trending terms.
Take the Weibo hot-search list as an example.
String str = "https://s.weibo.com/top/summary?cate=realtimehot"; // page URL
HotListSearch hotListSearch = new HotListSearch();                // crawler object for the hot list
List<Keywords> keywords = hotListSearch.queryWeibo(str);          // crawl the Weibo hot-search list
int i = 1;
for (Keywords key : keywords) {   // each crawled entry is wrapped in a Keywords object
    System.out.println("No." + i + "===========" + key.toString());
    i++;
}
HotListSearch.class
public class HotListSearch {

    private HttpProxy proxy;

    public HotListSearch() {
        this(null);
    }

    public HotListSearch(HttpProxy proxy) {
        this.proxy = proxy;
    }

    /*
     * Weibo hot-search list
     */
    public List<Keywords> queryWeibo(String url) {
        Connect connect = new Connect();
        String html = connect.get(url, proxy);
        String str = "div[class=data] tbody tr";        // CSS selector for the rows jsoup should extract
        List<Keywords> keywords = parseWeibo(html, str); // parse the HTML into the list we need
        return keywords;
    }

    /*
     * Parse the HTML into a list of keywords
     */
    private List<Keywords> parseWeibo(String html, String str) {
        if (html == null || html.isEmpty()) {
            return null;
        }
        Document doc = Jsoup.parse(html);   // parse the HTML into a jsoup Document
        Elements list = doc.select(str);    // split the Document into elements by the CSS selector
        if (list == null || list.isEmpty()) {
            return null;
        }
        List<Keywords> keywords = new ArrayList<>();
        for (int i = 0, len = list.size(); i < len; i++) {
            try {
                HotSearchElementParser parser = new HotSearchElementParser(); // turns one element into a Java object
                Keywords key = parser.parseSearchWeibo(list.get(i));          // element -> Keywords
                if (key != null) {
                    keywords.add(key);
                }
            } catch (Exception e) {
                e.getMessage(); // ignore a single broken row
            }
        }
        return keywords;
    }
}
HotSearchElementParser.class
public class HotSearchElementParser {

    public Keywords parseSearchWeibo(Element item) throws ParseException {
        Keywords keywords = new Keywords();
        String querystr = item.select("td[class=td-02] a").text(); // extract the trending term
        if (querystr == null || querystr.isEmpty()) {
            return null;
        }
        keywords.setQuerystr(querystr);
        return keywords;
    }
}
Keywords.class
/**
 * A keyword to be downloaded.
 */
public class Keywords implements Serializable {

    private static final long serialVersionUID = 1L;

    private int id;
    private String querystr;
    private String region;         // keywords region
    private String nodup;          // keywords nodup
    private int status;            // status, 1: downloading, 2: download suspended
    private long next;             // time of the next load
    private String growth;         // download counts of the last 5 runs
    private long lastDownloadTime; // time of the last download
    private int total;             // total downloads
    private int amount;            // amount of downloads
    private String updateDate;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getQuerystr() { return querystr; }
    public void setQuerystr(String querystr) { this.querystr = querystr; }
    public String getRegion() { return region; }
    public void setRegion(String region) { this.region = region; }
    public String getNodup() { return nodup; }
    public void setNodup(String nodup) { this.nodup = nodup; }
    public int getStatus() { return status; }
    public void setStatus(int status) { this.status = status; }
    public long getNext() { return next; }
    public void setNext(long next) { this.next = next; }
    public String getGrowth() { return growth; }
    public void setGrowth(String growth) { this.growth = growth; }
    public long getLastDownloadTime() { return lastDownloadTime; }
    public void setLastDownloadTime(long lastDownloadTime) { this.lastDownloadTime = lastDownloadTime; }
    public int getTotal() { return total; }
    public void setTotal(int total) { this.total = total; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public String getUpdateDate() { return updateDate; }
    public void setUpdateDate(String updateDate) { this.updateDate = updateDate; }

    @Override
    public String toString() {
        return "Keywords{" +
                "id=" + id +
                ", querystr='" + querystr + '\'' +
                ", region='" + region + '\'' +
                ", nodup='" + nodup + '\'' +
                ", status=" + status +
                ", next=" + next +
                ", growth='" + growth + '\'' +
                ", lastDownloadTime=" + lastDownloadTime +
                ", total=" + total +
                ", amount=" + amount +
                ", updateDate=" + updateDate +
                '}';
    }
}
Connect.class
package com.cnxunao.common.utils;

import com.cnxunao.weibospider.entities.HttpProxy;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class Connect {

    private static Logger logger = LoggerFactory.getLogger(Connect.class);

    public String get(String url) {
        return get(url, null);
    }

    public String get(String url, HttpProxy proxy) {
        try (CloseableHttpClient httpclient = HttpClients.custom().setUserAgent(this.userAgent).build()) {
            HttpGet request = new HttpGet(url.trim());
            HttpContext context = createContext(proxy);
            try (CloseableHttpResponse response = httpclient.execute(request, context)) {
                return EntityUtils.toString(response.getEntity(), charset);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        throw new IllegalArgumentException("timeout");
    }

    public String getKeyword(String targetUrl, HttpProxy proxy) {
        String proxyHost = proxy.getHost();
        int proxyPort = proxy.getPort();
        Proxy.Type proxyType = Proxy.Type.SOCKS;
        try {
            InetSocketAddress addr = new InetSocketAddress(proxyHost, proxyPort);
            Proxy httpProxy = new Proxy(proxyType, addr);
            URL url = new URL(targetUrl);
            URLConnection conn = url.openConnection(httpProxy);
            InputStream in = conn.getInputStream();
            return IO2String(in);
        } catch (Exception e) {
            e.printStackTrace();
        }
        throw new IllegalArgumentException("timeout");
    }

    public String get(String url, HttpProxy proxy, int reconnectionTimes) {
        if (reconnectionTimes < 2) {
            return get(url, proxy);
        }
        if (reconnectionTimes > 5) {
            throw new IllegalArgumentException("Too many reconnection");
        }
        String html = null;
        for (int i = 0; i < reconnectionTimes; i++) {
            try {
                html = get(url, proxy);
                break;
            } catch (Exception e) {
                logger.error("reconnection: {}", url);
                try {
                    Thread.sleep(1_500L);
                } catch (InterruptedException e1) {
                }
            }
        }
        if (html == null) {
            throw new IllegalArgumentException("timeout");
        }
        return html;
    }

    private HttpContext createContext(HttpProxy proxy) {
        HttpClientContext context = HttpClientContext.create();
        Builder builder = RequestConfig.custom().setConnectTimeout(timeout).setSocketTimeout(timeout);
        if (proxy != null && StringUtils.isNotEmpty(proxy.getHost())) {
            builder.setProxy(new HttpHost(proxy.getHost(), proxy.getPort()));
            if (StringUtils.isNotEmpty(proxy.getUsername()) && StringUtils.isNotEmpty(proxy.getPassword())) {
                CredentialsProvider credsProvider = new BasicCredentialsProvider();
                credsProvider.setCredentials(new AuthScope(proxy.getHost(), proxy.getPort()),
                        new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword()));
                context.setCredentialsProvider(credsProvider);
            }
        }
        RequestConfig config = builder.build();
        context.setRequestConfig(config);
        return context;
    }

    private static Random random = new Random();
    // private String userAgent = "Opera/9.27 (Windows NT 5.2; U; zh-cn)";
    private String userAgent = userAgents[random.nextInt(userAgents.length)];

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }

    private String charset = "UTF-8";

    public void setCharset(String charset) {
        this.charset = charset;
    }

    private int timeout = 15_000;

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public static String IO2String(InputStream inStream) throws IOException {
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int len;
        while ((len = inStream.read(buffer)) != -1) {
            result.write(buffer, 0, len);
        }
        return result.toString(StandardCharsets.UTF_8.name());
    }

    // user-agent pool
    private static String[] userAgents = {
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.89 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1",
            "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:6.0) Gecko/20100101 Firefox/6.0",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
            "Opera/9.80 (Windows NT 6.1; U; zh-cn) Presto/2.9.168 Version/11.50",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 2.0.50727; SLCC2; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; Tablet PC 2.0; .NET4.0E)",
            "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; InfoPath.3)",
            "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; GTB7.0)",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)",
            "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)",
            "Mozilla/5.0 (Windows; U; Windows NT 6.1; ) AppleWebKit/534.12 (KHTML, like Gecko) Maxthon/3.0 Safari/534.12",
            "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/534.3 (KHTML, like Gecko) Chrome/6.0.472.33 Safari/534.3 SE 2.X MetaSr 1.0",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/13.0.782.41 Safari/535.1 QQBrowser/6.9.11079.201",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)"
    };
}
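Connect (and the downloader classes further below) depend on an HttpProxy entity that the original post does not include. A minimal sketch, reconstructed only from the getters and setters the code above actually calls; the field names and types are assumptions, not the author's source:

// Hypothetical reconstruction of com.cnxunao.weibospider.entities.HttpProxy,
// containing only the fields implied by the surrounding code.
public class HttpProxy implements java.io.Serializable {

    private String host;        // proxy host
    private int port;           // proxy port
    private String username;    // optional proxy auth user
    private String password;    // optional proxy auth password
    private int status;         // e.g. 1 = valid, cf. Constants.STATUS_SUSPEND
    private int success;        // number of successful downloads through this proxy
    private int failure;        // number of failed downloads through this proxy
    private long liveTime;      // proxy expiry time (seconds since epoch, cf. queryFinally)
    private long lastUseTime;   // last time the proxy was handed out (milliseconds)

    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public int getStatus() { return status; }
    public void setStatus(int status) { this.status = status; }
    public int getSuccess() { return success; }
    public void setSuccess(int success) { this.success = success; }
    public int getFailure() { return failure; }
    public void setFailure(int failure) { this.failure = failure; }
    public long getLiveTime() { return liveTime; }
    public void setLiveTime(long liveTime) { this.liveTime = liveTime; }
    public long getLastUseTime() { return lastUseTime; }
    public void setLastUseTime(long lastUseTime) { this.lastUseTime = lastUseTime; }
}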
HttpResponse.class
package com.cnxunao.weibospider.utils;

import java.util.Vector;

public class HttpResponse {

    String urlString;
    int defaultPort;
    String file;
    String host;
    String path;
    int port;
    String protocol;
    String query;
    String ref;
    String userInfo;
    String contentEncoding;
    int contentLength;
    String content;
    String contentType;
    int code;
    String message;
    String method;
    int connectTimeout;
    int readTimeout;
    Vector<String> contentCollection;

    public String getContent() { return content; }
    public String getContentType() { return contentType; }
    public int getCode() { return code; }
    public String getMessage() { return message; }
    public Vector<String> getContentCollection() { return contentCollection; }
    public String getContentEncoding() { return contentEncoding; }
    public String getMethod() { return method; }
    public int getConnectTimeout() { return connectTimeout; }
    public int getReadTimeout() { return readTimeout; }
    public String getUrlString() { return urlString; }
    public int getDefaultPort() { return defaultPort; }
    public String getFile() { return file; }
    public String getHost() { return host; }
    public String getPath() { return path; }
    public int getPort() { return port; }
    public String getProtocol() { return protocol; }
    public String getQuery() { return query; }
    public String getRef() { return ref; }
    public String getUserInfo() { return userInfo; }
}
Once the test passes, use @Scheduled to write a scheduled task that periodically pushes the crawled keywords into the Redis queue.
WeiboHotThread.class
/*
 * Crawl the Weibo real-time hot-search list.
 */
@Component
@EnableScheduling
public class WeiboHotThread {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    RedisTempService redisService;

    @Autowired
    private HotListSearch hotListSearch;

    @Scheduled(initialDelay = 80_000, fixedRate = 120_000)
    public void run() {
        System.out.println("开始执行微博");
        if (redisService.count("KeywordsQueue") <= 600) {
            List<Keywords> list = hotListSearch.queryWeibo("https://s.weibo.com/top/summary?cate=realtimehot");
            Keywords[] array = new Keywords[list.size()];
            for (int i = 0; i < list.size(); i++) {
                Keywords keywords = list.get(i);
                array[i] = keywords;
            }
            redisService.lpush("KeywordsQueue", array); // push into the Redis queue
            logger.info("Successful download keywords, add to redis: " + array.length);
        }
    }
}
RedisTempService.class (I won't go into the Redis operations in detail here; the one method used above is attached directly, and a sketch of the other queue operations the crawler needs follows it.)
// push elements onto the head of the queue
public void lpush(String key, Serializable... keywords) {
    redisTemplate.opsForList().leftPushAll(key, keywords);
}
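The downloader below also calls redisService.count(...) and redisService.rpop(key, size), which the post does not show. A minimal sketch of what those methods might look like on top of the same RedisTemplate; the exact implementation is an assumption based on how the crawler uses them:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

// Hypothetical companions to lpush(), reconstructed from the call sites in this post.
@Service
public class RedisTempService {

    @Autowired
    private RedisTemplate<String, Serializable> redisTemplate;

    // push elements onto the head of the queue (the method shown above)
    public void lpush(String key, Serializable... keywords) {
        redisTemplate.opsForList().leftPushAll(key, keywords);
    }

    // current length of the queue
    public long count(String key) {
        Long size = redisTemplate.opsForList().size(key);
        return size == null ? 0L : size;
    }

    // pop up to 'size' elements from the tail of the queue
    public List<Serializable> rpop(String key, int size) {
        List<Serializable> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            Serializable value = redisTemplate.opsForList().rightPop(key);
            if (value == null) {
                break; // queue drained
            }
            result.add(value);
        }
        return result;
    }
}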
Step 3: crawl Weibo posts for each keyword.
The rough idea: a scheduled thread obtains proxy servers and keywords, turns each keyword into a search URL, requests that URL through a proxy, parses the response into Java objects and writes them to XML files; another scheduled thread periodically packs the XML files into an archive (a zip in the code below), after which the archives can be handled however you like.
Part of the code is posted below for reference.
AbstractDownload.class
public abstract class AbstractDownload<T> {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    RedisTempService redisService;

    protected void exec(boolean multi, int multinum, int multiple, ThreadPoolExecutor executor) {
        if (multi) {
            multi(multinum, multiple, executor);
        } else {
            single();
        }
    }

    private void multi(int multinum, int multiple, ThreadPoolExecutor executor) {
        if (multinum == 1) {
            single();
            return;
        }
        List<HttpProxy> proxys = getValidProxy(multinum);
        List<T> entities = getValidEntity(proxys.size() * multiple);
        int total = entities.size();
        int len = total / multiple + (total % multiple == 0 ? 0 : 1);
        CompletableFuture<?>[] cfs = IntStream.range(0, len).mapToObj(i -> {
            HttpProxy proxy = proxys.get(i);
            // each task gets one proxy and a slice of at most 'multiple' entities
            CopyOnWriteArrayList<T> list = new CopyOnWriteArrayList<>(
                    entities.subList(i * multiple, i == len - 1 ? total : (i + 1) * multiple));
            return CompletableFuture.runAsync(() -> download(proxy, list), executor);
        }).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(cfs).join();
    }

    private void single() {
        HttpProxy proxy = getValidProxy(1).get(0);
        T entity = getValidEntity(1).get(0);
        download(proxy, entity);
    }

    private void download(HttpProxy proxy, CopyOnWriteArrayList<T> entities) {
        // iterate over a snapshot so removing finished entities does not shift the indices
        List<T> snapshot = new ArrayList<>(entities);
        for (int i = 0, len = snapshot.size(); i < len; i++) {
            T entity = snapshot.get(i);
            try {
                download(proxy, entity);
                entities.remove(entity);
            } catch (Exception e) {
                logger.error(e.getMessage());
            } finally {
                // the proxy is replaced after the last download, so no pause is needed then
                if (i < len - 1) {
                    try {
                        Thread.sleep(getPauseTime());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public abstract void download(HttpProxy proxy, T entity);

    // validate the entity before downloading
    protected abstract void validate(T entity);

    // run the query
    protected abstract List<Weibo> query(HttpProxy proxy, T entity);

    // after the download, update the time of the next download
    protected abstract void updateEntity(T entity, List<Weibo> weibos);

    // persist a download log entry
    protected abstract void saveDownloadLog(T entity, HttpProxy proxy, long consumeTime, List<Weibo> weibos);

    /*
     * Write downloaded weibos to a temporary file.
     */
    protected void storeWeibos(List<Weibo> weibos) {
        if (weibos == null || weibos.isEmpty()) {
            return;
        }
        try {
            WeiboUtils.writeToTempXml(weibos);
        } catch (IOException e) {
            logger.error("write temp xml error.", e);
        }
    }

    protected abstract List<HttpProxy> getValidProxy(int size);

    protected abstract List<T> getValidEntity(int size);

    // pause between two consecutive downloads
    protected int getPauseTime() {
        return 1000 * RandomUtils.nextInt(3, 5);
    }

    protected static class DefaultThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}
DownloadKeywordThread.class
@Component
@EnableAsync
@EnableScheduling
public class DownloadKeywordThread extends AbstractDownload<Keywords> {

    @Value("${download.keyword.use}")
    private boolean use;
    @Value("${download.keyword.multi}")
    private boolean multi;
    @Value("${download.keyword.multinum}")
    private int multinum;
    @Value("${download.keyword.multiple}")
    private int multiple;

    @Autowired
    HttpProxyService proxyService;
    @Autowired
    DownloadLogService logService;
    @Autowired
    KeywordsService kwService;
    @Autowired
    RedisTempService redisService;

    private ThreadPoolExecutor executor;

    public DownloadKeywordThread() {
        int nThreads = Runtime.getRuntime().availableProcessors() * 3;
        executor = new ThreadPoolExecutor(nThreads, nThreads, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(100), new DefaultThreadFactory("download.keyword-"));
    }

    @Async
    @Scheduled(initialDelay = 10_000, fixedRate = 1_000)
    public void run() throws InterruptedException {
        System.out.println("开始执行关键词");
        if (use) {
            try {
                exec(multi, multinum, multiple, executor);
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
    }

    @Override
    protected void validate(Keywords entity) {
        if (StringUtils.isEmpty(entity.getQuerystr())) {
            entity.setStatus(Constants.STATUS_SUSPEND);
            kwService.saveOrUpdate(entity);
            throw new IllegalArgumentException("Keywords not null");
        }
    }

    @Override
    protected List<Weibo> query(HttpProxy proxy, Keywords kw) {
        List<Weibo> weibos = null;
        for (int i = 0; i < 3; i++) {
            try {
                KeywordsSearch download = new KeywordsSearch(proxy);
                weibos = download.query(kw);
                proxy.setSuccess(proxy.getSuccess() + 1);
                logger.info("Successful download, weibos: {}, keywords: {}, proxy: {}",
                        weibos.size(), kw.getQuerystr(), proxy != null ? proxy.getHost() : "");
                break;
            } catch (NullPointerException e1) {
                // the dynamic proxy has been rate-limited by Weibo
                logger.error("proxyIp is limit by weibo {}", proxy.getHost());
                proxy.setFailure(proxy.getFailure() + 1);
                break;
            } catch (Exception e) {
                // failed to connect to the dynamic proxy
                if ("timeout".equals(e.getMessage())) {
                    logger.error("can not connect to proxyIp: {}", proxy.getHost());
                    proxy.setFailure(proxy.getFailure() + 1);
                    break;
                }
                // Weibo returned no results for this keyword
                if ("noresult".equals(e.getMessage())) {
                    logger.error("Keywords {} not found relevant results", kw.getQuerystr());
                    break;
                }
                // the proxy is being asked for a captcha
                if ("verification".equals(e.getMessage())) {
                    proxy.setFailure(proxy.getFailure() + 1);
                    proxy.setStatus(Constants.STATUS_SUSPEND);
                    logger.error("Proxy {}:{} requires verification code", proxy.getHost(), proxy.getPort());
                    break;
                }
            } finally {
                queryFinally(proxy);
            }
        }
        return weibos;
    }

    @Override
    protected void saveDownloadLog(Keywords entity, HttpProxy proxy, long consumeTime, List<Weibo> weibos) {
        logService.storeLog(entity.getQuerystr(), proxy, Constants.TYPE_KEYWORDS, consumeTime, weibos);
    }

    /*
     * Valid proxies.
     */
    @Override
    protected List<HttpProxy> getValidProxy(int size) {
        List<HttpProxy> list = StaticService.getVailid().stream()
                // not used within the last 6 seconds
                .filter(proxy -> proxy.getLastUseTime() + 6_000 < System.currentTimeMillis())
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(list)) {
            throw new IllegalArgumentException("not found valid proxy");
        }
        return list;
    }

    /*
     * Keywords, size = proxy.size * 10
     */
    @Override
    protected List<Keywords> getValidEntity(int size) {
        List<Serializable> list = (List<Serializable>) redisService.rpop("KeywordsQueue", size);
        JSONArray jsonArray = JSONArray.fromObject(list);
        List arrayList = JSONArray.toList(jsonArray, Keywords.class);
        if (CollectionUtils.isEmpty(list)) {
            throw new IllegalArgumentException("not found valid keywords");
        }
        return arrayList;
    }

    @Override
    protected void updateEntity(Keywords entity, List<Weibo> weibos) {
        kwService.updateAfterDownload(entity, weibos);
    }

    private void queryFinally(HttpProxy proxy) {
        if (proxy.getFailure() <= 3 && proxy.getLiveTime() > (System.currentTimeMillis() / 1000)) {
            proxy.setStatus(1);
            StaticService.update(proxy);
            proxyService.saveOrUpdate(proxy);
        } else {
            proxyService.deleteByHostAndPort(proxy.getHost(), proxy.getPort());
            StaticService.del(proxy);
        }
    }

    @Override
    public void download(HttpProxy proxy, Keywords entity) {
        try {
            long consumeTime = System.currentTimeMillis();
            List<Weibo> weibos = query(proxy, entity);
            storeWeibos(weibos);
            if (entity != null && !entity.getRegion().equalsIgnoreCase("hot")) {
                updateEntity(entity, weibos);
            }
            consumeTime = System.currentTimeMillis() - consumeTime;
            saveDownloadLog(entity, proxy, consumeTime, weibos);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
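query() above relies on a KeywordsSearch class that the post does not include. Its job, judging from the call sites, is to turn a keyword into a Weibo search URL, fetch the page through the given proxy and parse each result into a Weibo object. The following is only a rough sketch: the search URL pattern, the jsoup selectors, the captcha check and the Weibo setters (setUr, setContent) are all assumptions, chosen to match what the surrounding code and the "ur" field in Storage expect.

import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

// Hypothetical sketch of KeywordsSearch; not the author's implementation.
public class KeywordsSearch {

    private final HttpProxy proxy;

    public KeywordsSearch(HttpProxy proxy) {
        this.proxy = proxy;
    }

    public List<Weibo> query(Keywords kw) throws Exception {
        // build the search URL from the keyword (assumed URL pattern)
        String url = "https://s.weibo.com/weibo?q=" + URLEncoder.encode(kw.getQuerystr(), "UTF-8");
        Connect connect = new Connect();
        String html = connect.get(url, proxy);   // throws "timeout" when the proxy cannot be reached

        if (html.contains("验证码")) {             // assumed captcha marker
            throw new Exception("verification");
        }

        Document doc = Jsoup.parse(html);
        Elements cards = doc.select("div[class=card-wrap]"); // assumed selector for result cards
        if (cards.isEmpty()) {
            throw new Exception("noresult");      // no results for this keyword
        }

        List<Weibo> weibos = new ArrayList<>();
        for (Element card : cards) {
            Weibo weibo = new Weibo();
            weibo.setUr(card.attr("mid"));                        // assumed unique id, cf. the "ur" field used by Storage
            weibo.setContent(card.select("p[class=txt]").text()); // assumed content field
            weibos.add(weibo);
        }
        return weibos;
    }
}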
Storage.class (packs the xml files into a zip archive)
@Component
public class Storage {

    private static Logger logger = LoggerFactory.getLogger(Storage.class);

    private BloomFilter<String> filter;

    public Storage() {
        int expectedInsertions = Integer.MAX_VALUE >> 4;
        filter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), expectedInsertions);
    }

    @Scheduled(initialDelay = 10_000, fixedRate = 540_000)
    public void run() {
        logger.info("storage thread running.");
        try {
            JSONArray jArray = readTempXml();
            if (jArray == null || jArray.isEmpty()) {
                return;
            }
            writeToZip(jArray);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

    private void writeToZip(JSONArray jArray) {
        // name of the archive to create
        String filename = getFilename(jArray);
        try (ZipOutputStream output = new ZipOutputStream(new FileOutputStream(filename))) {
            int total = jArray.size(), xmlsize = 100;
            for (int i = 0, len = total / xmlsize + (total % xmlsize == 0 ? 0 : 1); i < len; i++) {
                int fromIndex = i * xmlsize, toIndex = i == len - 1 ? total : (i + 1) * xmlsize;
                JSONArray list = JSONArray.fromObject(jArray.subList(fromIndex, toIndex));
                ZipEntry entry = new ZipEntry((i + 1) + ".xml");
                output.putNextEntry(entry);
                XmlWriter writer = new XmlWriter();
                writer.write(list, output);
            }
        } catch (Exception e) {
            logger.error("write to zip: {}", e.getMessage());
        }
        logger.info("{}\t{}", jArray.size(), filename);
        WeiboUtils.total += jArray.size();
        logger.info("下载总数: {}", WeiboUtils.total);
    }

    private String getFilename(JSONArray jArray) {
        File directory = new File(
                Constants.STORE_BASE + File.separator + DateFormatUtils.format(new Date(), "yyyyMMdd"));
        if (!directory.exists()) {
            directory.mkdirs();
        }
        int index;
        Collection<File> c = FileUtils.listFiles(directory, new String[] { "zip" }, true);
        if (!c.isEmpty()) {
            index = c.stream().mapToInt(file -> {
                String filename = StringUtils.substringBefore(file.getName(), "_");
                return NumberUtils.toInt(filename);
            }).max().getAsInt() + 1;
        } else {
            index = 1;
        }
        return directory.getPath() + File.separator + index + "_" + jArray.size() + ".zip";
    }

    AtomicLong incr = new AtomicLong(100_000_000L);

    private JSONArray readTempXml() {
        File directory = new File(Constants.STORE_TEMP);
        if (!directory.isDirectory()) {
            logger.error("{} is not a directory", directory.getPath());
            return null;
        }
        Collection<File> c = FileUtils.listFiles(directory, new String[] { "xml" }, true);
        if (c.isEmpty()) {
            logger.info("XML file not found");
            return null;
        }
        JSONArray jArray = new JSONArray();
        for (File file : c) {
            try {
                XmlReader reader = new XmlReader();
                JSONArray subArray = reader.read(file.getAbsolutePath());
                logger.info("read temp xml: " + file.getAbsolutePath());
                for (int i = 0, len = subArray.size(); i < len; i++) {
                    JSONObject jObject = subArray.getJSONObject(i);
                    try {
                        String ur = jObject.getString("ur");
                        String md5Hex = DigestUtils.md5DigestAsHex(ur.getBytes());
                        md5Hex += incr.incrementAndGet();
                        if (!filter.mightContain(md5Hex)) {
                            jArray.add(jObject);
                            filter.put(md5Hex);
                        }
                    } catch (Exception e) {
                    }
                }
            } catch (Exception e) {
                logger.error("read xml: {}", e.getMessage());
            } finally {
                file.delete();
            }
        }
        return jArray;
    }
}
XmlReader.class
public class XmlReader {

    public XmlReader() {
    }

    public JSONArray read(String filename) throws IOException, ParserConfigurationException, SAXException {
        try (InputStream input = new FileInputStream(filename)) {
            return read(input);
        }
    }

    public JSONArray read(InputStream input) throws ParserConfigurationException, SAXException, IOException {
        Document document = buildDocument(input);
        // list of <article> nodes
        NodeList nodes = document.getElementsByTagName("article");
        JSONArray jArray = new JSONArray();
        for (int i = 0, len = nodes.getLength(); i < len; i++) {
            // child nodes of one article
            NodeList cNodes = nodes.item(i).getChildNodes();
            if (cNodes.getLength() == 0) {
                continue;
            }
            JSONObject jObject = new JSONObject();
            for (int j = 0; j < cNodes.getLength(); j++) {
                Node cNode = cNodes.item(j);
                if (StringUtils.isNotBlank(cNode.getTextContent())) {
                    // child node name and value
                    jObject.put(cNode.getNodeName().toLowerCase(), cNode.getTextContent());
                }
            }
            if (jObject.size() > 0) {
                jArray.add(jObject);
            }
        }
        return jArray;
    }

    private Document buildDocument(InputStream in) throws ParserConfigurationException, SAXException, IOException {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        DocumentBuilder builder = factory.newDocumentBuilder();
        return builder.parse(in);
    }
}
XmlWriter.class
public class XmlWriter {

    public void write(JSONArray jArray, OutputStream output) throws IOException {
        String xmlContent;
        try {
            xmlContent = toXmlstr(jArray);
        } catch (TransformerException | ParserConfigurationException e) {
            throw new IOException(e);
        }
        IOUtils.write(xmlContent, output, "UTF-8");
    }

    private String toXmlstr(JSONArray jArray) throws IOException, TransformerException, ParserConfigurationException {
        TransformerFactory factory = TransformerFactory.newInstance();
        factory.setAttribute("indent-number", 4);                 // indentation width
        Transformer transformer = factory.newTransformer();
        transformer.setOutputProperty(OutputKeys.INDENT, "yes");  // enable line breaks and indentation
        StringWriter writer = new StringWriter();
        Source source = new DOMSource(buildDocument(jArray));
        transformer.transform(source, new StreamResult(writer));
        return writer.toString();
    }

    private Document buildDocument(JSONArray jArray) throws ParserConfigurationException {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        DocumentBuilder builder = factory.newDocumentBuilder();
        Document document = builder.newDocument();
        // parent element
        Element root = document.createElement("articles");
        document.appendChild(root);
        for (int i = 0, len = jArray.size(); i < len; i++) {
            JSONObject jObject = jArray.getJSONObject(i);
            // one child element per record
            Element item = document.createElement("article");
            root.appendChild(item);
            for (Object key : jObject.keySet()) {
                String field = (String) key, value = jObject.getString(field);
                if (value == null || value.isEmpty()) {
                    continue;
                }
                // one sub-element per field
                Element attr = document.createElement(field);
                attr.setTextContent(value);
                item.appendChild(attr);
            }
        }
        return document;
    }
}
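AbstractDownload.storeWeibos() calls WeiboUtils.writeToTempXml(), which the post does not show, and Storage reads a WeiboUtils.total counter. A minimal sketch of what that utility might look like, reusing the XmlWriter above and the Constants.STORE_TEMP directory that Storage reads from; the file-naming scheme is an assumption:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import net.sf.json.JSONArray;

// Hypothetical sketch of WeiboUtils; only writeToTempXml() and the 'total' counter
// referenced elsewhere in the post are reconstructed here.
public class WeiboUtils {

    // running total of stored weibos, incremented by Storage
    public static long total = 0L;

    private static final AtomicLong SEQ = new AtomicLong();

    // serialize a batch of Weibo objects to a temporary XML file under Constants.STORE_TEMP
    public static void writeToTempXml(List<Weibo> weibos) throws IOException {
        if (weibos == null || weibos.isEmpty()) {
            return;
        }
        File dir = new File(Constants.STORE_TEMP);
        if (!dir.exists()) {
            dir.mkdirs();
        }
        // assumed naming scheme: timestamp plus a sequence number
        File file = new File(dir, System.currentTimeMillis() + "_" + SEQ.incrementAndGet() + ".xml");
        JSONArray jArray = JSONArray.fromObject(weibos); // net.sf.json, as used elsewhere in the post
        try (OutputStream output = new FileOutputStream(file)) {
            new XmlWriter().write(jArray, output);
        }
    }
}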
Use whatever structure you like for the crawler itself; the points that actually matter for reaching a million records per day from a single program are:
1. Weibo's anti-crawling measures.
The methods I used: 1) dynamic proxy servers: I bought an IP pool of 2,500 IPs per day (I used Kuaidaili).
2) a user-agent pool, which I covered in an earlier post.
3) a crawl rate of about one request per second is still acceptable.
2. Keyword quality.
Method: scrape the hot-search lists of Weibo, Baidu, Sogou and 360.
3. Program stability and endurance.
Multi-threading + the Spring framework + restarting the program on a schedule.
I'm still new to crawlers and there is plenty of room for improvement; feedback is welcome.
Crawlers are great, but don't overdo it.
Original article; please message me before reposting.