欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java kafka寫入數(shù)據(jù)到HDFS問題

 更新時間:2023年08月31日 08:56:35   作者:我是女孩  
這篇文章主要介紹了java kafka寫入數(shù)據(jù)到HDFS問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

java kafka寫入數(shù)據(jù)到HDFS

安裝kafka,見我以前的文章

http://www.dbjr.com.cn/server/2968144y7.htm

向Hdfs寫入文件,控制臺會輸出以下錯誤信息:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=s00356746, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x

從中很容易看出是因為當前執(zhí)行Spark Application的用戶沒有Hdfs“/user”目錄的寫入權(quán)限。這個問題無論是在Windows下還是Linux下提交Spark Application都經(jīng)常會遇到

如果是歐拉操作系統(tǒng)

需做如下處理

chattr -i etc/passwd
chattr -i /etc/shadow
chattr -i /etc/group
chattr -i /etc/passwd-
chattr -i /etc/shadow-
chattr -i /etc/group-
lsattr passwd*
都需要沒有   i   屬性

如果是Linux環(huán)境

將執(zhí)行操作的用戶添加到supergroup用戶組。

groupadd supergroup
usermod -a -G supergroup s00356746

如果是Windows用戶

在hdfs namenode所在機器添加新用戶,用戶名為執(zhí)行操作的Windows用戶名,然后將此用戶添加到supergroup用戶組。

adduser s00356746
groupadd supergroup
usermod -a -G supergroup s00356746

這樣,以后每次執(zhí)行類似操作可以將文件寫入Hdfs中屬于s00356746用戶的目錄內(nèi),而不會出現(xiàn)上面的Exception。

生產(chǎn)者代碼

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaProducer  {
    private final Producer<String, String> producer;
    public final static String TOPIC = "test";
    private KafkaProducer(){
        Properties props = new Properties();
        //此處配置的是kafka的端口
        props.put("metadata.broker.list", "10.175.118.105:9092");
        //配置value的序列化類
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //配置key的序列化類
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks","-1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }
    void produce() {
        int messageNo = 1000;
        final int COUNT = 10000;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));
            System.out.println(data);
            messageNo ++;
        }
    }
    public static void main( String[] args )
    {
        new KafkaProducer().produce();
    }
}

kafka寫入Hdfs

