作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
马哈茂德·里德万的头像

Mahmud Ridwan

Mahmud是一名软件开发人员,拥有多年的经验和效率诀窍, scalability, 稳定的解.

Years of Experience

13

Share

扩展web应用程序几乎总是一个有趣的挑战,不管它有多复杂. 然而,实时web应用程序带来了独特的可扩展性问题. For example, 能够横向扩展使用WebSockets与客户端通信的消息传递web应用程序, 它将需要以某种方式同步其所有服务器节点. 如果应用程序不是基于这一点构建的, 那么横向扩展它可能不是一个容易的选择.

In this article, 我们将介绍一个简单的实时图像共享和消息传递web应用程序的架构. 在这里,我们将重点关注各种组件,例如 Redis Pub/Sub, 参与构建实时应用程序,并了解它们如何在整体架构中发挥作用.

实时使用Redis Pub/Sub

实时使用Redis Pub/Sub

在功能方面,该应用程序非常轻量级. 它允许上传图像和对这些图像的实时评论. Furthermore, 任何用户都可以点击图像,其他用户将能够在他们的屏幕上看到涟漪效应.

这个应用程序的整个源代码是 可以在GitHub上找到.

Things We Need

Go

我们将使用Go编程语言. 我们在本文中选择Go并没有什么特别的原因, 除此之外,Go的语法是干净的,它的语义更容易理解. 当然还有作者的偏见. However, 本文中讨论的所有概念都可以很容易地翻译成您选择的语言.

开始使用Go很容易. 它的二进制分布可以是 从官方网站下载. 如果你是在Windows上,在他们的下载页面上有一个MSI安装程序. 或者,如果您的操作系统(幸运的是)提供了一个包管理器:

Arch Linux:

pacman -S go

Ubuntu:

Apt-get安装golang

Mac OS X:

brew install go

只有当我们有 Homebrew installed.

MongoDB

你问,如果我们有Redis,为什么要使用MongoDB? 如前所述,Redis是一个内存中的数据存储. 尽管它可以将数据持久化到磁盘, 使用Redis来实现这个目的可能不是最好的方法. 我们将使用MongoDB来存储上传的图像元数据和消息.

We can download MongoDB 从他们的官方网站. 在一些Linux发行版中,这是安装MongoDB的首选方式. 尽管如此,使用大多数发行版的包管理器应该仍然可以安装它.

Arch Linux:

pacman -S mongodb

Ubuntu:

Apt-get安装mongodb

Mac OS X:

Brew安装mongodb

在Go代码中,我们将使用这个包 mgo (pronounced mango). 它不仅经过了实战测试,而且驱动程序包提供了一个非常干净和简单的API.

If you are not a MongoDB expert,你完全不用担心. 在我们的示例应用程序中,这个数据库服务的使用很少, 并且几乎与本文的重点:Pub/Sub体系结构无关.

Amazon S3

我们将使用Amazon S3来存储用户上传的图像. 这里没什么可做的,除了确保我们有一个 Amazon Web Services 已准备好的帐户和已创建的临时存储桶.

将上传的文件存储到本地磁盘不是一个选择,因为我们不想以任何方式依赖于我们的web节点的身份. 我们希望用户能够连接到任何可用的web节点,仍然能够看到相同的内容.

要从Go代码中与Amazon S3 bucket交互,我们将使用 AdRoll/goamz, a fork of Canonical’s goamz 有一些差异的包装.

Redis

最后,但并非最不重要的:Redis. 我们可以使用发行版的包管理器来安装它:

Arch Linux:

pacman -S redis

Ubuntu:

安装redis-server

Mac OS X:

brew install redis

或者,获取其源代码和 自己编译. Redis除了GCC和libc之外没有其他依赖关系:

wget http://download.redis.io/redis-stable.tar.gz
Tar XVZF不稳定.tar.gz
cd redis-stable
make

安装并运行Redis后,启动终端并进入Redis命令行:

redis-cli

尝试输入以下命令,看看是否得到预期的输出:

SET answer 41
INCR answer
GET answer

第一个命令将“41”存储在键“answer”上。, 第二个命令增加该值, 第三个命令打印针对给定键存储的值. 结果应该是“42”.

