精华内容
参与话题
问答
  • Channel

    千次阅读 2020-06-28 17:07:26
    goroutine之间的通信使用channel。数据传送是阻塞式的,发了数据之后必须有人来收数据。 func chanDemo() { //var c chan int // c == nil c := make(chan int) go func() { //这里的匿名函数相当于闭包,引用了...

    goroutine之间的通信使用channel。数据传送是阻塞式的,发了数据之后必须有人来收数据。

    func chanDemo() {
    	//var c chan int // c == nil
    	c := make(chan int)
    	go func() {  //这里的匿名函数相当于闭包,引用了外面的c变量
    		for {
    			n := <-c //开了一个goroutine去接数据
    			fmt.Println(n)
    		}
    	}()
    	c <- 1 //往channel中发生数据
    	c <- 2
    	time.Sleep(time.Millisecond)
    
    }
    
    func main() {
    	chanDemo()
    }
    
    

    channel作为参数

    func woker(id int, c chan int) {
    	for {
    		fmt.Printf("Worker %d received %c\n", id, <-c) //因为协程是主动式非抢占,在遇到I/O操作时,会进行调度
    	}
    }
    
    func chanDemo() {
    	var channels [10]chan int
    	for i := 0; i < 10; i++ {
    		channels[i] = make(chan int)
    		go woker(i, channels[i])
    	}
    
    	for i := 0; i < 10; i++ {
    		channels[i] <- 'a' + i
    	}
    	for i := 0; i < 10; i++ {
    		channels[i] <- 'A' + i
    	}
    	time.Sleep(time.Millisecond) //防止main函数先退出
    
    }
    
    func main() {
    	chanDemo()
    }
    
    

    channel作为返回值

    func createWorker(id int) chan<- int { //返回一个只能往里写数据的channel
    	c := make(chan int) //创建一个可以写数据和读数据的channnel
    	go func() {
    		for {
    			fmt.Printf("Worker %d received %c\n", id, <-c)
    		}
    	}()
    	return c
    }
    
    func chanDemo() {
    	var channels [10]chan<- int //只能往channel中写数据
    	for i := 0; i < 10; i++ {
    		//channels[i] = make(chan int)
    		//go woker(i, channels[i])
    		channels[i] = createWorker(i)
    	}
    
    	for i := 0; i < 10; i++ {
    		channels[i] <- 'a' + i
    	}
    	for i := 0; i < 10; i++ {
    		channels[i] <- 'A' + i
    	}
    	time.Sleep(time.Millisecond)
    
    }
    

    bufferchannel

    func woker(id int, c chan int) {
    	for {
    		fmt.Printf("Worker %d received %c\n", id, <-c) //因为协程是主动式非抢占,在遇到I/O操作时,会进行调度
    	}
    }
    func bufferedChannel() {
    	c := make(chan int, 3) //创建缓冲区为3的channel
    
    	go woker(0, c)
    	//只要有人发数据,就必须有人来接数据。缓冲区为3说明 发送的数据小于3时,若没人来接数据,不会出现错误
    	c <- 'a'
    	c <- 'b'
    	c <- 'c'
    	c <- 'd'
    	time.Sleep(time.Millisecond)
    }
    

    channel是可以被发送方进行关闭的,接收方使用两种方法进行判断。

    func woker(id int, c chan int) {
    	for {
    	//当发送方关闭channel之后,接收方还会进行接收,值为channel对应类型的默认值。因此在这里进行判断是否channel关闭
    		if n, ok := <-c; ok {
    			fmt.Printf("Worker %d received %c\n", id, n) //因为协程是主动式非抢占,在遇到I/O操作时,会进行调度
    		} else {
    			break
    		}
    	}
    	//第二种方法
    	//for n := range c {
    	//	fmt.Printf("Worker %d received %c\n", id, n)
    	//}
    }
    func channelClosed() {
    	c := make(chan int, 3) //创建缓冲区为3的channel
    
    	go woker(0, c)
    	c <- 'a'
    	c <- 'b'
    	c <- 'c'
    	c <- 'd'
    	close(c) 
    	time.Sleep(time.Millisecond)
    }
    

    不要通过共享内存来通信,通过通信来共享内存。
    使用channel等待任务结束。两种方法:使用channel进行传递信息;使用sync.WaitGroup类型的变量。

    	data := make(chan int)
    	exit := make(chan bool)
    	go func() {
    		for d := range data {
    			fmt.Println(d)
    		}
    		fmt.Println("recv over.")
    		exit <- true
    	}()
    
    	data <- 1
    	data <- 2
    	data <- 3
    	close(data)
    	fmt.Println("send over.")
    	<-exit
    
    func doWoker(id int, c chan int, done chan bool) {
    	for n := range c {
    		fmt.Printf("Worker %d received %c\n", id, n)
    		//done <- true //往done中发送了数据,在外面必须有人来接收数据
    		go func() {
    			done <- true
    		}()
    	}
    }
    
    type worker struct {
    	in   chan int
    	done chan bool
    }
    
    func createWorker(id int) worker {
    	w := worker{
    		in:   make(chan int),
    		done: make(chan bool),
    	}
    	go doWoker(id, w.in, w.done)
    	return w
    }
    
    func chanDemo() {
    	var workers [10]worker
    	for i := 0; i < 10; i++ {
    		workers[i] = createWorker(i)
    	}
    
    	for i, worker := range workers {
    		worker.in <- 'a' + i
    		//<-workers[i].done //只有收到了数据说明打印已经完成了,才会往下执行
    	}
    	for i, worker := range workers {
    		worker.in <- 'A' + i
    		//<-workers[i].done
    	}
    
    	//将20个数据全部发出去,然后再进行等待
    	for _, worker := range workers {
    		<-worker.done
    		<-worker.done
    	}
    
    }
    
    
    func doWoker(id int, w worker) {
    	for n := range w.in {
    		fmt.Printf("Worker %d received %c\n", id, n)
    		w.done()
    	}
    }
    
    type worker struct {
    	in   chan int
    	done func()
    }
    
    func createWorker(id int, wg *sync.WaitGroup) worker {
    	w := worker{
    		in: make(chan int),
    		done: func() {
    			wg.Done() //完成一次任务调用一次Done()
    		},
    	}
    	go doWoker(id, w)
    	return w
    }
    
    func chanDemo() {
    	var workers [10]worker
    	var wq sync.WaitGroup
    
    	for i := 0; i < 10; i++ {
    		workers[i] = createWorker(i, &wq)
    	}
    
    	wq.Add(20) //添加20个任务
    	for i, worker := range workers {
    		worker.in <- 'a' + i
    
    	}
    	for i, worker := range workers {
    		worker.in <- 'A' + i
    	}
    	wq.Wait()
    }
    
    

    使用select进行调度。

    
    func generator() chan int {
    	out := make(chan int)
    	go func() {
    		i := 0
    		for {
    			time.Sleep(
    				time.Duration(rand.Intn(1500)) *
    					time.Millisecond)
    			out <- i
    			i++
    		}
    	}()
    	return out
    }
    func woker(id int, c chan int) {
    	for n := range c {
    		time.Sleep(time.Second)
    		fmt.Printf("Worker %d received %d\n", id, n)
    	}
    }
    func createWorker(id int) chan<- int {
    	c := make(chan int)
    	go woker(id, c)
    	return c
    }
    
    func main() {
    	//从c1, c2中读出数据 写入worker中
    	var c1, c2 = generator(), generator()
    	worker := createWorker(0)
    
    	var values []int //生成和消耗的速度不一样,需要存储接收到的数据
    
    	tm := time.After(10 * time.Second) //返回的是一个channel,10s后往这个channel中发生一个时间
    	tick := time.Tick(time.Second)
    	for {
    		var activeWorker chan<- int // activeWorker 是 nil,在select虽然不会运行错误,但是永远不是被select到
    		var activeValue int
    		if len(values) > 0 {
    			activeWorker = worker
    			activeValue = values[0]
    		}
    
    		select {
    		case n := <-c1:
    			values = append(values, n)
    		case n := <-c2:
    			values = append(values, n)
    		case activeWorker <- activeValue:
    			values = values[1:]
    		case <-time.After(800 * time.Millisecond): //如果两次生成数据的时间大于800ms,则会timeout
    			fmt.Println("Time out")
    		case <-tick: //每1s查看一下数据的长度
    			fmt.Println("queue lens = ", len(values))
    		case <-tm:
    			fmt.Println("Bye")
    			return
    		}
    	}
    }
    

    传统的同步机制:WaitGroup,mutex,conditional variable

    type atomicInt struct {
    	value int
    	m     sync.Mutex
    }
    
    func (a *atomicInt) increasement() {
    	fmt.Println("safe increasement")
    	func() {
    		a.m.Lock()
    		defer a.m.Unlock()
    		a.value++
    	}()
    
    }
    
    func (a *atomicInt) get() int {
    	a.m.Lock()
    	defer a.m.Unlock()
    	return a.value
    }
    

    select语句的用法


    通过select可以监听channel上的数据流动。
    每一个case语句里必须是一个I/O操作。

    elect {
    
          case <-chan1:
    
            // 如果chan1成功读到数据,则进行该case处理语句
    
          case chan2 <- 1:
    
            // 如果成功向chan2写入数据,则进行该case处理语句
    
          default:
    
            // 如果上面都没有成功,则进入default处理流程
    
        }
    

    在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。

    如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。

    如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有两种可能的情况:

    • 如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。

    • 如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去。

    go的CSP模型


    go语言支持两种形式的并发:多线程共享内存;CSP并发模型。

    多线程共享内存模型:在访问数据的时候,通过锁来访问。

    CSP并发模型:通过channel和goroutine实现。goroutine是Go中并发的执行单位,channel是各个并发单位之前的通信机制,通俗来说就是各个goroutine之间的管道

    传数据用channel <- data,取数据用<-channel,在通信过程中,传数据channel <- data和取数据<-channel必然会成对出现,而且不管传还是取,必阻塞,直到另外的goroutine传或者取为止。

    参考一

    参考二

    展开全文
  • channel

    2018-07-30 18:45:31
    1.向channel写入数据通常会导致程序阻塞,直到有其他的goroutine从这个channel读取数据。 2.如果channel没有数据,从channel读取数据也会导致程序阻塞。 声明channel var chanName chan ElementType 定义channel...

    channel

    • channel是一个数据类型,主要用来解决go程的同步问题以及go程之间数据共享(数据传递)的问题。
    • 源码在src/runtime/chan.go
    channel特性
    1. channel是阻塞式的
    2. 管道内的数据只能单向流动
    3. 管道中的数据,一旦被读走,不能重复读取。只能读一次。
    4. 数据在管道中正确流动,要求读、写两端同时在线。
    定义一个chan int类型的变量
    • var ch chan int = make(chan int)
    • channel是一个引用类型,make函数返回一个引用。
    无缓冲的channel和带缓冲的channel
    1. 无缓冲channel
    • 同步通信
    • 没有缓冲区不能缓冲任何数据。有读没有写,读端阻塞。有写没有读,写端阻塞。
    1. 有缓冲channel
    • var chs chan int = make(chan int, 10)
    • 异步通信
    • channel可以根据缓冲区大小,存储适量的数据。
    • 带缓冲区的channel,可以用len(),cap()查看其当前拥有的数据长度,和容量

    channel用法

    读channel:
    <- ch //自动舍弃读到的数据
    num := <- ch  //读到的数据保存到num
    
    写channel:
    ch <- var  //写
    ch <- 10
    ch <- "你好"
    
    使用range关键字读取ch

    循环读取channel,当channel close时,循环结束。

    for i := range ch {
    	fmt.Println(i)
    }
    
    • channel关闭时,循环结束
    单向channel
    var ch1 chan<- int  //只写
    var ch2 <- chan int  //只读
    
    双向channel可以转换成单向channel
    ch1 := make(chan int)
    ch2 := <-chan int(ch1)  //ch2是单向读取
    ch3 := chan<- int(ch1)  //ch3是单向写入
    
    关闭channel
    close(ch)
    
    检查channel关闭

    num, isOk := <- ch

    • 检查channel关闭并循环读取:
    for {
      if num, isOk := <- ch; isOk == true {
        fmt.Println(num)
      } else if {
        break
      }
    }
    

    关闭有缓冲channel

    当关闭有缓冲channel时,若管道里还有数据。读端依然可以读取数据,并且管道为空时,读端才会感知到管道关闭。

    channel 和 panic

    读写一个关闭的channel,或多次关闭同一个channel,会报出panic异常,一种处理方法是直接捕获

    defer func() {
    	if info := recover(); info != nil {
    		fmt.Println(info)
    	}
    }
    
    展开全文
  • NIO中的Channel的使用

    万次阅读 2020-01-04 23:41:44
    通道的主要实现类 import java.nio.channels.DatagramChannel; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel;...

    通道的主要实现类

    import java.nio.channels.DatagramChannel;
    import java.nio.channels.FileChannel;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;

     

    获取通道的方式

    1. JAVA针对支持通道的类提供了getChannel()方法
      本地IO:
            
      import java.io.FileInputStream;
      import java.io.FileOutputStream;
      import java.io.RandomAccessFile;

      网络IO:

      import java.net.ServerSocket;
      import java.net.Socket;
      import java.nio.channels.DatagramChannel;
    2. jdk1.7以后NIO.2 针对各个通道提供了静态方法open()
    3. jdk1.7中NIO.w的Files工具类的newByteChannel()

    案例一:使用 通道完成文件的复制

    1. 非直接缓冲区:
      import org.junit.jupiter.api.Test;
      
      import java.io.FileInputStream;
      import java.io.FileOutputStream;
      import java.io.IOException;
      import java.nio.ByteBuffer;
      import java.nio.channels.FileChannel;
      
      
      public class ChannelTest {
      
          @Test
          public void channel() throws IOException {
      
              FileInputStream fis=new FileInputStream("文件1的路径");
              FileOutputStream fos=new FileOutputStream("文件2的路径");
      
              //1.获取通道
              FileChannel inChannel=fis.getChannel();
              FileChannel outChannel=fos.getChannel();
      
              //2.分配指定大小的缓冲区
              ByteBuffer buffer=ByteBuffer.allocate(1024);
      
              //3.将通道中的数据存入缓冲区中
              while (inChannel.read(buffer)!=-1){
                  buffer.flip();//切换读取数据的模式
                  outChannel.write(buffer);//写入数据
                  buffer.clear();//清空缓冲去
              }
              //关闭流和通道
              inChannel.close();
              outChannel.close();
              fis.close();
              fos.close();
          }
      }
      
    2. 直接缓冲区
      import org.junit.jupiter.api.Test;
      
      import java.io.IOException;
      import java.nio.MappedByteBuffer;
      import java.nio.channels.FileChannel;
      import java.nio.file.Paths;
      import java.nio.file.StandardOpenOption;
      
      
      public class ChannelTest {
          @Test
          public void channel2() throws IOException {
              FileChannel inchannel=FileChannel.open(Paths.get("文件1的路径"), StandardOpenOption.READ);
              //CREATE_NEW  如果存在则报错,如果不存在则报错。READ只读模式。WRITE存在则覆盖,不存在则创建
              FileChannel outchannel=FileChannel.open(Paths.get("文件2的路径"),StandardOpenOption.WRITE,StandardOpenOption.CREATE_NEW);
      
              //内存映射文件
              MappedByteBuffer inMappedBuf=inchannel.map(FileChannel.MapMode.READ_ONLY,0,inchannel.size());
              MappedByteBuffer outMappedBuf=inchannel.map(FileChannel.MapMode.READ_WRITE,0,inchannel.size());
      
              //对缓冲区进行数据的读写操作
              byte[] dst=new byte[inMappedBuf.limit()];
              inMappedBuf.get(dst);
              outMappedBuf.put(dst);
      
              inchannel.close();
              outchannel.close();
      
          }
      }

      最简单的方式:
      transferTo传输
      transferFrom传输(两者的区别只是来回之间的问题)

      import org.junit.jupiter.api.Test;
      
      import java.io.IOException;
      import java.nio.channels.FileChannel;
      import java.nio.file.Paths;
      import java.nio.file.StandardOpenOption;
      
      
      public class ChannelTest {
      
          @Test
          public void channel3() throws IOException {
              //读取文件
              FileChannel inchannel=FileChannel.open(Paths.get("文件1的位置"),StandardOpenOption.READ);
              //目标文件的位置
              FileChannel outchannel=FileChannel.open(Paths.get("文件2的位置"),StandardOpenOption.CREATE_NEW);
      
              inchannel.transferTo(0,inchannel.size(),outchannel);
              
              inchannel.close();
              outchannel.close();
      
          }
      }

       

     

     

    展开全文
  • 文章目录Channel shutdown: channel error; protocol method: #methodChannel shutdown: channel error; protocol method: #method<channel.close>(rep 1、启动springboot 应用报错 Channel shutdown: ...

    Channel shutdown: channel error; protocol method: #method<channel.close>(rep


    1、启动springboot 应用报错

    Channel shutdown: channel error; protocol method: #method<channel.close>(rep

    2、原因

    当应用启动时,spring 会去检查注册的队列,跟服务器上的队列配置是否一致,如果不一致,则抛出这个错误

    比如你在项目中的配置是

        @Bean(DEAD_LETTER_PROD_CLOSE_ORDER)
        Queue a() {
            Map<String, Object> args = Maps.newHashMap();
            args.put("x-dead-letter-exchange", RabbitMqExchange.ExchangeCenter.DEAD_LETTER_EXCHANGE_CONSUME);
            args.put("x-dead-letter-routing-key", DEAD_LETTER_CONSUME_CLOSE_ORDER);
            args.put("x-message-ttl", 600 * 1000);
    
            return new Queue(DEAD_LETTER_PROD_CLOSE_ORDER, true, false, false, args);
        }
    

    但是服务器上的配置是

    x-message-ttl=1000
    

    则代码配置与现有队列配置不一致,抛出该错误

    3、解决

    方式一、修改项目配置与mq 保持一致

    修改项目配置与MQ一致则可以正常运行


    方式二、删除mq上现有队列
    删除mq 上现有队列,则springboot 自动向mq 服务器注册一个新的队列,也可以解决该问题

    展开全文
  • 在写这篇文章是,当前有几个channel可供选择,分别是Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比较常见的是前三种channel。具体使用那种channel,需要根据具体的使用场景。这里我...
  • Java NIO Channel to Channel Transfers

    千次阅读 2014-11-20 08:50:22
    Java NIO Channel to Channel Transfers
  • WIFI channel

    千次阅读 2017-11-26 15:19:22
    目前主流的无线WIFI网络设备不管是802.11b/g还是802.11b/g/n 一般都支持13个信道。它们的中心频率虽然不同,但是...信道也称作通道(Channel)、频段,是以无线信号(电磁波)作为传输载体的数据信号传送通道。无线网络
  • 通道(Channel)与缓冲区(Buffer)

    万次阅读 2019-12-13 01:06:47
    Channel负责传输、Buffer负责存储 Java NIO系统的核心在于: 通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用NIO 系统,需要获取用于连接 IO 设备的通道以及用于...
  • Could not read input channel

    2015-05-05 09:52:35
    05-04 00:05:07.453 2865 2865 E AndroidRuntime: at android.view.InputChannel$1.createFromParcel(InputChannel.java:36) 05-04 00:05:07.453 2865 2865 E AndroidRuntime: at ...
  • Golang 深入源码 —— channel

    万次阅读 2020-07-21 15:58:12
    这句话是 Go 语言设计团队的首任负责人 Rob Pike 对并发编程的建议,也是 Go 的并发哲学,通道 Channel 便是基于这种哲学 我们可以把 Channel 看做是一个先进先出(FIFO)的数据队列 数据结构 type hchan struct { ...
  • Could not read input channel file descriptors from parcel. 1.RemoteView中添加的图片太大了,超过40K会报这个异常 2.Intent传递的数据太大了超过1M也会报这个错误 3.FileDescripter太多而且没有关闭,looper太多...
  • 客户端主动关闭连接,服务端关闭channel的过程 在服务端中,在AbstractChannel#AbstractUnsafe的close(final ChannelPromise promise)方法上设置断点:这个是比较顶层的方法。 往下看底层开始时的调用栈:在...
  • Golang channel

    万次阅读 2020-08-04 11:17:42
    channel 是 Go 语言中的一个核心类型,可以把它看成管道。并发核心单元通过它就可以发送或者接收数据进行通讯,这在一定程度上又进一步降低了编程的难度。 channel 是一个数据类型,主要用来解决 go 程的同步问题...
  • b.option(ChannelOption.SO_BACKLOG, 1024 * 1024 * 1024 * 1024); b.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 1024 * 1024); b.option(ChannelOption.MAX_MESSAGES_PER_READ, Integer.MAX_VALUE); b....
  • Netty:Channel

    千次阅读 2015-11-23 14:16:04
    介绍一下channel中如何通过sokect连接客户端和服务端
  • Java NIO Channel

    千次阅读 2014-11-20 08:48:03
    Java NIO Channel
  • 当执行的命令返回的结果数据量较大时,libssh2_channel_read()只能读取channel的一部分,不知道怎样才能读取全部的数据。 //下面是读取channel的代码 for( ;; ) { libssh2_channel_receive_window...

空空如也

1 2 3 4 5 ... 20
收藏数 60,389
精华内容 24,155
关键字:

channel