go语言p2p搭建教程 go语言搭建个人博客-成都创新互联网站建设

关于创新互联

多方位宣传企业产品与服务 突出企业形象

公司简介 公司的服务 荣誉资质 新闻动态 联系我们

go语言p2p搭建教程 go语言搭建个人博客

golang p2p网

继续进入下一个初始化

成都创新互联公司专注为客户提供全方位的互联网综合服务,包含不限于成都网站设计、成都网站制作、思南网络推广、微信小程序开发、思南网络营销、思南企业策划、思南品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;成都创新互联公司为所有大学生创业者提供思南建站搭建服务,24小时服务热线:028-86922220,官方网址:www.cdcxhl.com

n.netService, err = nebnet.NewNebService(n)

if err != nil {

logging.CLog().WithFields(logrus.Fields{

"err": err,

}).Fatal("Failed to setup net service.")

}

netservice有两个成员

type NebServicestruct {

node      *Node

dispatcher *Dispatcher

}

跳出stup()函数

先进入start()函数看一看

if err := n.netService.Start(); err != nil {

logging.CLog().WithFields(logrus.Fields{

"err": err,

}).Fatal("Failed to start net service.")

}

进入netservice.start()

func (ns *NebService) Start() error {

logging.CLog().Info("Starting NebService...")

// start dispatcher.

ns.dispatcher.Start()

// start node.

if err := ns.node.Start(); err != nil {

ns.dispatcher.Stop()

logging.CLog().WithFields(logrus.Fields{

"err": err,

}).Error("Failed to start NebService.")

return err

}

logging.CLog().Info("Started NebService.")

return nil

}

可以看到第一个start()的函数是dispatcher.start()

进入dispatch.start()

func (dp *Dispatcher) Start() {

logging.CLog().Info("Starting NebService Dispatcher...")

go dp.loop()

}

然后就出现一个新的线程、goruntime

go dp.loop()

进入该线程,看它干了些什么

timerChan := time.NewTicker(time.Second).C

for {

select {

case -timerChan:

metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))

case -dp.quitCh:

logging.CLog().Info("Stoped NebService Dispatcher.")

return

case msg := -dp.receivedMessageCh:

msgType := msg.MessageType()

v, _ := dp.subscribersMap.Load(msgType)

if v == nil {

continue

  }

m, _ := v.(*sync.Map)

m.Range(func(key, valueinterface{}) bool {

select {

case key.(*Subscriber).msgChan - msg:

default:

logging.VLog().WithFields(logrus.Fields{

"msgType": msgType,

}).Warn("timeout to dispatch message.")

}

return true

  })

}

}

一个有点长的循环

metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))一秒钟刷新一次缓冲区

case msg := -dp.receivedMessageCh:

msgType := msg.MessageType()如果能取出dp.receivedMessageCh

msgType := msg.MessageType()首先判断取出的信息类型

v, _ := dp.subscribersMap.Load(msgType)

if v == nil {

continue

}

根据类型取出相应的map

如果取不出,那么使用continue结束这个case

m, _ := v.(*sync.Map)

断言

m.Range(func(key, valueinterface{}) bool {

select {

case key.(*Subscriber).msgChan - msg:

default:

logging.VLog().WithFields(logrus.Fields{

"msgType": msgType,

}).Warn("timeout to dispa+tch message.")

}

return true

})

将msg推入其他管道里面去。其他goruntime会循环等待该

如何搭建go语言环境 linux

Go的三种安装方式

Go有多种安装方式,你可以选择自己喜欢的。这里我们介绍三种最常见的安装方式:

Go源码安装:这是一种标准的软件安装方式。对于经常使用Unix类系统的用户,尤其对于开发者来说,从源码安装可以自己定制。

Go标准包安装:Go提供了方便的安装包,支持Windows、Linux、Mac等系统。这种方式适合快速安装,可根据自己的系统位数下载好相应的安装包,一路next就可以轻松安装了。**推荐这种方式**

第三方工具安装:目前有很多方便的第三方软件包工具,例如Ubuntu的apt-get、Mac的homebrew等。这种安装方式适合那些熟悉相应系统的用户。

最后,如果你想在同一个系统中安装多个版本的Go,你可以参考第三方工具GVM,这是目前在这方面做得最好的工具,除非你知道怎么处理。

Go源码安装

在Go的源代码中,有些部分是用Plan 9 C和ATT汇编写的,因此假如你要想从源码安装,就必须安装C的编译工具。

