# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
import gzip
import json
import logging
import uuid
import time
import datetime
from io import BytesIO
from functools import partial
import requests
from . import __VERSION__
from .exceptions import HTTPError, TaskNotFoundError, TaskError, TimeoutError
PY2 = sys.version_info[0] == 2
DEFAULT_BOSONNLP_URL = 'https://api.bosonnlp.com'
DEFAULT_TIMEOUT = 30 * 60
if PY2:
text_type = unicode
string_types = (str, unicode)
else:
text_type = str
string_types = (str,)
logger = logging.getLogger(__name__)
def _generate_id():
return str(uuid.uuid4())
def _gzip_compress(buf):
zbuf = BytesIO()
with gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=9) as zfile:
zfile.write(buf)
return zbuf.getvalue()
_json_dumps = partial(json.dumps, ensure_ascii=False, sort_keys=True)
[docs]class BosonNLP(object):
"""BosonNLP HTTP API 访问的封装类。
:param string token: 用于 API 鉴权的 API Token。
:param string bosonnlp_url: BosonNLP HTTP API 的 URL,默认为 `https://api.bosonnlp.com`。
:param bool compress: 是否压缩大于 10K 的请求体,默认为 True。
:param int timeout: HTTP 请求超时时间,默认为 60 秒。
"""
def __init__(self, token, bosonnlp_url=DEFAULT_BOSONNLP_URL, compress=True, session=None, timeout=60):
self.token = token
self.bosonnlp_url = bosonnlp_url.rstrip('/')
self.compress = compress
self.timeout = timeout
# Enable keep-alive and connection-pooling.
self.session = session or requests.session()
self.session.headers['X-Token'] = token
self.session.headers['Accept'] = 'application/json'
self.session.headers['User-Agent'] = 'bosonnlp.py/{} {}'.format(
__VERSION__, requests.utils.default_user_agent()
)
def _api_request(self, method, path, **kwargs):
kwargs.setdefault('timeout', self.timeout)
url = self.bosonnlp_url + path
if method == 'POST':
if 'data' in kwargs:
headers = kwargs.get('headers', {})
headers['Content-Type'] = 'application/json'
data = _json_dumps(kwargs['data'])
if isinstance(data, text_type):
data = data.encode('utf-8')
if len(data) > 10 * 1024 and self.compress: # 10K
headers['Content-Encoding'] = 'gzip'
data = _gzip_compress(data)
kwargs['data'] = data
kwargs['headers'] = headers
r = self.session.request(method, url, **kwargs)
http_error_msg = ''
if 400 <= r.status_code < 600:
reason = r.reason
try:
reason = r.json()['message']
except:
pass
http_error_msg = 'HTTPError: %s %s' % (r.status_code, reason)
if http_error_msg:
raise HTTPError(http_error_msg, response=r)
return r
[docs] def sentiment(self, contents, model='general'):
"""BosonNLP `情感分析接口 <http://docs.bosonnlp.com/sentiment.html>`_ 封装。
:param contents: 需要做情感分析的文本或者文本序列。
:type contents: string or sequence of string
:param model: 使用不同语料训练的模型,默认使用通用模型。
:type model: string
:returns: 接口返回的结果列表。
:raises: :py:exc:`~bosonnlp.HTTPError` 如果 API 请求发生错误。
调用示例:
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> nlp.sentiment('这家味道还不错', model='food')
[[0.9991737012037423, 0.0008262987962577828]]
>>> nlp.sentiment(['这家味道还不错', '菜品太少了而且还不新鲜'], model='food')
[[0.9991737012037423, 0.0008262987962577828],
[9.940036427291687e-08, 0.9999999005996357]]
"""
api_endpoint = '/sentiment/analysis?' + model
r = self._api_request('POST', api_endpoint, data=contents)
return r.json()
[docs] def convert_time(self, content, basetime=None):
"""BosonNLP `时间描述转换接口 <http://docs.bosonnlp.com/time.html>`_ 封装
:param content: 中文时间描述字符串
:type content: string
:param basetime: 时间描述的基准时间,传入一个时间戳或datetime
:type basetime: int or datetime.datetime
:raises: :py:exc:`~bosonnlp.HTTPError` 如果 API 请求发生错误。
:returns: 接口返回的结果
调用示例:
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> _json_dumps(nlp.convert_time("2013年二月二十八日下午四点三十分二十九秒"))
'{"timestamp": "2013-02-28 16:30:29", "type": "timestamp"}'
>>> import datetime
>>> _json_dumps(nlp.convert_time("今天晚上8点到明天下午3点", datetime.datetime(2015, 9, 1)))
'{"timespan": ["2015-09-02 20:00:00", "2015-09-03 15:00:00"], "type": "timespan_0"}'
"""
api_endpoint = '/time/analysis'
params = {'pattern': content}
if basetime:
if isinstance(basetime, datetime.datetime):
basetime = int(time.mktime(basetime.timetuple()))
params['basetime'] = basetime
r = self._api_request('POST', api_endpoint, params=params)
return r.json()
[docs] def classify(self, contents):
"""BosonNLP `新闻分类接口 <http://docs.bosonnlp.com/classify.html>`_ 封装。
:param contents: 需要做分类的新闻文本或者文本序列。
:type contents: string or sequence of string
:returns: 接口返回的结果列表。
:raises: :py:exc:`~bosonnlp.HTTPError` 如果 API 请求发生错误。
调用示例:
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> nlp.classify('俄否决安理会谴责叙军战机空袭阿勒颇平民')
[5]
>>> nlp.classify(['俄否决安理会谴责叙军战机空袭阿勒颇平民',
... '邓紫棋谈男友林宥嘉:我觉得我比他唱得好',
... 'Facebook收购印度初创公司'])
[5, 4, 8]
"""
api_endpoint = '/classify/analysis'
r = self._api_request('POST', api_endpoint, data=contents)
return r.json()
[docs] def suggest(self, word, top_k=None):
"""BosonNLP `语义联想接口 <http://docs.bosonnlp.com/suggest.html>`_ 封装。
:param string word: 需要做语义联想的词。
:param int top_k: 默认为 10,最大值可设定为 100。返回的结果条数。
:returns: 接口返回的结果列表。
:raises: :py:exc:`~bosonnlp.HTTPError` 如果 API 请求发生错误。
调用示例:
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> nlp.suggest('北京', top_k=2)
[[1.0, '北京/ns'], [0.7493540460397998, '上海/ns']]
"""
api_endpoint = '/suggest/analysis'
params = {}
if top_k is not None:
params['top_k'] = top_k
r = self._api_request('POST', api_endpoint, params=params, data=word)
return r.json()
[docs] def depparser(self, contents):
"""BosonNLP `依存文法分析接口 <http://docs.bosonnlp.com/depparser.html>`_ 封装。
:param contents: 需要做依存文法分析的文本或者文本序列。
:type contents: string or sequence of string
:returns: 接口返回的结果列表。
:raises: :py:exc:`~bosonnlp.HTTPError` 如果 API 请求发生错误。
调用示例:
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> nlp.depparser('今天天气好')
[{'head': [2, 2, -1],
'role': ['TMP', 'SBJ', 'ROOT'],
'tag': ['NT', 'NN', 'VA'],
'word': ['今天', '天气', '好']}]
>>> nlp.depparser(['今天天气好', '美好的世界'])
[{'head': [2, 2, -1],
'role': ['TMP', 'SBJ', 'ROOT'],
'tag': ['NT', 'NN', 'VA'],
'word': ['今天', '天气', '好']},
{'head': [1, 2, -1],
'role': ['DEC', 'NMOD', 'ROOT'],
'tag': ['VA', 'DEC', 'NN'],
'word': ['美好', '的', '世界']}]
"""
api_endpoint = '/depparser/analysis'
r = self._api_request('POST', api_endpoint, data=contents)
return r.json()
[docs] def ner(self, contents, sensitivity=None, segmented=False, space_mode='3'):
"""BosonNLP `命名实体识别接口 <http://docs.bosonnlp.com/ner.html>`_ 封装。
:param contents: 需要做命名实体识别的文本或者文本序列。
:type contents: string or sequence of string
:param sensitivity: 准确率与召回率之间的平衡,
设置成 1 能找到更多的实体,设置成 5 能以更高的精度寻找实体。
:type sensitivity: int 默认为 3
:param segmented: 输入是否为分词结果
:type segmented: boolean 默认为 False
:param space_mode: 分词空格保留选项
:type space_mode: int(整型), 0-3有效,默认为 3
:returns: 接口返回的结果列表。
:raises: :py:exc:`~bosonnlp.HTTPError` 如果 API 请求发生错误。
调用示例:
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> nlp.ner('成都商报记者 姚永忠', sensitivity=2)
[{'entity': [[0, 2, 'product_name'],
[2, 3, 'job_title'],
[3, 4, 'person_name']],
'tag': ['ns', 'n', 'n', 'nr'],
'word': ['成都', '商报', '记者', '姚永忠']}]
>>> nlp.ner(['成都商报记者 姚永忠', '微软XP操作系统今日正式退休'])
[{'entity': [[0, 2, 'product_name'],
[2, 3, 'job_title'],
[3, 4, 'person_name']],
'tag': ['ns', 'n', 'n', 'nr'],
'word': ['成都', '商报', '记者', '姚永忠']},
{'entity': [[0, 2, 'product_name'],
[3, 4, 'time']],
'tag': ['nz', 'nx', 'nl', 't', 'ad', 'v'],
'word': ['微软', 'XP', '操作系统', '今日', '正式', '退休']}]
"""
api_endpoint = '/ner/analysis'
params = {'space_mode': space_mode}
if sensitivity is not None:
params['sensitivity'] = sensitivity
if segmented:
params['segmented'] = True
r = self._api_request('POST', api_endpoint, data=contents, params=params)
return r.json()
[docs] def tag(self, contents, space_mode=0, oov_level=3, t2s=0, special_char_conv=0):
"""BosonNLP `分词与词性标注 <http://docs.bosonnlp.com/tag.html>`_ 封装。
:param contents: 需要做分词与词性标注的文本或者文本序列。
:type contents: string or sequence of string
:param space_mode: 空格保留选项
:type space_mode: int(整型), 0-3有效
:param oov_level: 枚举强度选项
:type oov_level: int(整型), 0-4有效
:param t2s: 繁简转换选项,繁转简或不转换
:type t2s: int(整型), 0-1有效
:param special_char_conv: 特殊字符转化选项,针对回车、Tab等特殊字符转化或者不转化
:type special_char_conv: int(整型), 0-1有效
:returns: 接口返回的结果列表。
:raises: :py:exc:`~bosonnlp.HTTPError` 如果 API 请求发生错误。
调用参数及返回值详细说明见:http://docs.bosonnlp.com/tag.html
调用示例:
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> result = nlp.tag('成都商报记者 姚永忠')
>>> _json_dumps(result)
'[{"tag": ["ns", "n", "n", "nr"], "word": ["成都", "商报", "记者", "姚永忠"]}]'
>>> format_tag_result = lambda tagged: ' '.join('%s/%s' % x for x in zip(tagged['word'], tagged['tag']))
>>> result = nlp.tag("成都商报记者 姚永忠")
>>> format_tag_result(result[0])
'成都/ns 商报/n 记者/n 姚永忠/nr'
>>> result = nlp.tag("成都商报记者 姚永忠", space_mode=2)
>>> format_tag_result(result[0])
'成都/ns 商报/n 记者/n /w 姚永忠/nr'
>>> result = nlp.tag(['亚投行意向创始成员国确定为57个', '“流量贵”频被吐槽'], oov_level=0)
>>> format_tag_result(result[0])
'亚/ns 投/v 行/n 意向/n 创始/vi 成员国/n 确定/v 为/v 57/m 个/q'
>>> format_tag_result(result[1])
'“/wyz 流量/n 贵/a ”/wyy 频/d 被/pbei 吐槽/v'
"""
api_endpoint = '/tag/analysis'
params = {
'space_mode': space_mode,
'oov_level': oov_level,
't2s': t2s,
'special_char_conv': special_char_conv,
}
r = self._api_request('POST', api_endpoint, params=params, data=contents)
return r.json()
[docs] def summary(self, title, content, word_limit=0.3, not_exceed=False):
"""BosonNLP `新闻摘要 <http://docs.bosonnlp.com/summary.html>`_ 封装。
:param title: 需要做摘要的新闻标题。如果没有标题,请传空字符串。
:type title: unicode
:param content: 需要做摘要的新闻正文。
:type content: unicode
:param word_limit: 摘要字数限制。
当为 float 时,表示字数为原本的百分比,0.0-1.0有效;
当为 int 时,表示摘要字数。
.. note::
传 1 默认为百分比。
:type word_limit: float or int
:param not_exceed: 是否严格限制字数。
:type not_exceed: bool,默认为 False
:returns: 摘要。
:raises: :py:exc:`~bosonnlp.HTTPError` 当API请求发生错误。
调用参数及返回值详细说明见: http://docs.bosonnlp.com/summary.html
调用示例:
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> content = (
'腾讯科技讯(刘亚澜)10月22日消息,前优酷土豆技术副总裁'
'黄冬已于日前正式加盟芒果TV,出任CTO一职。'
'资料显示,黄冬历任土豆网技术副总裁、优酷土豆集团产品'
'技术副总裁等职务,曾主持设计、运营过优酷土豆多个'
'大型高容量产品和系统。'
'此番加入芒果TV或与芒果TV计划自主研发智能硬件OS有关。')
>>> title = '前优酷土豆技术副总裁黄冬加盟芒果TV任CTO'
>>> nlp.summary(title, content, 0.1)
腾讯科技讯(刘亚澜)10月22日消息,前优酷土豆技术副总裁黄冬已于日前正式加盟芒果TV,出任CTO一职。
"""
api_endpoint = '/summary/analysis'
not_exceed = int(not_exceed)
data = {
'not_exceed': not_exceed,
'percentage': word_limit,
'title': title,
'content': content
}
r = self._api_request('POST', api_endpoint, data=data)
return r.json()
def _cluster_push(self, task_id, contents):
api_endpoint = '/cluster/push/' + task_id
contents = ClusterTask._prepare_contents(contents)
if not contents:
return False
for i in range(0, len(contents), 100):
chunk = contents[i:i + 100]
r = self._api_request('POST', api_endpoint, data=chunk)
logger.info('Pushed %d of %d documents for clustering.' % (i+len(chunk), len(contents)))
return r.ok
def _cluster_analysis(self, task_id, alpha=None, beta=None):
api_endpoint = '/cluster/analysis/' + task_id
params = {}
if alpha is not None:
params['alpha'] = alpha
if beta is not None:
params['beta'] = beta
r = self._api_request('GET', api_endpoint, params=params)
logger.info('Cluster analysis started.')
return r.ok
def _cluster_status(self, task_id):
api_endpoint = '/cluster/status/' + task_id
r = self._api_request('GET', api_endpoint)
status = str(r.json()['status']).lower()
if status == 'not found':
raise TaskNotFoundError('cluster {} not found'.format(task_id), response=r)
if status == 'error':
raise TaskError('cluster {} error'.format(task_id), response=r)
logger.info('Status: %s.' % status)
return status
def _cluster_result(self, task_id):
api_endpoint = '/cluster/result/' + task_id
v = self._api_request('GET', api_endpoint).json()
logger.info('%d comments fetched.' % len(v))
return v
def _cluster_clear(self, task_id):
api_endpoint = '/cluster/clear/' + task_id
r = self._api_request('GET', api_endpoint)
return r.ok
[docs] def cluster(self, contents, task_id=None, alpha=None, beta=None, timeout=DEFAULT_TIMEOUT):
"""BosonNLP `文本聚类接口 <http://docs.bosonnlp.com/cluster.html>`_ 封装。
:param contents: 需要做文本聚类的文本序列或者 (_id, text) 序列或者
{'_id': _id, 'text': text} 序列。如果没有指定 _id,则自动
生成唯一的 _id。
:type contents: sequence of string or sequence of (_id, text) or
sequence of {'_id': _id, 'text': text}
:param string task_id:
唯一的 task_id,话题聚类任务的名字,可由字母和数字组成。
:param float alpha: 默认为 0.8,聚类最大 cluster 大小
:param float beta: 默认为 0.45,聚类平均 cluster 大小
:param float timeout: 默认为 1800 秒(30 分钟),等待文本聚类任务完成的秒数。
:returns: 接口返回的结果列表。
:raises:
:py:exc:`~bosonnlp.HTTPError` - 如果 API 请求发生错误
:py:exc:`~bosonnlp.TaskError` - 任务出错。
:py:exc:`~bosonnlp.TimeoutError` - 如果文本聚类任务未能在 `timeout` 时间内完成。
调用示例:
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> nlp.cluster(['今天天气好', '今天天气好', '今天天气不错', '点点楼头细雨',
... '重重江外平湖', '当年戏马会东徐', '今日凄凉南浦'])
[{'_id': 0, 'list': [0, 1], 'num': 2}]
"""
if not contents:
return []
if isinstance(contents[0], string_types):
contents = [{"_id": _id, "text": s} for _id, s in enumerate(contents)]
cluster = None
try:
cluster = self.create_cluster_task(contents, task_id)
cluster.analysis(alpha=alpha, beta=beta)
cluster.wait_until_complete(timeout)
result = cluster.result()
return result
finally:
if cluster is not None:
cluster.clear()
[docs] def create_cluster_task(self, contents=None, task_id=None):
"""创建 :py:class:`~bosonnlp.ClusterTask` 对象。
:param contents: 需要做典型意见的文本序列或者 (_id, text) 序列或者
{'_id': _id, 'text': text} 序列。如果没有指定 _id,则自动
生成唯一的 _id。默认为 :py:class:`None`。
:type contents: :py:class:`None` or sequence of string or
sequence of (_id, text) or sequence of {'_id': _id, 'text': text}
:param string task_id: 默认为 :py:class:`None`,表示自动生成一个
唯一的 task_id,典型意见任务的名字,可由字母和数字组成。
:raises:
:py:exc:`~bosonnlp.HTTPError` - 如果 API 请求发生错误
如果 `contents` 不为 :py:class:`None`,则可能会调用
:py:meth:`~bosonnlp.ClusterTask.push` 上传文本。
:returns: :py:class:`~bosonnlp.ClusterTask` 实例。
"""
return ClusterTask(self, contents, task_id)
def _comments_push(self, task_id, contents):
api_endpoint = '/comments/push/' + task_id
contents = CommentsTask._prepare_contents(contents)
if not contents:
return False
for i in range(0, len(contents), 100):
chunk = contents[i:i + 100]
r = self._api_request('POST', api_endpoint, data=chunk)
logger.info('Pushed %d of %d documents for comment clustering.' % (i+len(chunk), len(contents)))
return r.ok
def _comments_analysis(self, task_id, alpha=None, beta=None):
api_endpoint = '/comments/analysis/' + task_id
params = {}
if alpha is not None:
params['alpha'] = alpha
if beta is not None:
params['beta'] = beta
r = self._api_request('GET', api_endpoint, params=params)
logger.info('Comments analysis started.')
return r.ok
def _comments_status(self, task_id):
api_endpoint = '/comments/status/' + task_id
r = self._api_request('GET', api_endpoint)
status = str(r.json()['status']).lower()
if status == 'not found':
raise TaskNotFoundError('comments {} not found'.format(task_id), response=r)
if status == 'error':
raise TaskError('comments {} error'.format(task_id), response=r)
logger.info('Status: %s.' % status)
return status
def _comments_result(self, task_id):
api_endpoint = '/comments/result/' + task_id
v = self._api_request('GET', api_endpoint).json()
logger.info('%d comments fetched.' % len(v))
return v
def _comments_clear(self, task_id):
api_endpoint = '/comments/clear/' + task_id
r = self._api_request('GET', api_endpoint)
return r.ok
class _ClusterTask(object):
def __init__(self, nlp, contents=None, task_id=None):
if task_id is None:
task_id = _generate_id()
self.task_id = task_id
self._contents = []
@staticmethod
def _prepare_contents(contents):
if not contents:
return []
if isinstance(contents[0], string_types):
contents = [{"_id": _generate_id(), "text": s} for s in contents]
elif isinstance(contents[0], tuple):
contents = [{"_id": _id, "text": s} for _id, s in contents]
return contents
def push(self, contents):
"""批量上传需要进行处理的文本。
:param contents: 需要处理的的文本序列或者 (_id, text) 序列或者
{'_id': _id, 'text': text} 序列。如果没有指定 _id,则自动
生成唯一的 _id。
:type contents: sequence of string or sequence of (_id, text) or
sequence of {'_id': _id, 'text': text}
:raises: :py:exc:`~bosonnlp.HTTPError` - 如果 API 请求发生错误
"""
contents = self._prepare_contents(contents)
if self._push(contents):
self._contents.extend(contents)
def analysis(self, alpha=None, beta=None):
"""启动分析任务
:param float alpha: 默认为 0.8,最大 cluster 大小
:param float beta: 默认为 0.45,平均 cluster 大小
:raises: :py:exc:`~bosonnlp.HTTPError` - 如果 API 请求发生错误
"""
return self._analysis(alpha=alpha, beta=beta)
def wait_until_complete(self, timeout=None):
"""等待任务完成。
:param float timeout: 等待任务完成的秒数,默认为 :py:class:`None`,
表示不会超时。
:raises:
:py:exc:`~bosonnlp.HTTPError` - 如果 API 请求发生错误
:py:exc:`~bosonnlp.TaskNotFoundError` - 任务不存在。
:py:exc:`~bosonnlp.TaskError` - 任务出错。
:py:exc:`~bosonnlp.TimeoutError` - 如果任务未能在 `timeout` 时间内完成。
"""
elapsed = 0.0
seconds_to_sleep = 1.0
if timeout is not None:
seconds_to_sleep = min(seconds_to_sleep, timeout)
i = 0
while True:
time.sleep(seconds_to_sleep)
status = self.status()
if status == "done":
return
elapsed += seconds_to_sleep
if timeout and elapsed >= timeout:
raise TimeoutError('{0!r} timed out'.format(self))
i = i + 1
if i % 3 == 0 and seconds_to_sleep < 64:
seconds_to_sleep = seconds_to_sleep + seconds_to_sleep
def status(self):
"""获取任务状态。
:returns: 任务状态。
+---------------+------------------------------+
|状态 |说明 |
+---------------+------------------------------+
| ``received`` |成功接收到分析请求 |
+---------------+------------------------------+
| ``running`` |数据分析正在进行中 |
+---------------+------------------------------+
| ``done`` |分析已完成 |
+---------------+------------------------------+
:raises:
:py:exc:`~bosonnlp.HTTPError` - 如果 API 请求发生错误
:py:exc:`~bosonnlp.TaskNotFoundError` - 任务不存在。
:py:exc:`~bosonnlp.TaskError` - 任务出错。
"""
return self._status()
def result(self):
"""返回任务的结果。
:returns: 接口返回的结果。
:raises: :py:exc:`~bosonnlp.HTTPError` - 如果 API 请求发生错误
"""
return self._result()
def clear(self):
"""清空服务器端缓存的文本和结果。
:returns: 是否成功。
:raises: :py:exc:`~bosonnlp.HTTPError` - 如果 API 请求发生错误
"""
return self._clear()
def __repr__(self):
return "<{0.__class__.__name__} {0.task_id}>".format(self)
[docs]class ClusterTask(_ClusterTask):
"""文本聚类任务封装类。
一般通过 :py:meth:`~bosonnlp.BosonNLP.create_cluster_task` 创建实例。
>>> import os
>>> nlp = BosonNLP(os.environ['BOSON_API_TOKEN'])
>>> cluster = nlp.create_cluster_task()
之后可以多次调用 :py:meth:`~bosonnlp.ClusterTask.push` 上传文本。
>>> cluster.push(['今天天气好', '今天天气好', '今天天气不错', '点点楼头细雨'])
>>> cluster.push(['重重江外平湖'])
>>> cluster.push(['当年戏马会东徐', '今日凄凉南浦'])
然后调用 :py:meth:`~bosonnlp.ClusterTask.analysis` 启动文本聚类任务。
>>> cluster.analysis()
一般文本聚类任务都用于处理大量的文本,需要比较长的处理时间。可以调用
:py:meth:`~bosonnlp.ClusterTask.wait_until_complete` 等待任务完成。
>>> cluster.wait_until_complete(2 * 60) # 最多等待 2 分钟
也可以调用 :py:meth:`~bosonnlp.ClusterTask.status` 查看任务状态。
>>> cluster.status()
'done'
如果任务成功完成,可以通过 :py:meth:`~bosonnlp.ClusterTask.result` 获取结果。
>>> cluster.result()
[{'_id': '9e90c56e-f1bb-4605-b995-304af733207a',
'list': ['9e90c56e-f1bb-4605-b995-304af733207a',
'a3feff6b-6d1b-4f46-a2d8-0eea25c7f17f'],
'num': 2}]
获取结果后,可以通过 :py:meth:`~bosonnlp.ClusterTask.clear` 清空服务器端缓存的文本和结果。
>>> cluster.clear()
True
:param nlp: :py:class:`~bosonnlp.BosonNLP` 类实例。
其他参数和 :py:meth:`~bosonnlp.BosonNLP.cluster` 一致。
"""
def __init__(self, nlp, contents=None, task_id=None):
super(ClusterTask, self).__init__(nlp, contents, task_id)
self._push = partial(nlp._cluster_push, self.task_id)
self._analysis = partial(nlp._cluster_analysis, self.task_id)
self._status = partial(nlp._cluster_status, self.task_id)
self._result = partial(nlp._cluster_result, self.task_id)
self._clear = partial(nlp._cluster_clear, self.task_id)
self.push(contents)
if __name__ == '__main__':
import doctest
doctest.testmod()