package com.huawei.hwclouds.dbs.ops.huatuo.diagnosis.service.impl;
import com.huawei.hwclouds.dbs.common.exception.DBSErrorCode;
import com.huawei.hwclouds.dbs.common.exception.DBSException;
import com.huawei.hwclouds.dbs.constants.VolumeIoType;
import com.huawei.hwclouds.dbs.coremodel.model.dto.DBSInstanceDto;
import com.huawei.hwclouds.dbs.coremodel.model.dto.DBSNodeDto;
import com.huawei.hwclouds.dbs.coremodel.resource.dto.DBSResourceSpecDto;
import com.huawei.hwclouds.dbs.coremodel.resource.dto.DBSVolumeDto;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
public class KafkaToHdfs extends Thread {
    private static String kafkaHost = null;
    private static String kafkaGroup = null;
    private static String kafkaTopic = null;
    private static String hdfsUri = null;
    private static String hdfsDir = null;
    private static String hadoopUser = null;
    private static Boolean isDebug = false;
    private ConsumerConnector consumer = null;
    private static Configuration hdfsConf = null;
    private static FileSystem hadoopFS = null;
    public static void main(String[] args) {
//        if (args.length < 6) {
//            useage();
//            System.exit(0);
//        }
//        Map<String, String> user = new HashMap<String, String>();
//        user = System.getenv();
//        user.put("HADOOP_USER_NAME","hadoop");
//        if (user.get("HADOOP_USER_NAME") == null) {
//            System.out.println("請設定hadoop的啟動的用戶名,環(huán)境變量名稱:HADOOP_USER_NAME,對應的值是hadoop的啟動的用戶名");
//            System.exit(0);
//        } else {
//            hadoopUser = user.get("HADOOP_USER_NAME");
//        }
        hadoopUser = "hadoop";
        init(args);
        System.out.println("開始啟動服務...");
        hdfsConf = new Configuration();
        try {
            hdfsConf.set("fs.defaultFS", hdfsUri);
            hdfsConf.set("dfs.support.append", "true");
            hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        } catch (Exception e) {
            System.out.println(e);
        }
        //創(chuàng)建好相應的目錄
        try {
            hadoopFS = FileSystem.get(hdfsConf);
            //如果hdfs的對應的目錄不存在,則進行創(chuàng)建
            if (!hadoopFS.exists(new Path("/" + hdfsDir))) {
                hadoopFS.mkdirs(new Path("/" + hdfsDir));
            }
            hadoopFS.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        KafkaToHdfs selfObj = new KafkaToHdfs();
        selfObj.start();
        System.out.println("服務啟動完畢,監(jiān)聽執(zhí)行中");
    }
    public void run() {
        Properties props = new Properties();
        props.put("zookeeper.connect", kafkaHost);
        props.put("group.id", kafkaGroup);
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        props.put("format", "binary");
        props.put("auto.commit.enable", "true");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        this.consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(kafkaTopic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(kafkaTopic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String tmp = new String(it.next().message());
            String fileContent = null;
            if (!tmp.endsWith("\n"))
                fileContent = new String(tmp + "\n");
            else
                fileContent = tmp;
            debug("receive: " + fileContent);
            try {
                hadoopFS = FileSystem.get(hdfsConf);
                String fileName = "/" + hdfsDir + "/" +
                        (new SimpleDateFormat("yyyy-MM-dd").format(Calendar.getInstance().getTime())) + ".txt";
                Path dst = new Path(fileName);
                if (!hadoopFS.exists(dst)) {
                    FSDataOutputStream output = hadoopFS.create(dst);
                    output.close();
                }
                InputStream in = new ByteArrayInputStream(fileContent.getBytes("UTF-8"));
                OutputStream out = hadoopFS.append(dst);
                IOUtils.copyBytes(in, out, 4096, true);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    hadoopFS.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
    private static void init(String[] args) {
        kafkaHost = "10.175.118.105:2182";
        kafkaGroup = "test-consumer-group";
        kafkaTopic = "test";
        hdfsUri = "hdfs://10.175.118.105:9000";
        hdfsDir = "shxsh";
        if (args.length > 5) {
            if (args[5].equals("true")) {
                isDebug = true;
            }
        }
        debug("初始化服務參數(shù)完畢,參數(shù)信息如下");
        debug("KAFKA_HOST: " + kafkaHost);
        debug("KAFKA_GROUP: " + kafkaGroup);
        debug("KAFKA_TOPIC: " + kafkaTopic);
        debug("HDFS_URI: " + hdfsUri);
        debug("HDFS_DIRECTORY: " + hdfsDir);
        debug("HADOOP_USER: " + hadoopUser);
        debug("IS_DEBUG: " + isDebug);
    }
    private static void debug(String str) {
        if (isDebug) {
            System.out.println(str);
        }
    }
    private static void useage() {
        System.out.println("* kafka寫入到hdfs的Java工具使用說明 ");
        System.out.println("# java -cp kafkatohdfs.jar KafkaToHdfs KAFKA_HOST KAFKA_GROUP KAFKA_TOPIC HDFS_URI HDFS_DIRECTORY IS_DEBUG");
        System.out.println("*  參數(shù)說明:");
        System.out.println("*   KAFKA_HOST      : 代表kafka的主機名或IP:port,例如:namenode:2181,datanode1:2181,datanode2:2181");
        System.out.println("*   KAFKA_GROUP     : 代表kafka的組,例如:test-consumer-group");
        System.out.println("*   KAFKA_TOPIC     : 代表kafka的topic名稱 ,例如:usertags");
        System.out.println("*   HDFS_URI        : 代表hdfs鏈接uri ,例如:hdfs://namenode:9000");
        System.out.println("*   HDFS_DIRECTORY  : 代表hdfs目錄名稱 ,例如:usertags");
        System.out.println("*  可選參數(shù):");
        System.out.println("*   IS_DEBUG        : 代表是否開啟調(diào)試模式,true是,false否,默認為false");
    }
}
 

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • 一篇文章帶你了解JavaSE的數(shù)據(jù)類型

    一篇文章帶你了解JavaSE的數(shù)據(jù)類型

    這篇文章主要給大家介紹了關于JavaSE的數(shù)據(jù)類型,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2021-09-09
  • 使用JPA主鍵@Id,@IdClass,@Embeddable,@EmbeddedId問題

    使用JPA主鍵@Id,@IdClass,@Embeddable,@EmbeddedId問題

    這篇文章主要介紹了使用JPA主鍵@Id,@IdClass,@Embeddable,@EmbeddedId問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • java工具類StringUtils使用實例詳解

    java工具類StringUtils使用實例詳解

    這篇文章主要為大家介紹了java工具類StringUtils使用實例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-05-05
  • 使用feign服務調(diào)用添加Header參數(shù)

    使用feign服務調(diào)用添加Header參數(shù)

    這篇文章主要介紹了使用feign服務調(diào)用添加Header參數(shù)的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • JAVA正則表達式過濾文件的實現(xiàn)方法

    JAVA正則表達式過濾文件的實現(xiàn)方法

    這篇文章主要介紹了JAVA正則表達式過濾文件的實現(xiàn)方法的相關資料,希望通過本文大家能夠掌握理解這部分內(nèi)容,需要的朋友可以參考下
    2017-09-09
  • 使用spring動態(tài)獲取接口的不同實現(xiàn)類

    使用spring動態(tài)獲取接口的不同實現(xiàn)類

    這篇文章主要介紹了使用spring動態(tài)獲取接口的不同實現(xiàn)類,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • 淺談Spring Cloud Ribbon的原理

    淺談Spring Cloud Ribbon的原理

    這篇文章主要介紹了淺談Spring Cloud Ribbon的原理,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-02-02
  • spring boot(一)之入門篇

    spring boot(一)之入門篇

    Spring Boot是由Pivotal團隊提供的全新框架,其設計目的是用來簡化新Spring應用的初始搭建以及開發(fā)過程。接下來通過本文給大家介紹spring boot入門知識,需要的朋友參考下吧
    2017-05-05
  • Spring中的@Qualifier注解詳解

    Spring中的@Qualifier注解詳解

    這篇文章主要介紹了Spring中的@Qualifier注解詳解,spring?@Autowire?的注解默認是按類型注入bean,本文將對其使用方法進行說明,需要的朋友可以參考下
    2023-11-11
  • 最簡單的spring boot打包docker鏡像的實現(xiàn)

    最簡單的spring boot打包docker鏡像的實現(xiàn)

    這篇文章主要介紹了最簡單的spring boot打包docker鏡像的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-10-10

最新評論