在Mac系统中,只要你安装了Xcode,就已经包含了相应的编译工具。

在类Unix系统中,需要安装gcc等工具。例如Ubuntu系统可通过在终端中执行sudo apt-get install gcc

libc6-dev来安装编译工具。

在Windows系统中,你需要安装MinGW,然后通过MinGW安装gcc,并设置相应的环境变量。

你可以直接去官网下载源码,找相应的goVERSION.src.tar.gz的文件下载,下载之后解压缩到$HOME目录,执行如下代码:

cd go/src

./all.bash

运行all.bash后出现"ALL TESTS PASSED"字样时才算安装成功。

上面是Unix风格的命令,Windows下的安装方式类似,只不过是运行all.bat,调用的编译器是MinGW的gcc。

如果是Mac或者Unix用户需要设置几个环境变量,如果想重启之后也能生效的话把下面的命令写到.bashrc或者.zshrc里面,

export GOPATH=$HOME/gopath

export PATH=$PATH:$HOME/go/bin:$GOPATH/bin

如果你是写入文件的,记得执行bash .bashrc或者bash

.zshrc使得设置立马生效。

如果是window系统,就需要设置环境变量,在path里面增加相应的go所在的目录,设置gopath变量。

当你设置完毕之后在命令行里面输入go,看到如下图片即说明你已经安装成功

图1.1 源码安装之后执行Go命令的图

如果出现Go的Usage信息,那么说明Go已经安装成功了;如果出现该命令不存在,那么可以检查一下自己的PATH环境变中是否包含了Go的安装目录。

关于上面的GOPATH将在下面小节详细讲解

Go标准包安装

Go提供了每个平台打好包的一键安装,这些包默认会安装到如下目录:/usr/local/go

(Windows系统:c:\Go),当然你可以改变他们的安装位置,但是改变之后你必须在你的环境变量中设置如下信息:

export GOROOT=$HOME/go

export GOPATH=$HOME/gopath

export PATH=$PATH:$GOROOT/bin:$GOPATH/bin

上面这些命令对于Mac和Unix用户来说最好是写入.bashrc或者.zshrc文件,对于windows用户来说当然是写入环境变量。

IPFS(四) 源码解读之-p2p

package p2p

import (

"context"

"errors"

"time"

net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"

manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"

ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"

pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"

pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"

p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"

peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"

)

//P2P结构保存当前正在运行的流/监听器的信息

// P2P structure holds information on currently running streams/listeners

type P2P struct {

//监听器

Listeners ListenerRegistry

//数据流

Streams StreamRegistry

//节点ID

identity peer.ID

//节点地址

peerHost p2phost.Host

//一个线程安全的对等节点存储

peerstore pstore.Peerstore

}

//创建一个新的p2p结构

// NewP2P creates new P2P struct

//这个新的p2p结构不包含p2p结构中的监听器和数据流

func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *P2P {

return P2P{

identity: identity,

peerHost: peerHost,

peerstore: peerstore,

}

}

//新建一个数据流 工具方法 构建一个有节点id,内容和协议的流

func (p2p P2P) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) {

//30s 后会自动timeout

ctx, cancel := context.WithTimeout(ctx2, time.Second 30) //TODO: configurable?

defer cancel()

err := p2p.peerHost.Connect(ctx, pstore.PeerInfo{ID: p})

if err != nil {

return nil, err

}

return p2p.peerHost.NewStream(ctx2, p, pro.ID(protocol))

}

//对话为远程监听器创建新的P2P流

//创建一个新的p2p流实现对对话的监听

// Dial creates new P2P stream to a remote listener

//Multiaddr是一种跨协议、跨平台的表示格式的互联网地址。它强调明确性和自我描述。

//对内接收

func (p2p P2P) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) ( ListenerInfo, error) {

//获取一些节点信息 network, host, nil

lnet, _, err := manet.DialArgs(bindAddr)

if err != nil {

return nil, err

}

//监听信息

listenerInfo := ListenerInfo{

//节点身份

Identity: p2p.identity,

////应用程序协议标识符。

Protocol: proto,

}

//调用newStreamTo 通过ctx(内容) peer(节点id) proto(协议标识符) 参数获取一个新的数据流

remote, err := p2p.newStreamTo(ctx, peer, proto)

if err != nil {

return nil, err

}

//network协议标识

switch lnet {

//network为"tcp", "tcp4", "tcp6"

case "tcp", "tcp4", "tcp6":

//从监听器获取新的信息 nla.Listener, nil

listener, err := manet.Listen(bindAddr)

if err != nil {

if err2 := remote.Reset(); err2 != nil {

return nil, err2

}

return nil, err

}

//将获取的新信息保存到listenerInfo

listenerInfo.Address = listener.Multiaddr()

listenerInfo.Closer = listener

listenerInfo.Running = true

//开启接受

go p2p.doAccept(listenerInfo, remote, listener)

default:

return nil, errors.New("unsupported protocol: " + lnet)

}

return listenerInfo, nil

}

