大数据岗位需求情况分析(一)数据爬取和分析

file
百度搜索官网

使用八爪鱼网络数据采集器爬取数据

使用手机号注册账号

利用现有模板创建采集任务(免费用户不能设置定时采集以及云采集)
这里选择的是猎聘招聘网
file
设置查询关键词,这里使用”大数据“作为关键词
关键词可以写多行,并设置翻页次数
file
启动本地采集任务
file

如果IP被禁,过一段时间再次开启,多次执行就会累计足够数据
查看本地采集的数据
file

在远程数据库创建数据库,选取感兴趣的列建表
file

使用八爪鱼自带功能导出到远程MySQL数据库 (如果需要特殊处理可以导出CSV,处理后再导入数据库)
file

连接配置
file

找到唯一且不为空不重复的列作为主键,创建正确的映射关系是导出成功的前提
file

每批导出可以适当减少,因为同一批次中的数据如果有一个导出失败,整个批次也会失败。
file

如果还是不能导入,检查数据库表的编码是否支持中文,如果是latin则改成UTF8

导出csv格式再导入MySQL

发现八爪鱼采集的数据中包含回车空格的多余的字符,最好提前处理号再导出,否则导入到Hive时数据会混乱。

也可以在导入hive时使用参数--hive-drop-import-delims去掉两边多余的空格和回车

另外薪水字段的格式不方便后续分析,可以额外添加三个字段,分别放上解析出来的结果。

工作地点有的包含区名,不方便后续分析,可以额外添加一个city字段。

以上对原始数据的处理本是属于数据清洗的过程,一般是通过Hive完成的。这里由于数据采集是通过八爪鱼获得的,里面没有任何业务数据,所以为了方便直接在MySQL表上面添加了几个字段,方便后续的分析。

创建maven工程如下:

<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
        <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>java_csv_to_mysql</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>java_csv_to_mysql Tapestry 5 Application</name>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>net.sourceforge.javacsv</groupId>
            <artifactId>javacsv</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.26</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
    </dependencies>
</project>

由于需要导入到MySQL,创建工具类

package com.niit.csv;

import java.sql.*;
import java.util.ResourceBundle;

public class DbUtils {

    private static  Connection connection = null;
    private static  PreparedStatement pps = null;
    private static  ResultSet rs = null;
    private static final String url;
    private static final String username;
    private static final String password;
    private static final String driver;
    public static final int BATCH_SIZE;

