1、从小白到大数据架构师的学习历程

https://mp.weixin.qq.com/s/pw1EqLXTzrc86x9odTXKkQ
<https://mp.weixin.qq.com/s/pw1EqLXTzrc86x9odTXKkQ>

大数据处理技术怎么学习呢?首先我们要学习Python语言和Linux操作系统,这两个是学习大数据的基础,学习的顺序不分前后。

Linux:因为大数据相关软件都是在Linux上运行的,所以Linux要学习的扎实一些,学好Linux
对你快速掌握大数据相关技术会有很大的帮助,能让你更好的理解hadoop、hive、hbase、spark等大数据软件的运行环境和网络环境配置,能少踩很多坑,学会
shell就能看懂脚本这样能更容易理解和配置大数据集群。还能让你对以后新出的大数据技术学习起来更快。

(1)Hadoop:Hadoop里面包括几个组件HDFS、MapReduce和YARN,HDFS是存储数据的地方就像我们电脑的硬盘一样文件都存储在这个上面,
MapReduce是对数据进行处理计算的,它有个特点就是不管多大的数据只要给它时间它就能把数据跑完,但是时间可能不是很快所以它叫数据的批处理。其实把Hadoop
的这些组件学明白你就能做大数据的处理了,只不过你现在还可能对”大数据”到底有多大还没有个太清楚的概念,听我的别纠结这个。等以后你工作了就会有很多场景遇到几十T/
几百T大规模的数据,到时候你就不会觉得数据大真好,越大越有你头疼的。当然别怕处理这么大规模的数据,因为这是你的价值所在,让那些个搞Javaee的php的
html5的和DBA的羡慕去吧。

(2)Zookeeper:这是个万金油,安装Hadoop的HA的时候就会用到它,以后的Hbase也会用到它
。它一般用来存放一些相互协作的信息,这些信息比较小一般不会超过1M,都是使用它的软件对它有依赖,对于我们个人来讲只需要把它安装正确,让它正常的run
起来就可以了。

(3)MySQL:我们学习完大数据的处理了,接下来学习学习小数据的处理工具mysql数据库,因为一会装hive的时候要用到,mysql需要掌握到什么层度那?
你能在Linux上把它安装好,运行起来,会配置简单的权限,修改root的密码,创建数据库。这里主要的是学习SQL的语法,因为hive的语法和这个非常相似。

(4)Sqoop:这个是用于把Mysql里的数据导入到Hadoop里的。当然你也可以不用这个,直接把Mysql数据表导出成文件再放到HDFS
上也是一样的,当然生产环境中使用要注意Mysql的压力。

(5)Hive:这个东西对于会SQL语法的来说就是神器,它能让你处理大数据变的很简单,不会再费劲的编写MapReduce程序。

(6)Oozie:既然学会Hive了,我相信你一定需要这个东西,它可以帮你管理你的Hive或者MapReduce、Spark脚本
,还能检查你的程序是否执行正确,出错了给你发报警并能帮你重试程序,最重要的是还能帮你配置任务的依赖关系。我相信你一定会喜欢上它的,不然你看着那一大堆脚本,和密密麻麻的
crond是不是有种想屎的感觉。

(7)Hbase:这是Hadoop生态体系中的NOSQL数据库,他的数据是按照key和value的形式存储的并且key是唯一的,所以它能用来做数据的排重,它与
MYSQL相比能存储的数据量大很多。所以他常被用于大数据处理完成之后的存储目的地。

(8)Kafka:这是个比较好用的队列工具,队列是干吗的?排队买票你知道不?
数据多了同样也需要排队处理,这样与你协作的其它同学不会叫起来,你干吗给我这么多的数据(比如好几百G的文件)
我怎么处理得过来,你别怪他因为他不是搞大数据的,你可以跟他讲我把数据放在队列里你使用的时候一个个拿,这样他就不在抱怨了马上灰流流的去优化他的程序去了,因为处理不过来就是他的事情。而不是你给的问题。当然我们也可以利用这个工具来做线上实时数据的入库或入
HDFS,这时你可以与一个叫Flume的工具配合使用,它是专门用来提供对数据进行简单处理,并写到各种数据接受方(比如Kafka)的。

(9)Spark:它是用来弥补基于MapReduce处理数据速度上的缺点,它的特点是把数据装载到内存中计算而不是去读慢的要死进化还特别慢的硬盘。特别适合做
迭代运算,所以算法流们特别稀饭它。它是用scala编写的。Java语言或者Scala都可以操作它,因为它们都是用JVM的。

(10)机器学习(Machine Learning, ML):是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。它是
人工智能的核心,是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域,它主要使用归纳、综合而不是演绎。机器学习的算法基本比较固定了,学习起来相对容易。

