Commit 2721f15e authored by Felipe Andrade

Initial commit

### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Sphinx documentation
docs/_build/
# JetBrains IDEA
.idea/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
.env-target
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
config-stitch.json
{
"python.pythonPath": "/home/felipe/Documents/testes/tap-elasticsearch/env/bin/python3.6"
}
.DEFAULT_GOAL := test
install:
	python3 -m pip install -e '.[dev]'
# tap-elasticsearch
This is a [Singer](https://singer.io) tap that produces JSON-formatted data following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md).
This tap:
- Pulls raw data from an Elasticsearch server
- Outputs the schema for each resource
- Incrementally pulls data based on the input state
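
Example invocation (hypothetical host and index; the tap takes the Elasticsearch host via `-h` and the index via `-i`):

    tap-elasticsearch -h https://localhost:9200 -i my-index

Schemas and rows are written to stdout as Singer SCHEMA and RECORD messages, one JSON object per line.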
---
Copyright © 2018 Armando Miani
from setuptools import setup, find_packages
setup(
    name="tap-elasticsearch",
    version="0.0.2",
    description="Singer.io tap for extracting data from Elasticsearch",
    author="Armando Miani",
    author_email="armando.miani@gmail.com",
    url="http://singer.io",
    classifiers=["Programming Language :: Python :: 3 :: Only"],
    py_modules=["tap_elasticsearch"],
    install_requires=[
        'singer-python',
        'requests',
        'strconv',
        'elasticsearch'
    ],
    entry_points="""
    [console_scripts]
    tap-elasticsearch=tap_elasticsearch:main
    """,
    packages=find_packages(),
    include_package_data=True
)
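
Once installed (e.g. via the Makefile's `install` target), setuptools generates the `tap-elasticsearch` console script from the `entry_points` block above; it is equivalent to this sketch (assuming the module is importable as `tap_elasticsearch`):

from tap_elasticsearch import main
main()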
{
    "client_id" : 103525,
    "token" : "9b662c4cfabf49d905e4f2a68e9d30109c156e98213f42ef8ce8eec138a1a623"
}
from datetime import datetime, timezone
import singer
from .args import parse_args
from .es_util import (
    get_schemas,
    search_rows,
    scroll
)
from .bypass import names
logger = singer.get_logger()
now = datetime.now(timezone.utc).isoformat()
default_page_size = 1000
schemas = None
def main():
    params = parse_args()
    schemas = get_schemas(params)
    for s in schemas:
        columns = schemas[s]['columns']
        logger.info('Schema: {}'.format(s))
        logger.info('{} Columns:'.format(len(columns)))
        logger.info('-' * 30)
        __export_schema(s, columns)
        __export_data(params, s, columns)


def __export_schema(s, columns):
    logger.info('Exporting Schema...')
    # Every stream gets a synthetic 'id' key column (filled from the ES _id).
    properties = {'id': {'type': 'string', 'key': True}}
    for c in columns:
        properties[c['column']] = {
            'type': c['type']
        }
    singer.write_schema(s, {'properties': properties}, key_properties="id")
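
# Illustrative only (hypothetical stream and column): for a stream 'contratos'
# with a single text column, the write_schema call above emits one Singer
# SCHEMA message on stdout, e.g.
#
#   {"type": "SCHEMA", "stream": "contratos",
#    "schema": {"properties": {"id": {"type": "string", "key": true},
#                              "nome": {"type": "string"}}},
#    "key_properties": ["id"]}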

def __export_data(params, s, columns):
    start_from = 0
    logger.info('Fetching first page...')
    logger.info('Starting at: {}'.format(start_from))
    result, scroll_id = search_rows(
        params, doc_type=s, start_from=start_from, size=default_page_size)
    total_rows_found = int(result['hits']['total'])
    page_data = result['hits']['hits']
    page_rows = len(page_data)
    __export_page(s, page_data)
    logger.info('Total rows found: {}'.format(total_rows_found))
    logger.info('Rows in this page: {}'.format(page_rows))
    # A full page means there may be more hits; keep scrolling until a short
    # (or empty) page comes back.
    while page_rows == default_page_size:
        start_from += page_rows
        result = scroll(params, scroll_id)
        page_data = result['hits']['hits']
        page_rows = len(page_data)
        __export_page(s, page_data)
        percent_completed = (start_from / total_rows_found) * 100
        logger.info('{:.1f}% completed - {} of {}'.format(
            percent_completed, start_from, total_rows_found))


def __export_page(s, page_rows):
    rows = [__combine_row(r) for r in page_rows]
    singer.write_records(s, rows)


def __combine_row(r):
    # Flatten the hit: the _source document plus the ES _id as 'id'.
    row = r['_source']
    row['id'] = r['_id']
    # Columns listed in bypass.names['force_types'] must always be arrays, so
    # wrap a bare object value in a single-item list.
    for bc in names['force_types']:
        if bc not in row:
            continue
        if type(row[bc]) is dict:
            row[bc] = [row[bc]]
    return row
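
A minimal sketch of the force_types handling (hypothetical hit): a dict value under a key listed in `bypass.names['force_types']` gets wrapped in a single-item list before the record is written.

hit = {'_id': '1', '_source': {'valor': 1200.0, 'contrato_fiador': {'nome': 'Ana'}}}
__combine_row(hit)
# -> {'valor': 1200.0, 'contrato_fiador': [{'nome': 'Ana'}], 'id': '1'}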
import sys
import getopt


def parse_args():
    # 'h:i:' means both -h (Elasticsearch host) and -i (index) take a value.
    opts, args = getopt.getopt(sys.argv[1:], 'h:i:')
    params = {}
    for k, v in opts:
        params[k] = v
    return params
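
A quick sketch of what `parse_args` produces (hypothetical argv): getopt keeps the dashes in the option names, which is why the rest of the code looks up `params['-h']` and `params['-i']`.

sys.argv = ['tap-elasticsearch', '-h', 'https://localhost:9200', '-i', 'bigdata']
print(parse_args())  # {'-h': 'https://localhost:9200', '-i': 'bigdata'}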
names = {
    'force_types': {
        'contrato_fiador': 'array',
        'contrato_proprietarios': 'array',
        'imovel_proprietarios': 'array',
        'contrato_inquilino': 'array',
        'contrato_cobrancas': 'array'
    }
}
import singer
from elasticsearch.client import Elasticsearch
from elasticsearch.exceptions import NotFoundError
logger = singer.get_logger()
conn = {}
scroll_time = '1d'


def get_connection(es_params):
    # Cache a single client per process in the module-level dict.
    if 'es' not in conn:
        conn['es'] = Elasticsearch([es_params['-h']], verify_certs=True)
    return conn['es']


def get_schemas(es_params):
    mappings = get_mappings(es_params)
    if not mappings:
        raise Exception('Index not found.')
    result = {}
    for m in mappings[es_params['-i']]['mappings']:
        columns = get_columns(
            mappings[es_params['-i']]['mappings'][m]['properties'])
        result[m] = {
            'columns': columns
        }
    return result


def get_columns(columns):
    from .bypass import names
    result = []
    for cn in columns:
        if 'type' not in columns[cn]:
            # Nested field: recurse into its properties. Fields listed in
            # bypass.names['force_types'] are typed as arrays, not objects.
            subcolumns = get_columns(columns[cn]['properties'])
            special_datatype = 'object'
            if cn in names['force_types']:
                special_datatype = 'array'
            result.append({
                'column': cn,
                'type': special_datatype,
                'columns': subcolumns
            })
        else:
            result.append({
                'column': cn,
                'type': __convert_datatype(columns[cn]['type'])
            })
    return result


def get_mappings(es_params):
    es = get_connection(es_params)
    try:
        return es.indices.get(es_params['-i'])
    except NotFoundError:
        logger.warning('Index {} not found.'.format(es_params['-i']))
        return None


def search_rows(es_params, doc_type, start_from, size):
    es = get_connection(es_params)
    result = es.search(
        index=es_params['-i'],
        doc_type=doc_type,
        from_=start_from,
        size=size,
        scroll=scroll_time)
    return result, result['_scroll_id']


def scroll(es_params, scroll_id):
    es = get_connection(es_params)
    return es.scroll(scroll_id, scroll=scroll_time)


def __convert_datatype(datatype):
    # Singer schemas use JSON Schema types; map ES 'text' to 'string'.
    if datatype == 'text':
        return 'string'
    return datatype
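
For reference, an illustrative `get_schemas` result (hypothetical index with one doc type): a dict keyed by mapping name, where nested object/array columns carry their own `columns` list.

{'contratos': {'columns': [
    {'column': 'valor', 'type': 'float'},
    {'column': 'contrato_fiador', 'type': 'array',
     'columns': [{'column': 'nome', 'type': 'string'}]}
]}}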
import strconv


def handle_value(value, schema_type):
    # strconv guesses the Python type from the raw string value.
    v, t = strconv.convert(value, include_type=True)
    if t == 'date':
        return v.strftime('%Y-%m-%d 00:00:00')
    if value == '' and 'null' in schema_type:
        return None
    if 'string' in schema_type:
        return str(v)
    return v


def compare_dicts(dict1, dict2):
    # Keys present in dict1 but missing from dict2.
    return [k1 for k1 in dict1 if k1 not in dict2]
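
A few illustrative calls (hypothetical values; assumes strconv's default converters, which recognize ISO dates):

handle_value('42', ['integer'])         # -> 42 (converted by strconv, kept numeric)
handle_value('42', ['string'])          # -> '42' (cast back for string columns)
handle_value('', ['null', 'string'])    # -> None (empty string becomes null)
handle_value('2018-05-01', ['string'])  # -> '2018-05-01 00:00:00' (dates normalized)
compare_dicts({'a': 1, 'b': 2}, {'a': 1})  # -> ['b'] (keys missing from dict2)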
from elasticsearch.client import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from pprint import pprint
conn = {}
def __convert_datatype(datatype):
    if datatype == 'text':
        return 'string'
    return datatype


def get_columns(columns):
    names = {
        'force_types': {
            'contrato_fiador': 'array',
            'contrato_proprietarios': 'array',
            'imovel_proprietarios': 'array',
            'contrato_inquilino': 'array',
            'contrato_cobrancas': 'array'
        }
    }
    result = []
    for cn in columns:
        if 'type' not in columns[cn]:
            subcolumns = get_columns(columns[cn]['properties'])
            special_datatype = 'object'
            if cn in names['force_types']:
                special_datatype = 'array'
            result.append({
                'column': cn,
                'type': special_datatype,
                'columns': subcolumns
            })
        else:
            result.append({
                'column': cn,
                'type': __convert_datatype(columns[cn]['type'])
            })
    return result


if 'es' not in conn:
    conn['es'] = Elasticsearch(['https://search-imobiliarias-e4447lzy47ngcfxgcurvnq4uwe.us-east-1.es.amazonaws.com'], verify_certs=True)

mappings = conn['es'].indices.get('bigdata')
schema = {}
for m in mappings['bigdata']['mappings']:
    columns = get_columns(
        mappings['bigdata']['mappings'][m]['properties'])
    schema[m] = {
        'columns': columns
    }
print(schema.keys())

start_from = 0
default_page_size = 1
result = conn['es'].search(
    index='bigdata',
    doc_type='contratos',
    from_=start_from,
    size=default_page_size,
    scroll='1d')
scroll_id = 'DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAhmAFlk3N3QwaGQ3Uk0yNjFScjdiWHp1Q2cAAAAAAAIZgRZZNzd0MGhkN1JNMjYxUnI3Ylh6dUNnAAAAAAACGYIWWTc3dDBoZDdSTTI2MVJyN2JYenVDZwAAAAAAAhmDFlk3N3QwaGQ3Uk0yNjFScjdiWHp1Q2cAAAAAAAIZhBZZNzd0MGhkN1JNMjYxUnI3Ylh6dUNn'
test = conn['es'].scroll(scroll_id, scroll='1d')
print(test)
pprint(result)