ES同步写入和异步写入的两个问题

来源:7-16 索引商品数据 (二)

404_

2023-09-06

问题一:如下代码中 bulkService.Add(bulkCreateRequest)这一行,其实是向一个切片中追加bulkCreateRequest,切片并不是线程安全的,因此在调用bulkService.Add的时候,会不会有线程安全问题,需不需要加互斥锁?

// 批量的方式新建一批文档,实时提交
func (c *Client) BulkCreateDocs(ctx context.Context, indexName string, docs []*BulkCreateDoc) (*elastic.BulkResponse, error) {
	bulkService := c.Client.Bulk().ErrorTrace(true)
	for _, doc := range docs {
		bulkCreateRequest := elastic.NewBulkCreateRequest().Index(indexName)
		if len(doc.ID) > 0 {
			bulkCreateRequest.Id(doc.ID)
		}
		if len(doc.Routing) > 0 {
			bulkCreateRequest.Routing(doc.Routing)
		}

		bulkService.Add(bulkCreateRequest)
	}
	return bulkService.Do(ctx)
}

// Add adds bulkable requests, i.e. BulkIndexRequest, BulkUpdateRequest,
// and/or BulkDeleteRequest.
func (s *BulkService) Add(requests ...BulkableRequest) *BulkService {
	s.requests = append(s.requests, requests...)
	return s
}

问题二:在批量创建文档,后台提交模式中。我们只需要构建bulkCreateRequest,然后调用Add方法。将这个request添加到一个channel中,但是我发现这chan是一个同步的channel,没有缓存区的。我本来以为是异步提交的时候,是将request提交到一个有缓存的channel中,从这个有缓存的channel中批量的拉取数据,写入ES。但是现在看不是这个样子的。
那么我的问题是:异步写入的时候,是如何提交给ES的呢?

// Add adds a single request to commit by the BulkProcessorService.
//
// The caller is responsible for setting the index and type on the request.
func (p *BulkProcessor) Add(request BulkableRequest) {
	p.requestsC <- request
}
写回答

1回答

少林码僧

2023-09-07

问题一:
 bulkService.Add(bulkCreateRequest)这里并不会产生并发冲突,课程中讲到批量提交有两种方式,一种是使用BulkCreate一条条的提交,函数内部调用的是c.BulkProcessor.Add,这种情况下的确会存在并发调用,但由于c.BulkProcessor.Add是往channel里面写数据,所以这块是线程安全的。而 bulkService.Add(bulkCreateRequest)这里是在方法BulkCreateDocs内使用的,在这个方法的开头是声明了bulkService := c.Client.Bulk().ErrorTrace(true),所以这里的批量提交是根据传进来的文档集直接进行提交的,每次调用都是一个新的bulkService,所以不会有并发问题。


问题二:

这里根据requestsC 进行源码定位

https://img.mukewang.com/szimg/64f9935409d899a711080594.jpg

可以看到它读取的位置

https://img.mukewang.com/szimg/64f993c10902343010741045.jpg














这里会通过for select机制读取channel,最终调用的还是service.Add

(*) (requests ...) *{
    s.requests = (s.requestsrequests...)
    }

service.Add将新增的请求放到一个切片中,通过这个切片的读取位置,我们可以定位到它提交请求的逻辑

https://img.mukewang.com/szimg/64f9967809f1fa8211120703.jpg

最终是在commit中进行提交的

https://img.mukewang.com/szimg/64f99742091c65ec10210810.jpg

0
0

海量数据高并发场景,构建Go+ES8企业级搜索微服务

全新 ES8 配合技术组件,实现高性能搜索

267 学习 · 54 问题

查看课程