(11)深度学习(Deep Learning, DL):深度学习的概念源于人工神经网络的研究,最近几年发展迅猛。深度学习应用的实例有AlphaGo
、人脸识别、图像检测等。是国内外稀缺人才,但是深度学习相对比较难,算法更新也比较快,需要跟随有经验的老师学习。

2、我是怎样爬下6万共享单车数据并进行分析的

https://mp.weixin.qq.com/s/H2C9u9OK_EB_4YREonDkHA

用Pyhon获取、分析单车数据的过程,并为你分析得出的结论

最直接的来源是摩拜单车的APP。现代的软件设计都讲究前后端分离,而且服务端会同时服务于APP、网页等。在这种趋势下我们只需要搞清楚软件的HTTP
请求就好了。一般而言有以下一些工具可以帮忙:

直接抓包:

    Wireshark (在路由器或者电脑)

    Shark for Root (Android)

用代理进行HTTP请求抓包及调试:

    Fiddler 4

    Charles

    Packet Capture (Android)

所以只能首先采用Fiddler或者Charles的方式试试。

随着微信小程序的火爆,摩拜单车也在第一时间出了小程序。我一看就笑了,不错,又给我来了一个数据源,试试。

用Packet Capture抓了一次数据后很容易确定API。抓取后爬取了两三天的数据,发现出现了转机,数据符合正常的单车的轨迹。

摩拜单车的API之所以很容易抓取和分析,很大程度上来讲是由于API设计的太简陋:

① 仅使用http请求,使得很容易进行抓包分析

② 在这些API中都没有对request进行一些加密,使得自己的服务很容易被人利用。

③ 另外微信小程序也是泄露API的一个重要来源,毕竟在APP中request请求可以通过native代码进行加密然后在发出,但在小程序中似乎还没有这样的功能。

目录结构

    \analysis - jupyter做数据分析

    \influx-importer - 导入到influxdb,但之前没怎么弄好

    \modules - 代理模块

    \web - 实时图形化显示模块,当时只是为了学一下react而已,效果请见这里

    crawler.py - 爬虫核心代码

    importToDb.py - 导入到postgres数据库中进行分析

    sql.sql - 创建表的sql

    start.sh - 持续运行的脚本

1、思路

核心代码放在crawler.py中,数据首先存储在sqlite3数据库中,然后去重复后导出到csv文件中以节约空间。

摩拜单车的API返回的是一个正方形区域中的单车,我只要按照一块一块的区域移动就能抓取到整个大区域的数据。

left,top,right,bottom定义了抓取的范围,目前是成都市绕城高速之内以及南至南湖的正方形区域。offset定义了抓取的间隔,现在以0.002
为基准,在DigitalOcean 5$的服务器上能够15分钟内抓取一次。

def start(self):

        left = 30.7828453209

        top = 103.9213455517

        right = 30.4781772402

        bottom = 104.2178123382

        offset = 0.002

        if os.path.isfile(self.db_name):

            os.remove(self.db_name)

        try:

            with sqlite3.connect(self.db_name) as c:

                c.execute('''CREATE TABLE mobike

                    (Time DATETIME, bikeIds VARCHAR(12), bikeType
TINYINT,distId INTEGER,distNum TINYINT, type TINYINT, x DOUBLE, y DOUBLE)''')

        except Exception as ex:

            pass

然后就启动了250个线程,至于你要问我为什么没有用协程,哼哼~~我当时没学~~~其实是可以的,说不定效率更高。

由于抓取后需要对数据进行去重,以便消除小正方形区域之间重复的部分,最后的group_data正是做这个事情。

        executor = ThreadPoolExecutor(max_workers=250)

        print("Start")

        self.total = 0

        lat_range = np.arange(left, right, -offset)

        for lat in lat_range:

            lon_range = np.arange(top, bottom, offset)

            for lon in lon_range:

                self.total += 1

                executor.submit(self.get_nearby_bikes, (lat, lon))

        executor.shutdown()

        self.group_data()

最核心的API代码在这里。小程序的API接口,搞几个变量就可以了,十分简单。

    def get_nearby_bikes(self, args):

        try:

            url = "https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do"

 

  payload = "latitude=%s&longitude=%s&errMsg=getMapCenterLocation" % (args[0],
args[1])

            headers = {

                'charset': "utf-8",

                'platform': "4",

                "referer":"https://servicewechat.com/wx40f112341ae33edb/1/",

                'content-type': "application/x-www-form-urlencoded",

                'user-agent': "MicroMessenger/6.5.4.1000 NetType/WIFI
Language/zh_CN",

                'host': "mwx.mobike.com",

                'connection': "Keep-Alive",

                'accept-encoding': "gzip",

                'cache-control': "no-cache"

            }

            self.request(headers, payload, args, url)

        except Exception as ex:

            print(ex)