You can learn more 关于Redis支持的所有命令.

我们将使用Go包 redigo 从我们的应用程序代码中连接到Redis.

看看Redis Pub/Sub吧

发布-订阅模式是将消息传递给任意数量的发送方的一种方式. 这些消息的发送者(发布者)没有明确标识目标收件人. Instead, 消息在一个通道上发送出去,任意数量的接收者(订阅者)都可以在该通道上等待消息.

简单发布-订阅配置

在我们的例子中,我们可以在负载均衡器后面运行任意数量的web节点. 在任何给定时刻,查看同一图像的两个用户可能没有连接到同一节点. 这就是Redis Pub/Sub发挥作用的地方. 每当web节点需要观察变化时(例如用户创建的新消息), 它将使用Redis Pub/Sub将这些信息广播到所有相关的web节点. Which, in turn, 是否将信息传播到相关的客户端,以便他们可以获取更新的messagesredis列表.

因为发布-订阅模式允许我们在指定通道上分派消息, 我们可以让每个web节点连接到Redis, 并且只订阅那些用户感兴趣的频道. For example, 如果两个用户都在看同一张图片,但是连接到许多网络节点中的两个不同的网络节点, 然后只有这两个web节点需要订阅相应的频道. 在该通道上发布的任何消息将仅传递给这两个web节点.

听起来好得令人难以置信? 我们可以使用Redis的CLI进行尝试. 启动三个实例 redis-cli. 在第一个实例中执行如下命令:

订阅somechannel

在第二个Redis CLI实例中执行如下命令:

订阅someotherchannel

在Redis CLI的第三个实例中执行以下命令:

发布一些渠道日志
发布其他通道ipsum

注意第一个实例是如何接收到“lorem”而不是“ipsum”的, 以及二审如何收到" ipsum "而不是" lorem ".

Redis Pub/Sub在行动

值得一提的是,一旦Redis客户端进入订阅者模式, 它不能再执行任何操作,只能订阅更多的通道或取消订阅已订阅的通道. 这意味着每个web节点将需要维护到Redis的两个连接, 一个是作为订阅者连接到Redis,另一个是在通道上发布消息,以便订阅这些通道的任何web节点都可以接收到消息.

实时性和可扩展性

在开始探索幕后发生的事情之前,让我们先克隆这个存储库:

mkdir tonesa
cd tonesa
出口GOPATH = ' pwd '
-p src/github.com/hjr265/tonesa
cd src/github.com/hjr265/tonesa
Git克隆http://github.com/hjr265/tonesa.git .
go get ./...

… and compile it:

go build ./cmd/tonesad

要运行应用程序,首先创建一个名为 .(最好通过复制文件Env -sample.txt):

cp env-sample.txt .env

Fill out the .包含所有必要环境变量的Env文件:

MONGO_URL = mongodb: / / 127.0.0.1/tonesa
REDIS_URL =复述:/ / 127.0.0.1
AWS_ACCESS_KEY_ID = {Your-AWS-Access-Key-ID-Goes-Here}
AWS_SECRET_ACCESS_KEY = {And-Your-AWS-Secret-Access-Key}
S3_BUCKET_NAME = {And-S3-Bucket-Name}

最后运行编译好的二进制文件:

PORT=9091 ./ tonesad -env-file =.env

web节点现在应该正在运行,并且可以通过http://localhost:9091访问.

Live example

来测试它在水平缩放时是否仍然有效, 你可以用不同的端口号启动多个web节点:

PORT=9092 ./ tonesad -env-file =.env
PORT=9093 ./ tonesad -env-file =.env

并通过相应的url: http://localhost:9092和http://localhost:9093访问它们.

Live example

Behind the Scenes

而不是经历应用开发的每一步, 我们将集中讨论一些最重要的部分. 尽管并非所有这些都与Redis发布/订阅及其实时含义100%相关, 它们仍然与应用程序的整体结构相关,一旦我们深入研究,就会更容易跟上.

为了简单起见,我们不考虑用户身份验证. 上传将是匿名的,每个知道URL的人都可以使用. 所有查看者都可以发送消息,并且可以选择自己的别名. 调整适当的身份验证机制和隐私功能应该是微不足道的, 并且超出了本文的范围.

Persisting Data

