ES同步写入和异步写入的两个问题
来源:7-16 索引商品数据 (二)

404_
2023-09-06
问题一:如下代码中 bulkService.Add(bulkCreateRequest)这一行,其实是向一个切片中追加bulkCreateRequest,切片并不是线程安全的,因此在调用bulkService.Add的时候,会不会有线程安全问题,需不需要加互斥锁?
// 批量的方式新建一批文档,实时提交
func (c *Client) BulkCreateDocs(ctx context.Context, indexName string, docs []*BulkCreateDoc) (*elastic.BulkResponse, error) {
bulkService := c.Client.Bulk().ErrorTrace(true)
for _, doc := range docs {
bulkCreateRequest := elastic.NewBulkCreateRequest().Index(indexName)
if len(doc.ID) > 0 {
bulkCreateRequest.Id(doc.ID)
}
if len(doc.Routing) > 0 {
bulkCreateRequest.Routing(doc.Routing)
}
bulkService.Add(bulkCreateRequest)
}
return bulkService.Do(ctx)
}
// Add adds bulkable requests, i.e. BulkIndexRequest, BulkUpdateRequest,
// and/or BulkDeleteRequest.
func (s *BulkService) Add(requests ...BulkableRequest) *BulkService {
s.requests = append(s.requests, requests...)
return s
}
问题二:在批量创建文档,后台提交模式中。我们只需要构建bulkCreateRequest,然后调用Add方法。将这个request添加到一个channel中,但是我发现这chan是一个同步的channel,没有缓存区的。我本来以为是异步提交的时候,是将request提交到一个有缓存的channel中,从这个有缓存的channel中批量的拉取数据,写入ES。但是现在看不是这个样子的。
那么我的问题是:异步写入的时候,是如何提交给ES的呢?
// Add adds a single request to commit by the BulkProcessorService.
//
// The caller is responsible for setting the index and type on the request.
func (p *BulkProcessor) Add(request BulkableRequest) {
p.requestsC <- request
}
1回答
-
少林码僧
2023-09-07
问题一:
bulkService.Add(bulkCreateRequest)这里并不会产生并发冲突,课程中讲到批量提交有两种方式,一种是使用BulkCreate一条条的提交,函数内部调用的是c.BulkProcessor.Add,这种情况下的确会存在并发调用,但由于c.BulkProcessor.Add是往channel里面写数据,所以这块是线程安全的。而 bulkService.Add(bulkCreateRequest)这里是在方法BulkCreateDocs内使用的,在这个方法的开头是声明了bulkService := c.Client.Bulk().ErrorTrace(true),所以这里的批量提交是根据传进来的文档集直接进行提交的,每次调用都是一个新的bulkService,所以不会有并发问题。
问题二:这里根据requestsC 进行源码定位
可以看到它读取的位置
这里会通过for select机制读取channel,最终调用的还是service.Add
(*) (requests ...) *{ s.requests = (s.requestsrequests...) }
service.Add将新增的请求放到一个切片中,通过这个切片的读取位置,我们可以定位到它提交请求的逻辑
最终是在commit中进行提交的
00
相似问题
回答 1
回答 1