//

func (p2p *P2P) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) {

//关闭侦听器并删除流处理程序

defer listener.Close()

//Returns a Multiaddr friendly Conn

//一个有好的 Multiaddr 连接

local, err := listener.Accept()

if err != nil {

return

}

stream := StreamInfo{

//连接协议

Protocol: listenerInfo.Protocol,

//定位节点

LocalPeer: listenerInfo.Identity,

//定位节点地址

LocalAddr: listenerInfo.Address,

//远程节点

RemotePeer: remote.Conn().RemotePeer(),

//远程节点地址

RemoteAddr: remote.Conn().RemoteMultiaddr(),

//定位

Local: local,

//远程

Remote: remote,

//注册码

Registry: p2p.Streams,

}

//注册连接信息

p2p.Streams.Register(stream)

//开启节点广播

stream.startStreaming()

}

//侦听器将流处理程序包装到侦听器中

// Listener wraps stream handler into a listener

type Listener interface {

Accept() (net.Stream, error)

Close() error

}

//P2PListener保存关于侦听器的信息

// P2PListener holds information on a listener

type P2PListener struct {

peerHost p2phost.Host

conCh chan net.Stream

proto pro.ID

ctx context.Context

cancel func()

}

//等待侦听器的连接

// Accept waits for a connection from the listener

func (il *P2PListener) Accept() (net.Stream, error) {

select {

case c := -il.conCh:

return c, nil

case -il.ctx.Done():

return nil, il.ctx.Err()

}

}

//关闭侦听器并删除流处理程序

// Close closes the listener and removes stream handler

func (il *P2PListener) Close() error {

il.cancel()

il.peerHost.RemoveStreamHandler(il.proto)

return nil

}

// Listen创建新的P2PListener

// Listen creates new P2PListener

func (p2p P2P) registerStreamHandler(ctx2 context.Context, protocol string) ( P2PListener, error) {

ctx, cancel := context.WithCancel(ctx2)

list := P2PListener{

peerHost: p2p.peerHost,

proto: pro.ID(protocol),

conCh: make(chan net.Stream),

ctx: ctx,

cancel: cancel,

}

p2p.peerHost.SetStreamHandler(list.proto, func(s net.Stream) {

select {

case list.conCh - s:

case -ctx.Done():

s.Reset()

}

})

return list, nil

}

// NewListener创建新的p2p侦听器

// NewListener creates new p2p listener

//对外广播

func (p2p P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) ( ListenerInfo, error) {

//调用registerStreamHandler 构造一个新的listener

listener, err := p2p.registerStreamHandler(ctx, proto)

if err != nil {

return nil, err

}

//构造新的listenerInfo

listenerInfo := ListenerInfo{

Identity: p2p.identity,

Protocol: proto,

Address: addr,

Closer: listener,

Running: true,

Registry: p2p.Listeners,

}

go p2p.acceptStreams(listenerInfo, listener)

//注册连接信息

p2p.Listeners.Register(listenerInfo)

return listenerInfo, nil

}

//接受流

func (p2p *P2P) acceptStreams(listenerInfo *ListenerInfo, listener Listener) {

for listenerInfo.Running {

//一个有好的 远程 连接

remote, err := listener.Accept()

if err != nil {

listener.Close()

break

}

}

//取消注册表中的p2p侦听器

p2p.Listeners.Deregister(listenerInfo.Protocol)

}

// CheckProtoExists检查是否注册了协议处理程序

// mux处理程序

// CheckProtoExists checks whether a protocol handler is registered to

// mux handler

func (p2p *P2P) CheckProtoExists(proto string) bool {

protos := p2p.peerHost.Mux().Protocols()

for _, p := range protos {

if p != proto {

continue

}

return true

}

return false

}


网站栏目:go语言p2p搭建教程 go语言搭建个人博客
分享URL:http://kswsj.cn/article/dohdpod.html

其他资讯