This one is easy.

每当用户上传图像时, 我们将其存储在Amazon S3中,然后根据两个ID将其路径存储在MongoDB中:一个BSON对象ID (MongoDB的最爱), 另一个短的8个字符长的ID(有点赏心悦目). 它进入数据库的“uploads”集合,结构如下:

上传struct {
	ID      bson.ObjectId bson:“_id”
	短tid字符串' bson: '短tid ' '

	“善良”:“善良”。

	Content Blob ' bson:" Content "

	CreatedAt  time.时间的bson:“createdAt”
	ModifiedAt time.时间的bson:“modifiedAt”
}
type Blob struct {
	路径字符串' bson: ' Path ' '
	Size int64 ' bson: ' Size ' '
}

The field Kind 表示“上传”所包含的媒体类型. 这是否意味着我们支持图像以外的媒体? Unfortunately no. 但这个领域被留在那里是为了提醒我们,我们在这里并不一定局限于图像.

当用户相互发送消息时,它们被存储在不同的集合中. 是的,你已经猜到了:“消息”。.

类型消息结构{
	ID bson.ObjectId bson:“_id”

	UploadID bson.ObjectId bson:“uploadID”

	AuthorName字符串' bson: '匿名' '
	内容字符串' bson:" Content " '

	CreatedAt  time.时间的bson:“createdAt”
	ModifiedAt time.时间的bson:“modifiedAt”
}

这里唯一有趣的地方是UploadID字段, 哪个用于将消息关联到特定的上传.

API Endpoints

这个应用程序本质上有三个端点.

POST /api/uploads

这个端点的处理程序需要“multipart/form-data”提交,并在“file”字段中包含图像. 处理程序的行为大致如下:

函数HandleUploadCreate(.ResponseWriter, r *http.Request) {
	f, h, _ := r.FormFile("file")

	b := bytes.Buffer{}
	n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize + 10))
	if n > data.MaxUploadContentSize {
		ServeBadRequest (w, r)
		return
	}

	id := bson.NewObjectId()
	upl := data.Upload{
		ID:   id,
		Kind: data.Image,
		Content: data.Blob{
			路径:“/uploads/”+ id.Hex(),
			Size: n,
		},
	}

	data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.(“内容类型”),s3.Private, s3.Options{})

	upl.Put()

	//返回新创建的上传实体(JSON编码)
}

Go要求显式处理所有错误. 这已经在原型机中完成了, 但是在本文的代码片段中省略了它,以便将重点放在关键部分上.

在此API端点的处理程序中, 我们实际上是在读取文件,但将其大小限制为特定值. 如果上传超过这个值,请求将被拒绝. Otherwise, 生成一个BSON ID,用于在将上传实体持久化到MongoDB之前将图像上传到Amazon S3.

生成BSON对象id的方式有利有弊. 它们在客户端生成. 然而,用于生成对象ID的策略使得碰撞的概率非常小,因此在客户端生成它们是安全的. On the other hand, 生成的Object id的值通常是顺序的,Amazon S3就是这样 not quite fond of. 一个简单的解决方法是在文件名前加上一个随机字符串.

GET / api /上传/ {id} /消息

这个API用于获取最近的消息, 以及在特定时间之后发布的消息.

函数ServeMessageList.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	idStr:= vars["id"]
	if !bson.IsObjectIdHex (idStr) {
		ServeNotFound (w, r)
		return
	}
	upl, _ := data.GetUpload(bson.ObjectIdHex (idStr))
	if upl == nil {
		ServeNotFound (w, r)
		return
	}

	sinceStr := r.URL.Query().Get("since")

	var msgs []data.Message
	if sinceStr != "" {
		since, _ := time.Parse(time.RFC3339, sinceStr)

		msgs, _ = data.ListMessagesByUploadID推.ID, since, 16)

	} else {
		msgs, _ = data.ListRecentMessagesByUploadID推.ID, 16)
	}

	//响应消息实体(JSON编码)
}

当用户的浏览器收到关于用户当前正在查看的上传的新消息的通知时, 它使用此端点获取新消息.

POST / api /上传/ {id} /消息

最后是创建消息并通知所有人的处理程序:

函数handlemessagecrecreate (.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	idStr:= vars["id"]
	if !bson.IsObjectIdHex (idStr) {
		ServeNotFound (w, r)
		return
	}
	upl, _ := data.GetUpload(bson.ObjectIdHex (idStr))
	if upl == nil {
		ServeNotFound (w, r)
		return
	}

	body := Message{}
	json.NewDecoder(r.Body).Decode(&body)

	msg := data.Message{}
	msg.UploadID = upl.ID
	msg.AuthorName = body.AuthorName
	msg.Content = body.Content
	msg.Put()

	//响应新创建的消息实体(JSON编码)

	hub.Emit("upload:"+upl.ID.十六进制(),“信息:“+味精.ID.Hex())
}

这个处理程序与其他处理程序非常相似,甚至在这里包含它几乎是无聊的. Or is it? 注意这里有一个函数调用 hub.Emit() 在函数的最后. What is hub you say? 这就是Pub/Sub的神奇之处.

Hub: WebSockets与Redis相遇的地方

Hub是我们将WebSockets与Redis的Pub/Sub通道粘合在一起的地方. And, 巧合的是,我们在web服务器中用来处理WebSockets的包被调用了 glue.

Hub本质上维护了一些数据结构,这些数据结构在所有连接的WebSockets到它们感兴趣的所有通道之间创建了一个映射. For example, 用户浏览器选项卡上指向特定上传图像的WebSocket自然应该对所有与之相关的通知感兴趣.

hub包实现了六个功能:

  • Subscribe
  • UnsubscribeAll
  • Emit
  • EmitLocal
  • InitHub
  • HandleSocket

订阅和取消订阅

函数订阅(s *glue).套接字,t字符串)错误{
	l.Lock()
	defer l.Unlock()

	_, ok := sockets[s]
	if !ok {
		Sockets [s] = map[string]bool{}
	}
	Sockets [s][t] = true

	_, ok = topics[t]
	if !ok {
		topic [t] = map[*glue ..Socket]bool{}
		err := subconn.Subscribe(t)
		if err != nil {
			return err
		}
	}
	topics[t][s] = true

	return nil
}

This function, 就像这个包里的其他大多数一样, 在一个读/写互斥锁执行时对其持有锁. 这样我们就可以安全地修改原始数据结构变量 sockets and topics. 第一个变量, sockets,将套接字映射到通道名称,而第二个, topics,将通道名称映射到套接字. 在这个函数中,我们构建这些映射. 每当我们看到套接字订阅一个新的通道名称时,我们就建立Redis连接, subconn,在Redis上订阅该频道 subconn.Subscribe. 这使得Redis将该通道上的所有通知转发到该web节点.

同样地,在 UnsubscribeAll 函数,我们将映射拆下:

取消订阅(s *glue).Socket) error {
	l.Lock()
	defer l.Unlock()

	For t:= range sockets[s] {
		删除(主题[t],年代)
		如果len(topics[t]) == 0 {
			delete(topics, t)
			err := subconn.Unsubscribe(t)
			if err != nil {
				return err
			}
		}
	}
	delete(sockets, s)

	return nil
}

当我们从对特定通道感兴趣的数据结构中删除最后一个套接字时, 我们取消订阅Redis频道 subconn.Unsubscribe.

Emit

输出(字符串)错误{
	_, err := pubconn.Do("PUBLISH", t, m)
	return err
}

这个函数发布消息 m on channel t 使用发布连接到Redis.

EmitLocal

函数EmitLocal(t字符串,m字符串){
	l.RLock()
	defer l.RUnlock()

	对于s:= range topics[t] {
		s.Write(m)
	}
}

InitHub

函数InitHub(url字符串)错误{
	c, _ := redis.DialURL(url)
	pubconn = c

	c, _ = redis.DialURL(url)
	subconn = redis.PubSubConn{c}

	go func() {
		for {
			Switch v:= subconn.Receive().(type) {
			case redis.Message:
				EmitLocal(v.Channel, string(v.Data))

			case error:
				panic(v)
			}
		}
	}()

	return nil
}

In InitHub function, 我们正在创建两个连接到Redis:一个用于订阅这个web节点感兴趣的频道, 另一个用来发布消息. 一旦建立了联系, 我们开始一个新的Go例程,循环永远运行,等待通过订阅者连接到Redis接收消息. 每次接收到消息时,它都会在本地发出消息.e. 连接到此web节点的所有WebSockets).

HandleSocket

And finally, HandleSocket 是我们等待消息通过WebSockets或在连接关闭后清理的地方:

函数handlessocket (s *glue.Socket) {
	s.OnClose(func() {
		UnsubscribeAll(s)
	})

	s.OnRead(函数c(数据字符串){
		fields := strings.Fields(data)
		如果len(fields) == 0 {
			return
		}
		switch fields[0] {
		case "watch":
			if len(fields) != 2 {
				return
			}
			订阅(年代,领域[1])

		case "touch":
			if len(fields) != 4 {
				return
			}
			发出(领域[1],[2]“碰:“+字段+”、“+字段[3])
		}
	})
}

前端JavaScript

因为glue自带自己的前端JavaScript库, 处理WebSockets要容易得多(或者在WebSockets不可用时回退到XHR轮询):

var socket = glue()
socket.onMessage(功能(数据){
	data = data.split(':')
	switch(data[0]) {
		case 'message':
			messages.fetch({
				data: {
					since: _.first(messages.拔('createdAt')) || "
				},
				add: true,
				remove: false
			})
			break

		case 'touch':
			Var cods = data[1].split(',')
			showTouchBubble(坐标)
			break
	}
})
socket.发送(“观察上传:”+上传.id)

在客户端,我们监听通过WebSocket传入的任何消息. 因为glue以字符串的形式传输所有的消息, 我们使用特定的模式对其中的所有信息进行编码:

  • 新消息:" message:{messageID} "
  • 点击图片:" touch:{coordX} ",{coordY}”, 其中coordX和coordY是用户在图像上点击位置的百分比坐标

当用户创建新消息时, 我们使用" POST /api/uploads/{uploadID}/messages " api来创建一条新消息. 这是使用 create 方法在消息的骨干集合上:

messages.create({
	authorName: $ messageAuthorNameEl.val(),
	内容:messageContentEl美元.val(),
	createdAt: ''
}, {
	at: 0
})

当用户单击图像时, 我们计算点击的位置占图像宽度和高度的百分比,并直接通过WebSocket发送信息.

socket.发送(“联系上传:”+上传.id+' '+(event.pageX - offset.左)/ $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())

The Overview

应用程序体系结构概述

当用户输入消息并按回车键时, 客户端调用“POST / api /上传/ {id} /消息”api端点. 这反过来在数据库中创建一个消息实体,并通过hub包通过Redis Pub/Sub在通道“upload:{uploadID}”上发布字符串“message:{messageID}”.

Redis将此字符串转发给对频道“upload:{uploadID}”感兴趣的每个web节点(订阅者)。. 接收该字符串的Web节点遍历与通道相关的所有WebSocket,并通过它们的WebSocket连接将字符串发送给客户端. 接收到此字符串的客户端开始使用" GET / api /上传/ {id} /消息 "从服务器获取新消息。.

Similarly, 用于传播图像上的单击事件, 客户端直接通过WebSocket发送一个类似于“touch upload:{uploadID} {coordX} {coordY}”的消息。. 此消息在hub包中结束,并在相同的通道“upload:{uploadID}”上发布。. 结果,字符串被分发给所有查看上传图像的用户. The client, 接收到该字符串后,对其进行解析以提取坐标,并呈现一个逐渐变暗的圆圈,以暂时突出显示单击位置.

Wrap Up

在本文中,我们看到了发布-订阅模式是如何帮助解决实时web应用扩展的问题的,并且相对容易.

这个示例应用程序的存在是为了作为一个试用Redis Pub/Sub的游乐场. But, 如前所述, 这些思想几乎可以在任何其他流行的编程语言中实现.

就这一主题咨询作者或专家.
Schedule a call
马哈茂德·里德万的头像
Mahmud Ridwan

Located in 达卡,达卡区,孟加拉国

Member since January 16, 2014

About the author

Mahmud是一名软件开发人员,拥有多年的经验和效率诀窍, scalability, 稳定的解.

Toptal作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.

Years of Experience

13

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

Toptal Developers

Join the Toptal® community.