`
357029540
  • 浏览: 725271 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

使用NiFi简单的从网站获取数据写入到mysql数据库

    博客分类:
  • nifi
阅读更多

        最近稍微研究了下etl工具nifi,它是Apache下面的一个项目,是用作数据流处理的,具体的就不多做介绍了,网上也有很多介绍,官网是https://nifi.apache.org/docs.html,我在这里做了一个从中移物联网onenet平台抽取数据到mysql的功能,没有做数据过滤以及中文转码乱码的情况处理,这2个地方还不清楚怎么做,如果会的同学也帮忙指导我一下,我们接下来看看整个的设计过程。

      首先我们在官网上面下载nifi,我这里使用的版本是1.9.2,解压nifi后,我们就可以进入该文件夹下面的bin目录,点击run-nifi.bat启动nifi,可能nifi启动毕竟慢的原因,我关闭后重新启动一两次才能打开nifi的网页,默认端口是8080,可以在conf文件夹下面的nifi.properties修改端口号nifi.web.http.port。

      进入到nifi主页后,我们可以看到如下图片



 我们选择第一个图标,按住鼠标拖动该图标到空白页即可,显示出如下图片

 在搜索框里面输入GetHttp,然后双击它就可以了,然后在空白页就会出现

 双击或者右键点击configure选项,我们可以在settings->name下自定义名称,我们这里主要介绍下properties页签下的配置,其他的页签配置可以参考官网介绍,在properties页签下我们首先得定义从哪里取数据即url属性,在配置url属性之前我们可以可以在nifi配置一个公共的url地址属性,在空白处右键点击鼠标可以出现如下所示的内容



 选择variables属性,进入后如下所示



 添加如上所示的属性,现在回到GetHttp的properties页签上,在url属性上输入以下值



 filename属性就是以后的flowfile名称,同时定义一个自定义属性api-key,用于参数的传递,接下来需要注意的是必须定义一个SSL Context Service,点击该属性的value值,如果没有定义过该属性值,需要我们自己新创建一个SSL Context Service,如下所示

 点击create new service后进入如下界面



 
 根据情况选择其中一个即可,通过SSL Context Service最右边的箭头符号进入到创建好的value值后,我们可以在properties页签中定义keystore和truststore2种方式的加密方式,我这里选择了keystore的方式,可以参考https://www.cnblogs.com/aiaitie/p/9525564.html这个文章或者百度下keystore查询,生成后的引用方式

 设置正确后,可以看到有个“雷电”符号,点击它表示启用就可以完成该SSL Context Service的应用了,因为从onenet接收过来的数据是

{
  "errno" : 0,
  "data" : {
    "private" : true,
    "protocol" : "DTU",
    "create_time" : "2019-07-16 10:25:48",
    "act_time" : "2019-07-16 17:02:09",
    "online" : true,
    "id" : "设备id",
    "auth_info" : "设备编号",
    "last_ct" : "2019-07-30 08:27:27",
    "datastreams" : [ {
      "create_time" : "2019-07-16 17:02:11",
      "uuid" : "ae14d1f5-b789-4ea7-a894-a13b447283bd",
      "id" : "xxx"
    }, ],
    "title" : "设备名称",
    "desc" : "power monitor device"
  },
  "error" : "succ"
}

 我们需要将数据进行json平展,所以引入了FlattenJson Processor进行json数据平展,如下所示



 默认情况下分隔符是“.”,平展后的数据是

{
  "errno" : 0,
  "data.private" : true,
  "data.protocol" : "DTU",
  "data.create_time" : "2019-07-16 10:25:48",
  "data.act_time" : "2019-07-16 17:02:09",
  "data.online" : true,
  "data.id" : "设备id",
  "data.auth_info" : "设备编号",
  "data.last_ct" : "2019-07-30 08:27:27",
  "data.datastreams" : [ {
    "create_time" : "2019-07-16 17:02:11",
    "uuid" : "ae14d1f5-b789-4ea7-a894-a13b447283bd",
    "id" : "xxx"
  }],
  "data.title" : "鍙板彉2",
  "data.desc" : "power monitor device",
  "error" : "succ"
}

 因为在mysql是不能使用“.”来数据插入操作的,而nifi是根据这些字段值进行数据对比和传值的,因此还需要把“.”替换为“_”来进行操作,因此我们还需要引入replaceText processor来进行替换操作,如下所示



 替换完成后接下来我们就可以将json转换为sql进行插入操作了,这个时候我们使用ConvertJSONToSQL processor进行数据的转换,在properties页签中我们需要定义JDBC Connection pool,点击value值,如果没有则新增一个,进入到JDBC Connection pool中,在DBCPConnectionPool的properties页签中我们定义需要的的相关属性,如下所示



 这里需要注意了,因为nifi没有导入mysql的相关jar包,需要我们自己去引入,把我们需要的mysql jar包放入到lib文件夹下面即可,就可以正常的进行使用了,配置好以后同样通过“雷电”符号进行启用配置,当然如果只有
ConvertJSONToSQL processor还是不能把数据写入到mysql数据库中的,我们还要引用putsql processor才行,配置如下



 整体上我们的nifi配置如下所示



 数据库结果如下:



 这里还有个奇怪的是做update操作的时候,它会将data_id字段修改为DATAID的sql语句,如下所示

UPDATE device SET data_title = ?, data_desc = ? WHERE DATAID = ?

 也不是很清楚这个出现的原因,会的同学麻烦指导下。

  • 大小: 7.9 KB
  • 大小: 104 KB
  • 大小: 6.7 KB
  • 大小: 11.6 KB
  • 大小: 15.5 KB
  • 大小: 4.8 KB
  • 大小: 9 KB
  • 大小: 38 KB
  • 大小: 34.2 KB
  • 大小: 45.8 KB
  • 大小: 30 KB
  • 大小: 13.1 KB
  • 大小: 22.1 KB
  • 大小: 74 KB
  • 大小: 13.6 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics