datanode.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 788 行 · 第 1/3 页

JAVA
788
字号
                    }                }            }        }    }    /**     * Server used for receiving/sending a block of data.     * This is created to listen for requests from clients or      * other DataNodes.  This small server does not use the      * Hadoop IPC mechanism.     */    class DataXceiveServer implements Runnable {        boolean shouldListen = true;        ServerSocket ss;        public DataXceiveServer(ServerSocket ss) {            this.ss = ss;        }        /**         */        public void run() {            try {                while (shouldListen) {                    Socket s = ss.accept();                    //s.setSoTimeout(READ_TIMEOUT);                    new Daemon(new DataXceiver(s)).start();                }                ss.close();            } catch (IOException ie) {                LOG.info("Exiting DataXceiveServer due to " + ie.toString());            }        }        public void kill() {            this.shouldListen = false;            try {                this.ss.close();            } catch (IOException iex) {            }        }    }    /**     * Thread for processing incoming/outgoing data stream     */    class DataXceiver implements Runnable {        Socket s;        public DataXceiver(Socket s) {            this.s = s;        }        /**         * Read/write data from/to the DataXceiveServer.         */        public void run() {            try {                DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));                try {                    byte op = (byte) in.read();                    if (op == OP_WRITE_BLOCK) {                        //                        // Read in the header                        //                        DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));                        try {                            boolean shouldReportBlock = in.readBoolean();                            Block b = new Block();                            b.readFields(in);                            int numTargets = in.readInt();                            if (numTargets <= 0) {                                throw new IOException("Mislabelled incoming datastream.");                            }                            DatanodeInfo targets[] = new DatanodeInfo[numTargets];                            for (int i = 0; i < targets.length; i++) {                                DatanodeInfo tmp = new DatanodeInfo();                                tmp.readFields(in);                                targets[i] = tmp;                            }                            byte encodingType = (byte) in.read();                            long len = in.readLong();                            //                            // Make sure curTarget is equal to this machine                            //                            DatanodeInfo curTarget = targets[0];                            //                            // Track all the places we've successfully written the block                            //                            Vector mirrors = new Vector();                            //                            // Open local disk out                            //                            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));                            InetSocketAddress mirrorTarget = null;                            try {                                //                                // Open network conn to backup machine, if                                 // appropriate                                //                                DataInputStream in2 = null;                                DataOutputStream out2 = null;                                if (targets.length > 1) {                                    // Connect to backup machine                                    mirrorTarget = createSocketAddr(targets[1].getName().toString());                                    try {                                        Socket s2 = new Socket();                                        s2.connect(mirrorTarget, READ_TIMEOUT);                                        s2.setSoTimeout(READ_TIMEOUT);                                        out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));                                        in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));                                        // Write connection header                                        out2.write(OP_WRITE_BLOCK);                                        out2.writeBoolean(shouldReportBlock);                                        b.write(out2);                                        out2.writeInt(targets.length - 1);                                        for (int i = 1; i < targets.length; i++) {                                            targets[i].write(out2);                                        }                                        out2.write(encodingType);                                        out2.writeLong(len);                                    } catch (IOException ie) {                                        if (out2 != null) {                                            try {                                                out2.close();                                                in2.close();                                            } catch (IOException out2close) {                                            } finally {                                                out2 = null;                                                in2 = null;                                            }                                        }                                    }                                }                                //                                // Process incoming data, copy to disk and                                // maybe to network.                                //                                try {                                    boolean anotherChunk = len != 0;                                    byte buf[] = new byte[BUFFER_SIZE];                                    while (anotherChunk) {                                        while (len > 0) {                                            int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));                                            if (bytesRead < 0) {                                              throw new EOFException("EOF reading from "+s.toString());                                            }                                            if (bytesRead > 0) {                                                try {                                                    out.write(buf, 0, bytesRead);                                                } catch (IOException iex) {                                                    shutdown();                                                    throw iex;                                                }                                                if (out2 != null) {                                                    try {                                                        out2.write(buf, 0, bytesRead);                                                    } catch (IOException out2e) {                                                        //                                                        // If stream-copy fails, continue                                                         // writing to disk.  We shouldn't                                                         // interrupt client write.                                                        //                                                        try {                                                            out2.close();                                                            in2.close();                                                        } catch (IOException out2close) {                                                        } finally {                                                            out2 = null;                                                            in2 = null;                                                        }                                                    }                                                }                                                len -= bytesRead;                                            }                                        }                                        if (encodingType == RUNLENGTH_ENCODING) {                                            anotherChunk = false;                                        } else if (encodingType == CHUNKED_ENCODING) {                                            len = in.readLong();                                            if (out2 != null) {                                                out2.writeLong(len);                                            }                                            if (len == 0) {                                                anotherChunk = false;                                            }                                        }                                    }                                    if (out2 == null) {                                        LOG.info("Received block " + b + " from " + s.getInetAddress());                                    } else {                                        out2.flush();                                        long complete = in2.readLong();                                        if (complete != WRITE_COMPLETE) {                                            LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);                                        }                                        LocatedBlock newLB = new LocatedBlock();                                        newLB.readFields(in2);                                        DatanodeInfo mirrorsSoFar[] = newLB.getLocations();                                        for (int k = 0; k < mirrorsSoFar.length; k++) {                                            mirrors.add(mirrorsSoFar[k]);                                        }                                        LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);                                    }                                } finally {                                    if (out2 != null) {                                        out2.close();                                        in2.close();                                    }                                }                            } finally {                                try {                                    out.close();                                } catch (IOException iex) {                                    shutdown();                                    throw iex;                                }                            }                            data.finalizeBlock(b);                            //                             // Tell the namenode that we've received this block                             // in full, if we've been asked to.  This is done                            // during NameNode-directed block transfers, but not                            // client writes.                            //                            if (shouldReportBlock) {                                synchronized (receivedBlockList) {                                    receivedBlockList.add(b);                                    receivedBlockList.notifyAll();                                }                            }                            //                            // Tell client job is done, and reply with                            // the new LocatedBlock.                            //                            reply.writeLong(WRITE_COMPLETE);                            mirrors.add(curTarget);                            LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));                            newLB.write(reply);                        } finally {                            reply.close();                        }                    } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {                        //                        // Read in the header                        //                        Block b = new Block();                        b.readFields(in);                        long toSkip = 0;                        if (op == OP_READSKIP_BLOCK) {                            toSkip = in.readLong();                        }                        //

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?