最后你可能要问频繁的抓取IP没有被封么?其实摩拜单车是有IP的访问速度限制的,只不过破解之道非常简单,就是用大量的代理。

我是有一个代理池,每天基本上有8000以上的代理。在ProxyProvider中直接获取到这个代理池然后提供一个pick函数用于随机选取得分前50的代理。

请注意,我的代理池是每小时更新的,但是代码中提供的jsonblob的代理列表仅仅是一个样例,过段时间后应该大部分都作废了。

在这里用到一个代理得分的机制。我并不是直接随机选择代理,而是将代理按照得分高低进行排序。每一次成功的请求将加分,而出错的请求将减分。

这样一会儿就能选出速度、质量最佳的代理。如果有需要还可以存下来下次继续用。

class ProxyProvider:

    def __init__(self, min_proxies=200):

        self._bad_proxies = {}

        self._minProxies = min_proxies

        self.lock = threading.RLock()

        self.get_list()

    def get_list(self):

        logger.debug("Getting proxy list")

        r =
requests.get("https://jsonblob.com/31bf2dc8-00e6-11e7-a0ba-e39b7fdbe78b",
timeout=10)

        proxies = ujson.decode(r.text)

        logger.debug("Got %s proxies", len(proxies))

        self._proxies = list(map(lambda p: Proxy(p), proxies))

    def pick(self):

        with self.lock:

            self._proxies.sort(key = lambda p: p.score, reverse=True)

            proxy_len = len(self._proxies)

            max_range = 50 if proxy_len > 50 else proxy_len

            proxy = self._proxies[random.randrange(1, max_range)]

            proxy.used()

            return proxy

在实际使用中,通过proxyProvider.pick()选择代理,然后使用。如果代理出现任何问题,则直接用proxy.fatal_error()
降低评分,这样后续就不会选择到这个代理了。

    def request(self, headers, payload, args, url):

        while True:

            proxy = self.proxyProvider.pick()

            try:

                response = requests.request(

                    "POST", url, data=payload, headers=headers,

                    proxies={"https": proxy.url},

                    timeout=5,verify=False

                )

                with self.lock:

                    with sqlite3.connect(self.db_name) as c:

                        try:

                            print(response.text)

                            decoded = ujson.decode(response.text)['object']

                            self.done += 1

                            for x in decoded:

    c.execute("INSERT INTO mobike VALUES (%d,'%s',%d,%d,%s,%s,%f,%f)" % (

                                    int(time.time()) * 1000, x['bikeIds'],
int(x['biketype']), int(x['distId']),

                                    x['distNum'], x['type'], x['distX'],

                                    x['distY']))

 

                            timespend = datetime.datetime.now() -
self.start_time

                            percent = self.done / self.total

                            total = timespend / percent

                            print(args, self.done, percent * 100, self.done /
timespend.total_seconds() * 60, total,

                                  total - timespend)

                        except Exception as ex:

                            print(ex)

                    break

            except Exception as ex:

                proxy.fatal_error()

抓取了摩拜单车的数据并进行了大数据分析。以下数据分析自1月19日整日的数据,范围成都绕城区域以及至华阳附近(天府新区)内。成都的摩拜单车的整体情况如下:

标准、Lite车型数量相当

摩拜单车在成都大约已经有6万多辆车,两种类型的车分别占有率为55%和44%,可见更为好骑的Lite版本的占有率在提高。(1为标准车,2为Lite车型)

 

 

三成左右的车没有移动过

数据分析显示,有三成的单车并没有任何移动,这说明这些单车有可能被放在不可获取或者偏僻地方。市民的素质还有待提高啊。

出行距离以3公里以下为主

数据分析显示3公里以下的出行距离占据了87.2%,这也十分符合共享单车的定位。100米以下的距离也占据了大量的数据,但认为100米以下的数据为GPS
的波动,所以予以排除。

 

出行距离分布

骑行次数以5次以下居多

单车的使用频率越高共享的效果越好。从摩拜单车的数据看,在流动的单车中,5次以下占据了60%左右的出行。但1次、2次的也占据了30%
左右的份额,说明摩拜单车的利用率也不是很高。

从单车看城市发展

从摩拜单车的热图分布来看,成都已经逐步呈现“双核”发展的态势,城市的新中心天府新区正在聚集更多的人和机会。

 

双核发展

原来的老城区占有大量的单车,在老城区,热图显示在东城区占有更多的单车,可能和这里的商业(春熙路、太古里、万达)及人口密集的小区有直接的联系。

 

老城区

而在成都的南部天府新区越来越多也茁壮的发展起来,商业区域和住宅区域区分明显。在晚上,大量的单车聚集在华阳、世纪城、中和,而在上班时间,则大量聚集在软件园附近。