    static {
        ResourceBundle bundle = ResourceBundle.getBundle("db");
        url = bundle.getString("url");
        username = bundle.getString("username");
        password = bundle.getString("password");
        driver = bundle.getString("driver");
        BATCH_SIZE = Integer.parseInt(bundle.getString("batchSIze"));

        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static Connection getConnection() {
        try {
            connection = DriverManager.getConnection(url, username, password);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return connection;
    }

    protected static void closeAll() {
        try {
            if (rs != null) {
                rs.close();
            }
            if (pps != null) {
                pps.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

创建资源目录(marked as recources folder),并创建db.properties文件, 这里面主要是对MySQL数据源进行配置。

driver=com.mysql.jdbc.Driver
url=jdbc:mysql://192.168.186.100:3306/job?useSSL=false
username=root
password=niit1234
batchSIze=20

接下来编写实体类JobDetail

package com.niit.csv;

import java.util.Date;

/**
 * @Author: deLucia
 * @Date: 2021/4/18
 * @Version: 1.0
 * @Description:
 * 关键词,岗位名称,链接,薪酬,地区,学历,工作经验,发布时间,反馈时间,公司名称,公司链接,融资情况,标签,语言要求,年龄要求,职位描述,公司规模,公司地址,其他信息,公司介绍,企业介绍,当前时间
 */
public class JobDetail {
    /**
     *   链接
     */
    private String jobUrl;
    /**
     *   岗位名称
     */
    private String jobName;
    /**
     * 薪酬
     */
    private String jobSalary;
    /**
     * 地区
     */
    private String jobRegion;
    /**
     * 学历
     */
    private String seekerEdu;
    /**
     * 工作经验
     */
    private String seekerExp;
    /**
     * 发布时间
     */
    private String gmtPublish;
    /**
     * 反馈时间
     */
    private String gmtFeedback;
    /**
     * 公司名称
     */
    private String companyName;
    /**
     * 公司链接
     */
    private String companyUrl;
    /**
     * 融资情况
     */
    private String companyFinancing;
    /**
     * 标签
     */
    private String jobTag;
    /**
     * 当前时间
     */
    private Date gmtCreate;

    /**
     * 最低月薪
     */
    private int minSalaryPerMonth;
    /**
     * 最高月薪
     */
    private int maxSalaryPerMonth;

    /**
     * 一年几薪
     */
    private int monthsSalaryPerYear;

    /**
     * 市
     */
    private String city;

    public String getJobUrl() {
        return jobUrl;
    }

    public void setJobUrl(String jobUrl) {
        this.jobUrl = jobUrl;
    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public String getJobSalary() {
        return jobSalary;
    }

    public void setJobSalary(String jobSalary) {
        this.jobSalary = jobSalary;
    }

    public String getJobRegion() {
        return jobRegion;
    }

    public void setJobRegion(String jobRegion) {
        this.jobRegion = jobRegion;
    }

    public String getSeekerEdu() {
        return seekerEdu;
    }

    public void setSeekerEdu(String seekerEdu) {
        this.seekerEdu = seekerEdu;
    }

    public String getSeekerExp() {
        return seekerExp;
    }

    public void setSeekerExp(String seekerExp) {
        this.seekerExp = seekerExp;
    }

    public String getGmtPublish() {
        return gmtPublish;
    }

    public void setGmtPublish(String gmtPublish) {
        this.gmtPublish = gmtPublish;
    }

    public String getGmtFeedback() {
        return gmtFeedback;
    }

    public void setGmtFeedback(String gmtFeedback) {
        this.gmtFeedback = gmtFeedback;
    }

    public String getCompanyName() {
        return companyName;
    }

    public void setCompanyName(String companyName) {
        this.companyName = companyName;
    }

    public String getCompanyUrl() {
        return companyUrl;
    }

    public void setCompanyUrl(String companyUrl) {
        this.companyUrl = companyUrl;
    }

    public String getCompanyFinancing() {
        return companyFinancing;
    }

    public void setCompanyFinancing(String companyFinancing) {
        this.companyFinancing = companyFinancing;
    }

    public String getJobTag() {
        return jobTag;
    }

    public void setJobTag(String jobTag) {
        this.jobTag = jobTag;
    }

    public Date getGmtCreate() {
        return gmtCreate;
    }

    public void setGmtCreate(Date gmtCreate) {
        this.gmtCreate = gmtCreate;
    }

    public int getMinSalaryPerMonth() {
        return minSalaryPerMonth;
    }

    public void setMinSalaryPerMonth(int minSalaryPerMonth) {
        this.minSalaryPerMonth = minSalaryPerMonth;
    }

    public int getMaxSalaryPerMonth() {
        return maxSalaryPerMonth;
    }

    public void setMaxSalaryPerMonth(int maxSalaryPerMonth) {
        this.maxSalaryPerMonth = maxSalaryPerMonth;
    }

    public int getMonthsSalaryPerYear() {
        return monthsSalaryPerYear;
    }

    public void setMonthsSalaryPerYear(int monthsSalaryPerYear) {
        this.monthsSalaryPerYear = monthsSalaryPerYear;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public JobDetail() {
    }

    @Override
    public String toString() {
        return "JobDetail{" +
                "jobUrl='" + jobUrl + '\'' +
                ", jobName='" + jobName + '\'' +
                ", jobSalary='" + jobSalary + '\'' +
                ", jobRegion='" + jobRegion + '\'' +
                ", seekerEdu='" + seekerEdu + '\'' +
                ", seekerExp='" + seekerExp + '\'' +
                ", gmtPublish='" + gmtPublish + '\'' +
                ", gmtFeedback='" + gmtFeedback + '\'' +
                ", companyName='" + companyName + '\'' +
                ", companyUrl='" + companyUrl + '\'' +
                ", companyFinancing='" + companyFinancing + '\'' +
                ", jobTag='" + jobTag + '\'' +
                ", gmtCreate=" + gmtCreate +
                ", minSalaryPerMonth=" + minSalaryPerMonth +
                ", maxSalaryPerMonth=" + maxSalaryPerMonth +
                ", monthsSalaryPerYear=" + monthsSalaryPerYear +
                ", city='" + city + '\'' +
                '}';
    }
}

最后编写主程序如下:

package com.niit.csv;

import com.csvreader.CsvReader;
import com.sun.xml.internal.fastinfoset.stax.events.Util;

import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: deLucia
 * @Date: 2021/4/13
 * @Version: 1.0
 * @Description: 处理CSV后导入到MySQL
 */
public class ProcessCSV2MySQL {

    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) {

        //生成CsvReader对象,以,为分隔符,GBK编码方式
        CsvReader r;
        List<JobDetail> list = new ArrayList<JobDetail>();
        try {
            String csvFilePath = "./data/data.csv";
            r = new CsvReader(csvFilePath, ',', Charset.forName("UTF-8"));
            //读取表头
            r.readHeaders();
            //逐条读取记录,直至读完
            while (r.readRecord()) {
                JobDetail d = new JobDetail();
                d.setCompanyFinancing(r.get("融资情况").trim());
                d.setCompanyName(r.get("公司名称").trim());
                d.setCompanyUrl(r.get("公司链接").trim());

                String dateStr = r.get("当前时间").trim();
                if (!Util.isEmptyString(dateStr)) {
                    d.setGmtCreate(SDF.parse(dateStr));
                }

                d.setGmtFeedback(r.get("反馈事件").trim());
                d.setGmtPublish(r.get("发布时间").trim());
                d.setJobName(r.get("岗位名称").trim());
                String regionString = r.get("地区").trim();
                d.setJobRegion(regionString);
                String salaryStr = r.get("薪酬").trim();
                d.setJobSalary(salaryStr);
                d.setJobTag(r.get("标签").trim());
                d.setJobUrl(r.get("链接").trim());
                d.setSeekerEdu(r.get("学历").trim());
                d.setSeekerExp(r.get("经验").trim());

                //   20-60k·14薪
                if (!Util.isEmptyString(salaryStr) && !"面议".equals(salaryStr)) {
                    String[] split = salaryStr.split("·");
                    String[] s = split[0].split("-");
                    // MIN_SALARY_PER_MONTH
                    d.setMinSalaryPerMonth(Integer.parseInt(s[0]));
                    // MAX_SALARY_PER_MONTH
                    d.setMaxSalaryPerMonth(Integer.parseInt(s[1].replace("k", "")));
                    // MONTHS_SALARY_PER_YEAR
                    d.setMonthsSalaryPerYear(Integer.parseInt(split[1].replace("薪","")));
                }

                // city
                if (!Util.isEmptyString(regionString)) {
                    d.setCity(regionString.split("-")[0]);
                }

                list.add(d);
            }
            r.close();

            int totalSize = list.size();
            int index = 0;
            int batchCount = 0;
            Connection connection = DbUtils.getConnection();
            Statement statement = connection.createStatement();

            while (index <= totalSize - 1) {
                StringBuilder sb = new StringBuilder();
                for (int i = index; i < index + DbUtils.BATCH_SIZE; i++) {
                    if (i > totalSize - 1) {
                        break;
                    }
                    JobDetail d = list.get(i);
                    if (i != index) {
                        sb.append(",");
                    }
                    sb.append("(\"");
                    sb.append(d.getJobUrl());
                    sb.append("\",\"");
                    sb.append(d.getJobName());
                    sb.append("\",\"");
                    sb.append(d.getJobSalary());
                    sb.append("\",\"");
                    sb.append(d.getJobRegion());
                    sb.append("\",\"");
                    sb.append(d.getSeekerEdu());
                    sb.append("\",\"");
                    sb.append(d.getSeekerExp());
                    sb.append("\",\"");
                    sb.append(d.getGmtPublish());
                    sb.append("\",\"");
                    sb.append(d.getGmtFeedback());
                    sb.append("\",\"");
                    sb.append(d.getCompanyName());
                    sb.append("\",\"");
                    sb.append(d.getCompanyUrl());
                    sb.append("\",\"");
                    sb.append(d.getCompanyFinancing());
                    sb.append("\",\"");
                    sb.append(d.getJobTag());
                    sb.append("\",\"");
                    sb.append(d.getGmtCreate());
                    sb.append("\",\"");
                    sb.append(d.getMinSalaryPerMonth());
                    sb.append("\",\"");
                    sb.append(d.getMaxSalaryPerMonth());
                    sb.append("\",\"");
                    sb.append(d.getMonthsSalaryPerYear());
                    sb.append("\",\"");
                    sb.append(d.getCity());
                    sb.append("\")");

                }

                index += DbUtils.BATCH_SIZE;

                System.out.println(sb.toString());
                int updateResult = statement.executeUpdate("INSERT INTO `bigdata_job` VALUES " + sb.toString());

                if (updateResult > 0) {
                    System.out.println("Batch[" + (batchCount++) + "]: " + updateResult + " records updated successfully.");
                } else {
                    System.out.println("Batch[" + (batchCount++) + "]: " + updateResult + " records failed to update.");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeAll();
        }
    }
}

这里设置了每10条数据一个批次,批次的大小可以通过BATCH_SIZE来修改。

数据导入

下面通过sqoop将mysql的数据导入到hive表当中,以便后续进行统计分析的工作。

在hive当中创建表

Hive是一个数据仓库基础工具提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。Hive 并不能够在大规模数据集上实现低延迟快速的查询,Hive 的最佳使用场合是大数据集的批处理作业,例如,网络日志分析。

进入到hive的客户端,然后执行以下命令来创建hive数据库以及hive数据库表

create table job_detail(
job_url string,
job_name string,
job_salary string,
job_region string,
seeker_edu string,
seeker_exp string,
gmt_publish string,
gmt_feedback string,
company_name string,
company_url string,
company_financing string,
job_tag string,
gmt_create string,
min_salary_per_month int,
max_salary_per_month int,
months_salary_per_year int,
city string
) partitioned by (data_date string) row format delimited fields terminated by '\001' location 'hdfs://hadoop100:8020/user/hive/warehouse/job.db/job_detail';

使用sqoop将mysql数据导入到hive当中

数据进入mysql数据库之后,可以使用sqoop工具来讲数据导入到hive数仓当中,进行统一分析

sqoop import  \
    --connect jdbc:mysql://hadoop100:3306/job?useSSL=false  \
    --username root \
    --password niit1234 \
    --table bigdata_job \
    --delete-target-dir  \
    --hive-import  \
    --hive-database job  \
    --hive-table job_detail  \
    --hive-partition-key data_date  \
    --hive-partition-value 2021-02-28 \
    --hive-drop-import-delims \
    -m 1

使用Hive对大数据岗位数据进行多维分析

接下来使用hive进行统计分析工作,并将分析结果分别存在另一张hive表中。

1. 统计此次爬取的数据中一共有多少个大数据岗位

创建表

> create table job_count(process_date string,total_job_count int) partitioned by (data_date string) row format delimited fields terminated by '\001';

查询并插入数据

> insert overwrite table job_count partition(data_date = '2021-02-28') select '2021-02-28' as process_date,count(distinct job_url) as total_job_count from job_detail;

2、统计每个城市提供的大数据岗位数量

创建表

> create table job_city_count(process_date string,city string, total_job_count int) partitioned by (data_date string);

查询并插入数据

> insert overwrite table job_city_count partition(data_date = '2021-02-28') select '2021-02-28' as process_date, city, count(distinct job_url) as total_job_count from job_detail group by city;

3、按照薪资降序排序各城市大数据相关岗位(每个城市按薪资倒序取前10)

创建表

> create table job_city_salary(process_date string,city string, job_name string, salary_per_month int) partitioned by (data_date string);

查询并插入数据

insert overwrite table job_city_salary partition(data_date = '2021-02-28')
select
  "2021-02-28" as process_date,
  city,
  job_name,
  salary_per_month
from (
  select
    city,
    job_name,
    salary_per_month,
    row_number() over (partition by city order by salary_per_month desc) as rank
  from (
    select
      city,
      job_name,
      avg((min_salary_per_month + max_salary_per_month) * months_salary_per_year / 24) as salary_per_month
    from job_detail
    group by city,job_name having city!="null"
  ) tmp1
) tmp2 where rank<=10;

说明:

  • row_number() over (partition by ... order by)用来实现topN查询
    • 分组->排序->打行号标记,再套一层子查询根据行号打印每个分组的前10条记录

4、统计所有大数据岗位中出现次数最多的标签(取倒序前50个标签)

创建表

create table job_tag(process_date string,job_tag string, tag_count int) partitioned by (data_date string);

查询并插入数据

insert overwrite table job_tag partition(data_date='2021-02-28')
select
  '2021-02-28' as process_date,
  tmptable.single_tag as job_tag,
  count(single_tag) as tag_count
from (
  select subview.single_tag from job_detail lateral view explode(split(job_tag," ")
) subview as single_tag) tmptable group by single_tag order by tag_count desc limit 50;

说明:

  • explode函数可以将单行数据(数组)转换成多列数据输出(行转列)
  • 使用UDTF函数时(如split函数)只能select拆分出来的字段
  • 如果想要同时select其他字段,需配合侧视图(lateral view)使用
  • single_tag是给拆分出来的字段起的别名

Views: 107

Index